Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions src/mpi/orbit_mpi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,28 @@
#include <ctime>

#if !defined(DOXYGEN_SHOULD_SKIP_THIS)

#if USE_MPI == 0
static std::size_t ORBIT_MPI_Type_size(MPI_Datatype data) {
switch(data){
case MPI_CHAR: return sizeof(char);
case MPI_UNSIGNED_CHAR: return sizeof(unsigned char);
case MPI_BYTE: return sizeof(char);
case MPI_SHORT: return sizeof(short);
case MPI_UNSIGNED_SHORT: return sizeof(unsigned short);
case MPI_INT: return sizeof(int);
case MPI_UNSIGNED: return sizeof(unsigned);
case MPI_LONG: return sizeof(long);
case MPI_UNSIGNED_LONG: return sizeof(unsigned long);
case MPI_FLOAT: return sizeof(float);
case MPI_DOUBLE: return sizeof(double);
case MPI_LONG_DOUBLE: return sizeof(long double);
case MPI_LONG_LONG_INT: return sizeof(long long);
default: return 0;
}
}
#endif

/** A C wrapper around MPI_Init. */
int ORBIT_MPI_Init(){
int res = 0;
Expand Down Expand Up @@ -552,15 +574,19 @@ int ORBIT_MPI_Wait(MPI_Request *request, MPI_Status *status){
}

/** A C wrapper around MPI_Allreduce. */
int ORBIT_MPI_Allreduce(void* ar1, void* ar2, int n, MPI_Datatype data, MPI_Op op, MPI_Comm comm){
int res = 0;
int ORBIT_MPI_Allreduce(void* ar1, void* ar2, int n, MPI_Datatype data, MPI_Op op, MPI_Comm comm) {
#if USE_MPI > 0
res = MPI_Allreduce(ar1, ar2, n, data, op, comm);
return MPI_Allreduce(ar1, ar2, n, data, op, comm);
#else
memcpy(ar2, ar1, n*sizeof(ar1));
res = MPI_SUCCESS;
if (ar1 == MPI_IN_PLACE || ar1 == ar2) return MPI_SUCCESS;

std::size_t nbytes = (std::size_t)n * ORBIT_MPI_Type_size(data);
if (nbytes > 0) {
std::memcpy(ar2, ar1, nbytes);
}

return MPI_SUCCESS;
#endif
return res;
}

/** A C wrapper around MPI_Bcast. */
Expand Down
4 changes: 4 additions & 0 deletions src/mpi/orbit_mpi.hh
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@
#define MPI_ANY_SOURCE (-2)
#define MPI_ANY_TAG (-1)

/* MPI in-place operations */
#include <cstdint>
#define MPI_IN_PLACE ((void*)(intptr_t)-1)

#endif
//-------------------------------------------------------------
//END the case when USE_MPI is defined or not.
Expand Down
200 changes: 131 additions & 69 deletions src/mpi/wrap_orbit_mpi_send_receive_functions.hh
Original file line number Diff line number Diff line change
Expand Up @@ -28,79 +28,141 @@ static PyObject* mpi_wait(PyObject *self, PyObject *args){
return Py_BuildValue("i",res);
}

static PyObject* mpi_allreduce(PyObject *self, PyObject *args){
PyObject* pyO_arr; PyObject* pyO_datatype; PyObject* pyO_op; PyObject* pyO_comm;
if(!PyArg_ParseTuple( args,"OOOO:mpi_allreduce",&pyO_arr,&pyO_datatype,&pyO_op,&pyO_comm)){
error("MPI_Allreduce([...data],MPI_Datatype type,MPI_Op op,MPI_Comm out) - needs 4 params.");
}
pyORBIT_MPI_Comm* pyComm = (pyORBIT_MPI_Comm*) pyO_comm;
pyORBIT_MPI_Datatype* pyDatatype = (pyORBIT_MPI_Datatype*) pyO_datatype;
pyORBIT_MPI_Op* pyOp = (pyORBIT_MPI_Op*) pyO_op;
//check the data type
if(pyDatatype->datatype != MPI_INT && pyDatatype->datatype != MPI_DOUBLE){
error("MPI_Allreduce(...) data type could be INT or DOUBLE. STOP.");
}
//check if it is not sequence
int is_seq = 0;
if(PySequence_Check(pyO_arr) == 1){
is_seq = 1;
static PyObject* allreduce_scalar_int(PyObject* obj, pyORBIT_MPI_Op* pyOp, pyORBIT_MPI_Comm* pyComm) {
long v = PyLong_AsLong(obj);
if (PyErr_Occurred()) return NULL;

int val = (int)v;
ORBIT_MPI_Allreduce(MPI_IN_PLACE, &val, 1, MPI_INT, pyOp->op, pyComm->comm);
return Py_BuildValue("i", val);
}

static PyObject* allreduce_scalar_double(PyObject* obj, pyORBIT_MPI_Op* pyOp, pyORBIT_MPI_Comm* pyComm) {
double val = PyFloat_AsDouble(obj);
if (PyErr_Occurred()) return NULL;

ORBIT_MPI_Allreduce(MPI_IN_PLACE, &val, 1, MPI_DOUBLE, pyOp->op, pyComm->comm);
return Py_BuildValue("d", val);
}

static PyObject* allreduce_sequence_int(PyObject* obj, pyORBIT_MPI_Op* pyOp, pyORBIT_MPI_Comm* pyComm) {
PyObject *seq = PySequence_Fast(obj, "MPI_Allreduce(...) expected a sequence of ints");
if (seq == NULL) return NULL;

Py_ssize_t size = PySequence_Fast_GET_SIZE(seq);
PyObject *pyRes = PyTuple_New(size);

if (pyRes == NULL) {
Py_DECREF(seq);
return NULL;
}

int buff_index = 0;
int *buf = BufferStore::getBufferStore()->getFreeIntArr(buff_index, (int)size);

for (Py_ssize_t i = 0; i < size; ++i) {
long v = PyLong_AsLong(PySequence_Fast_GET_ITEM(seq, i));
if (PyErr_Occurred()) {
BufferStore::getBufferStore()->setUnusedIntArr(buff_index);
Py_DECREF(pyRes);
Py_DECREF(seq);
return NULL;
}
//it is NOT SEQUENCE
if(is_seq == 0){
if(pyDatatype->datatype == MPI_INT){
int val = (int) PyLong_AsLong(pyO_arr);
int val_out = 0;
ORBIT_MPI_Allreduce(&val,&val_out,1,MPI_INT,pyOp->op,pyComm->comm);
return Py_BuildValue("i",val_out);
}
if(pyDatatype->datatype == MPI_DOUBLE){
double val = PyFloat_AsDouble(pyO_arr);
double val_out = 0.;
ORBIT_MPI_Allreduce(&val,&val_out,1,MPI_DOUBLE,pyOp->op,pyComm->comm);
return Py_BuildValue("d",val_out);
}
error("MPI_Allreduce(...) - use only INT or DOUBLE data types");
buf[i] = (int)v;
}

ORBIT_MPI_Allreduce(MPI_IN_PLACE, buf, (int)size, MPI_INT, pyOp->op, pyComm->comm);

for (Py_ssize_t i = 0; i < size; ++i) {
PyObject *item = Py_BuildValue("i", buf[i]);
if (item == NULL || PyTuple_SetItem(pyRes, i, item) != 0) {
Py_XDECREF(item);
BufferStore::getBufferStore()->setUnusedIntArr(buff_index);
Py_DECREF(pyRes);
Py_DECREF(seq);
return NULL;
}
//it IS A SEQUENCE
int size = PySequence_Size(pyO_arr);
PyObject* pyRes = PyTuple_New(size);
//data is an INT array
if(pyDatatype->datatype == MPI_INT){
int buff_index0 = 0;
int buff_index1 = 0;
int* arr = BufferStore::getBufferStore()->getFreeIntArr(buff_index0,size);
int* arr_out = BufferStore::getBufferStore()->getFreeIntArr(buff_index1,size);
for(int i = 0; i < size; i++){
arr[i]= (int) PyLong_AsLong(PySequence_Fast_GET_ITEM(pyO_arr, i));
}
ORBIT_MPI_Allreduce(arr,arr_out,size,MPI_INT,pyOp->op,pyComm->comm);
for(int i = 0; i < size; i++){
if(PyTuple_SetItem(pyRes,i,Py_BuildValue("i",arr_out[i])) != 0){
error("MPI_Allreduce(...) cannot create a resulting tuple.");
}
}
BufferStore::getBufferStore()->setUnusedIntArr(buff_index0);
BufferStore::getBufferStore()->setUnusedIntArr(buff_index1);
}

BufferStore::getBufferStore()->setUnusedIntArr(buff_index);
Py_DECREF(seq);
return pyRes;
}

static PyObject* allreduce_sequence_double(PyObject* obj, pyORBIT_MPI_Op* pyOp, pyORBIT_MPI_Comm* pyComm) {
PyObject *seq = PySequence_Fast(obj, "MPI_Allreduce(...) expected a sequence of ints");
if (seq == NULL) return NULL;

Py_ssize_t size = PySequence_Fast_GET_SIZE(seq);
PyObject *pyRes = PyTuple_New(size);

if (pyRes == NULL) {
Py_DECREF(seq);
return NULL;
}

int buff_index = 0;
double *buf = BufferStore::getBufferStore()->getFreeDoubleArr(buff_index, (int)size);

for (Py_ssize_t i = 0; i < size; ++i) {
double v = PyFloat_AsDouble(PySequence_Fast_GET_ITEM(seq, i));
if (PyErr_Occurred()) {
BufferStore::getBufferStore()->setUnusedDoubleArr(buff_index);
Py_DECREF(pyRes);
Py_DECREF(seq);
return NULL;
}
//data is an DOUBLE array
if(pyDatatype->datatype == MPI_DOUBLE){
int buff_index0 = 0;
int buff_index1 = 0;
double* arr = BufferStore::getBufferStore()->getFreeDoubleArr(buff_index0,size);
double* arr_out = BufferStore::getBufferStore()->getFreeDoubleArr(buff_index1,size);
for(int i = 0; i < size; i++){
arr[i]= PyFloat_AsDouble(PySequence_Fast_GET_ITEM(pyO_arr, i));
}
ORBIT_MPI_Allreduce(arr,arr_out,size,MPI_DOUBLE,pyOp->op,pyComm->comm);
for(int i = 0; i < size; i++){
if(PyTuple_SetItem(pyRes,i,Py_BuildValue("d",arr_out[i])) != 0){
error("MPI_Allreduce(...) cannot create a resulting tuple.");
}
}
BufferStore::getBufferStore()->setUnusedDoubleArr(buff_index0);
BufferStore::getBufferStore()->setUnusedDoubleArr(buff_index1);
buf[i] = v;
}

ORBIT_MPI_Allreduce(MPI_IN_PLACE, buf, (int)size, MPI_DOUBLE, pyOp->op, pyComm->comm);

for (Py_ssize_t i = 0; i < size; ++i) {
PyObject *item = Py_BuildValue("d", buf[i]);
if (item == NULL || PyTuple_SetItem(pyRes, i, item) != 0) {
Py_XDECREF(item);
BufferStore::getBufferStore()->setUnusedDoubleArr(buff_index);
Py_DECREF(pyRes);
Py_DECREF(seq);
return NULL;
}
return pyRes;
}

BufferStore::getBufferStore()->setUnusedDoubleArr(buff_index);
Py_DECREF(seq);
return pyRes;
}

static PyObject* mpi_allreduce(PyObject *self, PyObject *args){
PyObject *pyO_arr, *pyO_datatype, *pyO_op, *pyO_comm;

if(!PyArg_ParseTuple(args,"OOOO:mpi_allreduce", &pyO_arr, &pyO_datatype, &pyO_op, &pyO_comm)) {
error("MPI_Allreduce([...data],MPI_Datatype type,MPI_Op op,MPI_Comm out) - needs 4 params.");
}

pyORBIT_MPI_Comm* pyComm = (pyORBIT_MPI_Comm*) pyO_comm;
pyORBIT_MPI_Datatype* pyDatatype = (pyORBIT_MPI_Datatype*) pyO_datatype;
pyORBIT_MPI_Op* pyOp = (pyORBIT_MPI_Op*) pyO_op;

//check the data type
MPI_Datatype dtype = pyDatatype->datatype;

if(dtype != MPI_INT && dtype != MPI_DOUBLE){
error("MPI_Allreduce(...) data type should be INT or DOUBLE. STOP.");
}

//check if it is a sequence or a scalar
if (PySequence_Check(pyO_arr) == 0) {
if (dtype == MPI_INT) {
return allreduce_scalar_int(pyO_arr, pyOp, pyComm);
}
return allreduce_scalar_double(pyO_arr, pyOp, pyComm);
} else {
if (dtype == MPI_INT) {
return allreduce_sequence_int(pyO_arr, pyOp, pyComm);
}
return allreduce_sequence_double(pyO_arr, pyOp, pyComm);
}
}

static PyObject* mpi_bcast(PyObject *self, PyObject *args){
Expand Down
113 changes: 113 additions & 0 deletions tests/py/orbit/core/test_orbit_mpi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import pytest
from orbit.core.orbit_mpi import (
mpi_comm,
mpi_datatype,
mpi_op,
MPI_Allreduce,
MPI_Comm_size,
)

_MPI_SIZE = MPI_Comm_size(mpi_comm.MPI_COMM_WORLD)

_OPS = [
pytest.param(mpi_op.MPI_SUM, "sum", id="MPI_SUM"),
pytest.param(mpi_op.MPI_MAX, "max", id="MPI_MAX"),
pytest.param(mpi_op.MPI_MIN, "min", id="MPI_MIN"),
pytest.param(mpi_op.MPI_PROD, "prod", id="MPI_PROD"),
]


def _expected(val, op_name, size):
if op_name == "sum":
return val * size
elif op_name == "max":
return val
elif op_name == "min":
return val
elif op_name == "prod":
return val**size
raise ValueError(f"unknown op: {op_name}")


def _expected_seq(vals, op_name, size):
return tuple(_expected(v, op_name, size) for v in vals)


class TestMPI_Allreduce:
@pytest.mark.parametrize("op,op_name", _OPS)
def test_double_scalar(self, op, op_name):
val = 3.14
expected = _expected(val, op_name, _MPI_SIZE)
res = MPI_Allreduce(val, mpi_datatype.MPI_DOUBLE, op, mpi_comm.MPI_COMM_WORLD)
assert isinstance(res, float)
assert res == pytest.approx(expected)

@pytest.mark.parametrize("op,op_name", _OPS)
def test_int_scalar(self, op, op_name):
val = 42
expected = _expected(val, op_name, _MPI_SIZE)
res = MPI_Allreduce(val, mpi_datatype.MPI_INT, op, mpi_comm.MPI_COMM_WORLD)
assert isinstance(res, int)
assert res == expected

@pytest.mark.parametrize("op,op_name", _OPS)
def test_double_tuple(self, op, op_name):
vals = (1.5, 2.5, 3.5)
expected = _expected_seq(vals, op_name, _MPI_SIZE)
res = MPI_Allreduce(vals, mpi_datatype.MPI_DOUBLE, op, mpi_comm.MPI_COMM_WORLD)
assert isinstance(res, tuple)
assert len(res) == len(expected)
for a, b in zip(res, expected):
assert a == pytest.approx(b)

@pytest.mark.parametrize("op,op_name", _OPS)
def test_int_tuple(self, op, op_name):
vals = (10, 20, 30)
expected = _expected_seq(vals, op_name, _MPI_SIZE)
res = MPI_Allreduce(vals, mpi_datatype.MPI_INT, op, mpi_comm.MPI_COMM_WORLD)
assert isinstance(res, tuple)
assert list(res) == list(expected)

@pytest.mark.parametrize("op,op_name", _OPS)
def test_double_list(self, op, op_name):
vals = [1.5, 2.5, 3.5]
expected = _expected_seq(vals, op_name, _MPI_SIZE)
res = MPI_Allreduce(vals, mpi_datatype.MPI_DOUBLE, op, mpi_comm.MPI_COMM_WORLD)
assert isinstance(res, tuple)
assert len(res) == len(expected)
for a, b in zip(res, expected):
assert a == pytest.approx(b)

@pytest.mark.parametrize("op,op_name", _OPS)
def test_int_list(self, op, op_name):
vals = [10, 20, 30]
expected = _expected_seq(vals, op_name, _MPI_SIZE)
res = MPI_Allreduce(vals, mpi_datatype.MPI_INT, op, mpi_comm.MPI_COMM_WORLD)
assert isinstance(res, tuple)
assert list(res) == list(expected)

@pytest.mark.skip(
reason="numpy arrays not yet supported by MPI_Allreduce C wrapper – segfaults"
)
@pytest.mark.parametrize("op,op_name", _OPS)
def test_double_ndarray(self, op, op_name):
np = pytest.importorskip("numpy")
vals = np.array([1.5, 2.5, 3.5])
expected = _expected_seq(vals, op_name, _MPI_SIZE)
res = MPI_Allreduce(vals, mpi_datatype.MPI_DOUBLE, op, mpi_comm.MPI_COMM_WORLD)
assert isinstance(res, tuple)
assert len(res) == len(expected)
for a, b in zip(res, expected):
assert a == pytest.approx(float(b))

@pytest.mark.skip(
reason="numpy arrays not yet supported by MPI_Allreduce C wrapper – segfaults"
)
@pytest.mark.parametrize("op,op_name", _OPS)
def test_int_ndarray(self, op, op_name):
np = pytest.importorskip("numpy")
vals = np.array([10, 20, 30], dtype=np.int64)
expected = _expected_seq(vals, op_name, _MPI_SIZE)
res = MPI_Allreduce(vals, mpi_datatype.MPI_INT, op, mpi_comm.MPI_COMM_WORLD)
assert isinstance(res, tuple)
assert list(res) == list(map(int, expected))
Loading