Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ompi/mca/coll/libnbc/coll_libnbc_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ static mca_base_var_enum_value_t iallgather_algorithms[] = {
{0, "ignore"},
{1, "linear"},
{2, "recursive_doubling"},
{3, "bruck"},
{0, NULL}
};

Expand Down Expand Up @@ -215,7 +216,7 @@ libnbc_register(void)
(void) mca_base_var_enum_create("coll_libnbc_iallgather_algorithms", iallgather_algorithms, &new_enum);
mca_base_component_var_register(&mca_coll_libnbc_component.super.collm_version,
"iallgather_algorithm",
"Which iallgather algorithm is used: 0 ignore, 1 linear, 2 recursive_doubling",
"Which iallgather algorithm is used: 0 ignore, 1 linear, 2 recursive_doubling, 3 bruck",
MCA_BASE_VAR_TYPE_INT, new_enum, 0, MCA_BASE_VAR_FLAG_SETTABLE,
OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_ALL,
&libnbc_iallgather_algorithm);
Expand Down
189 changes: 181 additions & 8 deletions ompi/mca/coll/libnbc/nbc_iallgather.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ static inline int allgather_sched_recursivedoubling(
int rank, int comm_size, NBC_Schedule *schedule, const void *sbuf,
int scount, struct ompi_datatype_t *sdtype, void *rbuf, int rcount,
struct ompi_datatype_t *rdtype);
static inline int allgather_sched_bruck(
int rank, int comm_size, NBC_Schedule *schedule, const void *sbuf,
int scount, struct ompi_datatype_t *sdtype, void *rbuf, int rcount,
struct ompi_datatype_t *rdtype, void *tmpbuf, char inplace);

#ifdef NBC_CACHE_SCHEDULE
/* tree comparison function for schedule cache */
Expand Down Expand Up @@ -56,19 +60,22 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s
int rank, p, res;
MPI_Aint rcvext;
NBC_Schedule *schedule;
void *tmpbuf = NULL;
char *rbuf, inplace;
#ifdef NBC_CACHE_SCHEDULE
NBC_Allgather_args *args, *found, search;
#endif
enum { NBC_ALLGATHER_LINEAR, NBC_ALLGATHER_RDBL} alg;
enum { NBC_ALLGATHER_LINEAR, NBC_ALLGATHER_RDBL, NBC_ALLGATHER_BRUCK } alg;
ompi_coll_libnbc_module_t *libnbc_module = (ompi_coll_libnbc_module_t*) module;
ptrdiff_t span, gap;

NBC_IN_PLACE(sendbuf, recvbuf, inplace);

rank = ompi_comm_rank (comm);
p = ompi_comm_size (comm);
int is_commsize_pow2 = !(p & (p - 1));

/* algorithm selection */
if (libnbc_iallgather_algorithm == 0) {
alg = NBC_ALLGATHER_LINEAR;
} else {
Expand All @@ -77,6 +84,8 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s
alg = NBC_ALLGATHER_LINEAR;
} else if (libnbc_iallgather_algorithm == 2 && is_commsize_pow2) {
alg = NBC_ALLGATHER_RDBL;
} else if (libnbc_iallgather_algorithm == 3) {
alg = NBC_ALLGATHER_BRUCK;
} else {
alg = NBC_ALLGATHER_LINEAR;
}
Expand All @@ -87,18 +96,44 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s
return res;
}

/* allocate temporary buffers */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is getting ugly. Why can't we have a nice function based init ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, that this approach is ugly, but I adhered overall design of libnbc (allreduce, alltoall, etc). I am ready to fix this PR, but it is required an additional discussion of memory allocation in libnbc.

if (alg == NBC_ALLGATHER_BRUCK) {
span = opal_datatype_span(&recvtype->super, (int64_t)(p - rank) * recvcount, &gap);
tmpbuf = (char *)calloc(span, sizeof(char));
if (OPAL_UNLIKELY(NULL == tmpbuf)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
}

if (inplace) {
sendtype = recvtype;
sendcount = recvcount;

if ((alg == NBC_ALLGATHER_BRUCK) && (0 != rank)) { /* non root with MPI_IN_PLACE */
/* copy r^th block from receive buffer to block 0 */
rbuf = (char *) recvbuf + rank * recvcount * rcvext;
res = NBC_Copy (rbuf, recvcount, recvtype, recvbuf, recvcount, recvtype, comm);
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
free(tmpbuf);
return res;
}
}
} else if (!persistent) { /* for persistent, the copy must be scheduled */
/* copy my data to receive buffer */
rbuf = (char *) recvbuf + rank * recvcount * rcvext;
res = NBC_Copy (sendbuf, sendcount, sendtype, rbuf, recvcount, recvtype, comm);
if (alg == NBC_ALLGATHER_BRUCK) {
/* copy send buffer to block 0 of receive buffer */
res = NBC_Copy (sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm);
} else {
/* copy send buffer to r^th block of receive buffer */
rbuf = (char *) recvbuf + rank * recvcount * rcvext;
res = NBC_Copy (sendbuf, sendcount, sendtype, rbuf, recvcount, recvtype, comm);
}
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
free(tmpbuf);
return res;
}
}
if (1 == p && (!persistent || inplace)) {
free(tmpbuf);
return nbc_get_noop_request(persistent, request);
}

Expand All @@ -115,17 +150,24 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s
#endif
schedule = OBJ_NEW(NBC_Schedule);
if (OPAL_UNLIKELY(NULL == schedule)) {
free(tmpbuf);
return OMPI_ERR_OUT_OF_RESOURCE;
}

if (persistent && !inplace) {
/* for nonblocking, data has been copied already */
/* copy my data to receive buffer (= send buffer of NBC_Sched_send) */
rbuf = (char *)recvbuf + rank * recvcount * rcvext;
res = NBC_Sched_copy((void *)sendbuf, false, sendcount, sendtype,
rbuf, false, recvcount, recvtype, schedule, true);
if (alg == NBC_ALLGATHER_BRUCK) {
res = NBC_Sched_copy((void *)sendbuf, false, sendcount, sendtype,
recvbuf, false, recvcount, recvtype, schedule, false);
} else {
rbuf = (char *)recvbuf + rank * recvcount * rcvext;
res = NBC_Sched_copy((void *)sendbuf, false, sendcount, sendtype,
rbuf, false, recvcount, recvtype, schedule, true);
}
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
OBJ_RELEASE(schedule);
free(tmpbuf);
return res;
}
}
Expand All @@ -139,16 +181,22 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s
res = allgather_sched_recursivedoubling(rank, p, schedule, sendbuf, sendcount,
sendtype, recvbuf, recvcount, recvtype);
break;
case NBC_ALLGATHER_BRUCK:
res = allgather_sched_bruck(rank, p, schedule, sendbuf, sendcount,
sendtype, recvbuf, recvcount, recvtype, tmpbuf, inplace);
break;
}

if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
OBJ_RELEASE(schedule);
free(tmpbuf);
return res;
}

res = NBC_Sched_commit(schedule);
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
OBJ_RELEASE(schedule);
free(tmpbuf);
return res;
}

Expand Down Expand Up @@ -181,9 +229,10 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s
}
#endif

res = NBC_Schedule_request(schedule, comm, libnbc_module, persistent, request, NULL);
res = NBC_Schedule_request(schedule, comm, libnbc_module, persistent, request, tmpbuf);
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
OBJ_RELEASE(schedule);
free(tmpbuf);
return res;
}

Expand Down Expand Up @@ -389,6 +438,130 @@ static inline int allgather_sched_recursivedoubling(
return res;
}

/*
* allgather_sched_bruck
*
* Description: Variation to All-to-all algorithm described by Bruck et al.in
* "Efficient Algorithms for All-to-all Communications
* in Multiport Message-Passing Systems"
* Time: O(log(comm_size))
* Schedule length: O(log(comm_size))
* Memory: non-zero ranks require temp buffer to perform final step
* in the algorithm.
*
* Example on 6 nodes:
* Initialization: everyone has its own buffer at location 0 in rbuf
* # 0 1 2 3 4 5
* [0] [1] [2] [3] [4] [5]
* Step 0: send message to (rank - 2^0), receive message from (rank + 2^0)
* # 0 1 2 3 4 5
* [0] [1] [2] [3] [4] [5]
* [1] [2] [3] [4] [5] [0]
* Step 1: send message to (rank - 2^1), receive message from (rank + 2^1)
* message contains all blocks from location 0 to 2^1*block size
* # 0 1 2 3 4 5
* [0] [1] [2] [3] [4] [5]
* [1] [2] [3] [4] [5] [0]
* [2] [3] [4] [5] [0] [1]
* [3] [4] [5] [0] [1] [2]
* Step 2: send message to (rank - 2^2), receive message from (rank + 2^2)
* message size is "all remaining blocks"
* # 0 1 2 3 4 5
* [0] [1] [2] [3] [4] [5]
* [1] [2] [3] [4] [5] [0]
* [2] [3] [4] [5] [0] [1]
* [3] [4] [5] [0] [1] [2]
* [4] [5] [0] [1] [2] [3]
* [5] [0] [1] [2] [3] [4]
* Finalization: Do a local shift to get data in correct place
* # 0 1 2 3 4 5
* [0] [0] [0] [0] [0] [0]
* [1] [1] [1] [1] [1] [1]
* [2] [2] [2] [2] [2] [2]
* [3] [3] [3] [3] [3] [3]
* [4] [4] [4] [4] [4] [4]
* [5] [5] [5] [5] [5] [5]
*/
static inline int allgather_sched_bruck(
int rank, int comm_size, NBC_Schedule *schedule, const void *sbuf,
int scount, struct ompi_datatype_t *sdtype, void *rbuf, int rcount,
struct ompi_datatype_t *rdtype, void *tmpbuf, char inplace)
{
int res = OMPI_SUCCESS;
ptrdiff_t rlb, rext;
char *tmpsend = NULL, *tmprecv = NULL;

res = ompi_datatype_get_extent(rdtype, &rlb, &rext);
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }

/* Communication step:
At every step i, rank r:
- doubles the distance
- sends message which starts at begining of rbuf and has size
(blockcount * rcount) to rank (r - distance)
- receives message of size blockcount * rcount from rank (r + distance)
at location (rbuf + distance * rcount * rext)
- blockcount doubles until last step when only the remaining data is
exchanged.
*/
int blockcount = 1;
for(int distance = 1; distance < comm_size; distance <<= 1) {
int sendto = (rank - distance + comm_size) % comm_size;
int recvfrom = (rank + distance) % comm_size;

if (distance <= (comm_size >> 1)) {
blockcount = distance;
} else {
blockcount = comm_size - distance;
}

tmpsend = (char *)rbuf;
tmprecv = tmpsend + (ptrdiff_t)distance * (ptrdiff_t)rcount * rext;

res = NBC_Sched_send(tmpsend, false, (ptrdiff_t)blockcount * (ptrdiff_t)rcount,
rdtype, sendto, schedule, false);
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }

res = NBC_Sched_recv(tmprecv, false, (ptrdiff_t)blockcount * (ptrdiff_t)rcount,
rdtype, recvfrom, schedule, true);
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
}

/* Finalization step:
On all nodes except 0, data needs to be shifted locally:
- copy blocks [0 .. (comm_size - rank - 1)] from rbuf to shiftbuf buffer
- move blocks [(comm_size - rank) .. size] from rbuf to begining of rbuf
- copy blocks from shift buffer starting at block [rank] in rbuf.
*/
if (0 != rank) {
ptrdiff_t gap;

(void)opal_datatype_span(&rdtype->super, (int64_t)(comm_size - rank) * rcount, &gap);
char * shiftbuf = (char *)tmpbuf - gap;

/* 1. copy blocks [0 .. (comm_size - rank - 1)] from rbuf to shift buffer */
res = NBC_Sched_copy((char *)rbuf, false, (ptrdiff_t)(comm_size - rank) * (ptrdiff_t)rcount, rdtype,
shiftbuf, false, (ptrdiff_t)(comm_size - rank) * (ptrdiff_t)rcount, rdtype, schedule, false);
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }

/* 2. move blocks [(comm_size - rank) .. size] from rbuf to the begining of rbuf */
tmpsend = (char *)rbuf + (ptrdiff_t)(comm_size - rank) * (ptrdiff_t)rcount * rext;
res = NBC_Sched_copy(tmpsend, false, (ptrdiff_t)rank * (ptrdiff_t)rcount, rdtype,
(char *)rbuf, false, (ptrdiff_t)rank * (ptrdiff_t)rcount, rdtype, schedule, false);
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }

/* 3. copy blocks from shift buffer back to rbuf starting at block [rank]. */
tmprecv = (char *)rbuf + (ptrdiff_t)rank * (ptrdiff_t)rcount * rext;
res = NBC_Sched_copy(shiftbuf, false, (ptrdiff_t)(comm_size - rank) * (ptrdiff_t)rcount, rdtype,
tmprecv, false, (ptrdiff_t)(comm_size - rank) * (ptrdiff_t)rcount, rdtype, schedule, false);
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
}


cleanup_and_return:
return res;
}

int ompi_coll_libnbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
MPI_Datatype recvtype, struct ompi_communicator_t *comm, MPI_Info info, ompi_request_t ** request,
struct mca_coll_base_module_2_3_0_t *module) {
Expand Down