diff --git a/ompi/mca/coll/libnbc/coll_libnbc_component.c b/ompi/mca/coll/libnbc/coll_libnbc_component.c index c0872e556e9..45b92a0c80d 100644 --- a/ompi/mca/coll/libnbc/coll_libnbc_component.c +++ b/ompi/mca/coll/libnbc/coll_libnbc_component.c @@ -51,6 +51,7 @@ static mca_base_var_enum_value_t iallgather_algorithms[] = { {0, "ignore"}, {1, "linear"}, {2, "recursive_doubling"}, + {3, "bruck"}, {0, NULL} }; @@ -215,7 +216,7 @@ libnbc_register(void) (void) mca_base_var_enum_create("coll_libnbc_iallgather_algorithms", iallgather_algorithms, &new_enum); mca_base_component_var_register(&mca_coll_libnbc_component.super.collm_version, "iallgather_algorithm", - "Which iallgather algorithm is used: 0 ignore, 1 linear, 2 recursive_doubling", + "Which iallgather algorithm is used: 0 ignore, 1 linear, 2 recursive_doubling, 3 bruck", MCA_BASE_VAR_TYPE_INT, new_enum, 0, MCA_BASE_VAR_FLAG_SETTABLE, OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_ALL, &libnbc_iallgather_algorithm); diff --git a/ompi/mca/coll/libnbc/nbc_iallgather.c b/ompi/mca/coll/libnbc/nbc_iallgather.c index 29ba7a6a9c1..574deb81572 100644 --- a/ompi/mca/coll/libnbc/nbc_iallgather.c +++ b/ompi/mca/coll/libnbc/nbc_iallgather.c @@ -28,6 +28,10 @@ static inline int allgather_sched_recursivedoubling( int rank, int comm_size, NBC_Schedule *schedule, const void *sbuf, int scount, struct ompi_datatype_t *sdtype, void *rbuf, int rcount, struct ompi_datatype_t *rdtype); +static inline int allgather_sched_bruck( + int rank, int comm_size, NBC_Schedule *schedule, const void *sbuf, + int scount, struct ompi_datatype_t *sdtype, void *rbuf, int rcount, + struct ompi_datatype_t *rdtype, void *tmpbuf, char inplace); #ifdef NBC_CACHE_SCHEDULE /* tree comparison function for schedule cache */ @@ -56,12 +60,14 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s int rank, p, res; MPI_Aint rcvext; NBC_Schedule *schedule; + void *tmpbuf = NULL; char *rbuf, inplace; #ifdef NBC_CACHE_SCHEDULE NBC_Allgather_args *args, *found, search; #endif - enum { NBC_ALLGATHER_LINEAR, NBC_ALLGATHER_RDBL} alg; + enum { NBC_ALLGATHER_LINEAR, NBC_ALLGATHER_RDBL, NBC_ALLGATHER_BRUCK } alg; ompi_coll_libnbc_module_t *libnbc_module = (ompi_coll_libnbc_module_t*) module; + ptrdiff_t span, gap; NBC_IN_PLACE(sendbuf, recvbuf, inplace); @@ -69,6 +75,7 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s p = ompi_comm_size (comm); int is_commsize_pow2 = !(p & (p - 1)); +/* algorithm selection */ if (libnbc_iallgather_algorithm == 0) { alg = NBC_ALLGATHER_LINEAR; } else { @@ -77,6 +84,8 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s alg = NBC_ALLGATHER_LINEAR; } else if (libnbc_iallgather_algorithm == 2 && is_commsize_pow2) { alg = NBC_ALLGATHER_RDBL; + } else if (libnbc_iallgather_algorithm == 3) { + alg = NBC_ALLGATHER_BRUCK; } else { alg = NBC_ALLGATHER_LINEAR; } @@ -87,18 +96,44 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s return res; } + /* allocate temporary buffers */ + if (alg == NBC_ALLGATHER_BRUCK) { + span = opal_datatype_span(&recvtype->super, (int64_t)(p - rank) * recvcount, &gap); + tmpbuf = (char *)calloc(span, sizeof(char)); + if (OPAL_UNLIKELY(NULL == tmpbuf)) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + } + if (inplace) { sendtype = recvtype; sendcount = recvcount; + + if ((alg == NBC_ALLGATHER_BRUCK) && (0 != rank)) { /* non root with MPI_IN_PLACE */ + /* copy r^th block from receive buffer to block 0 */ + rbuf = (char *) recvbuf + rank * recvcount * rcvext; + res = NBC_Copy (rbuf, recvcount, recvtype, recvbuf, recvcount, recvtype, comm); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { + free(tmpbuf); + return res; + } + } } else if (!persistent) { /* for persistent, the copy must be scheduled */ - /* copy my data to receive buffer */ - rbuf = (char *) recvbuf + rank * recvcount * rcvext; - res = NBC_Copy (sendbuf, sendcount, sendtype, rbuf, recvcount, recvtype, comm); + if (alg == NBC_ALLGATHER_BRUCK) { + /* copy send buffer to block 0 of receive buffer */ + res = NBC_Copy (sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm); + } else { + /* copy send buffer to r^th block of receive buffer */ + rbuf = (char *) recvbuf + rank * recvcount * rcvext; + res = NBC_Copy (sendbuf, sendcount, sendtype, rbuf, recvcount, recvtype, comm); + } if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { + free(tmpbuf); return res; } } if (1 == p && (!persistent || inplace)) { + free(tmpbuf); return nbc_get_noop_request(persistent, request); } @@ -115,17 +150,24 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s #endif schedule = OBJ_NEW(NBC_Schedule); if (OPAL_UNLIKELY(NULL == schedule)) { + free(tmpbuf); return OMPI_ERR_OUT_OF_RESOURCE; } if (persistent && !inplace) { /* for nonblocking, data has been copied already */ /* copy my data to receive buffer (= send buffer of NBC_Sched_send) */ - rbuf = (char *)recvbuf + rank * recvcount * rcvext; - res = NBC_Sched_copy((void *)sendbuf, false, sendcount, sendtype, - rbuf, false, recvcount, recvtype, schedule, true); + if (alg == NBC_ALLGATHER_BRUCK) { + res = NBC_Sched_copy((void *)sendbuf, false, sendcount, sendtype, + recvbuf, false, recvcount, recvtype, schedule, false); + } else { + rbuf = (char *)recvbuf + rank * recvcount * rcvext; + res = NBC_Sched_copy((void *)sendbuf, false, sendcount, sendtype, + rbuf, false, recvcount, recvtype, schedule, true); + } if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { OBJ_RELEASE(schedule); + free(tmpbuf); return res; } } @@ -139,16 +181,22 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s res = allgather_sched_recursivedoubling(rank, p, schedule, sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype); break; + case NBC_ALLGATHER_BRUCK: + res = allgather_sched_bruck(rank, p, schedule, sendbuf, sendcount, + sendtype, recvbuf, recvcount, recvtype, tmpbuf, inplace); + break; } if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { OBJ_RELEASE(schedule); + free(tmpbuf); return res; } res = NBC_Sched_commit(schedule); if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { OBJ_RELEASE(schedule); + free(tmpbuf); return res; } @@ -181,9 +229,10 @@ static int nbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype s } #endif - res = NBC_Schedule_request(schedule, comm, libnbc_module, persistent, request, NULL); + res = NBC_Schedule_request(schedule, comm, libnbc_module, persistent, request, tmpbuf); if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { OBJ_RELEASE(schedule); + free(tmpbuf); return res; } @@ -389,6 +438,130 @@ static inline int allgather_sched_recursivedoubling( return res; } +/* + * allgather_sched_bruck + * + * Description: Variation to All-to-all algorithm described by Bruck et al.in + * "Efficient Algorithms for All-to-all Communications + * in Multiport Message-Passing Systems" + * Time: O(log(comm_size)) + * Schedule length: O(log(comm_size)) + * Memory: non-zero ranks require temp buffer to perform final step + * in the algorithm. + * + * Example on 6 nodes: + * Initialization: everyone has its own buffer at location 0 in rbuf + * # 0 1 2 3 4 5 + * [0] [1] [2] [3] [4] [5] + * Step 0: send message to (rank - 2^0), receive message from (rank + 2^0) + * # 0 1 2 3 4 5 + * [0] [1] [2] [3] [4] [5] + * [1] [2] [3] [4] [5] [0] + * Step 1: send message to (rank - 2^1), receive message from (rank + 2^1) + * message contains all blocks from location 0 to 2^1*block size + * # 0 1 2 3 4 5 + * [0] [1] [2] [3] [4] [5] + * [1] [2] [3] [4] [5] [0] + * [2] [3] [4] [5] [0] [1] + * [3] [4] [5] [0] [1] [2] + * Step 2: send message to (rank - 2^2), receive message from (rank + 2^2) + * message size is "all remaining blocks" + * # 0 1 2 3 4 5 + * [0] [1] [2] [3] [4] [5] + * [1] [2] [3] [4] [5] [0] + * [2] [3] [4] [5] [0] [1] + * [3] [4] [5] [0] [1] [2] + * [4] [5] [0] [1] [2] [3] + * [5] [0] [1] [2] [3] [4] + * Finalization: Do a local shift to get data in correct place + * # 0 1 2 3 4 5 + * [0] [0] [0] [0] [0] [0] + * [1] [1] [1] [1] [1] [1] + * [2] [2] [2] [2] [2] [2] + * [3] [3] [3] [3] [3] [3] + * [4] [4] [4] [4] [4] [4] + * [5] [5] [5] [5] [5] [5] + */ +static inline int allgather_sched_bruck( + int rank, int comm_size, NBC_Schedule *schedule, const void *sbuf, + int scount, struct ompi_datatype_t *sdtype, void *rbuf, int rcount, + struct ompi_datatype_t *rdtype, void *tmpbuf, char inplace) +{ + int res = OMPI_SUCCESS; + ptrdiff_t rlb, rext; + char *tmpsend = NULL, *tmprecv = NULL; + + res = ompi_datatype_get_extent(rdtype, &rlb, &rext); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + + /* Communication step: + At every step i, rank r: + - doubles the distance + - sends message which starts at begining of rbuf and has size + (blockcount * rcount) to rank (r - distance) + - receives message of size blockcount * rcount from rank (r + distance) + at location (rbuf + distance * rcount * rext) + - blockcount doubles until last step when only the remaining data is + exchanged. + */ + int blockcount = 1; + for(int distance = 1; distance < comm_size; distance <<= 1) { + int sendto = (rank - distance + comm_size) % comm_size; + int recvfrom = (rank + distance) % comm_size; + + if (distance <= (comm_size >> 1)) { + blockcount = distance; + } else { + blockcount = comm_size - distance; + } + + tmpsend = (char *)rbuf; + tmprecv = tmpsend + (ptrdiff_t)distance * (ptrdiff_t)rcount * rext; + + res = NBC_Sched_send(tmpsend, false, (ptrdiff_t)blockcount * (ptrdiff_t)rcount, + rdtype, sendto, schedule, false); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + + res = NBC_Sched_recv(tmprecv, false, (ptrdiff_t)blockcount * (ptrdiff_t)rcount, + rdtype, recvfrom, schedule, true); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + } + + /* Finalization step: + On all nodes except 0, data needs to be shifted locally: + - copy blocks [0 .. (comm_size - rank - 1)] from rbuf to shiftbuf buffer + - move blocks [(comm_size - rank) .. size] from rbuf to begining of rbuf + - copy blocks from shift buffer starting at block [rank] in rbuf. + */ + if (0 != rank) { + ptrdiff_t gap; + + (void)opal_datatype_span(&rdtype->super, (int64_t)(comm_size - rank) * rcount, &gap); + char * shiftbuf = (char *)tmpbuf - gap; + + /* 1. copy blocks [0 .. (comm_size - rank - 1)] from rbuf to shift buffer */ + res = NBC_Sched_copy((char *)rbuf, false, (ptrdiff_t)(comm_size - rank) * (ptrdiff_t)rcount, rdtype, + shiftbuf, false, (ptrdiff_t)(comm_size - rank) * (ptrdiff_t)rcount, rdtype, schedule, false); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + + /* 2. move blocks [(comm_size - rank) .. size] from rbuf to the begining of rbuf */ + tmpsend = (char *)rbuf + (ptrdiff_t)(comm_size - rank) * (ptrdiff_t)rcount * rext; + res = NBC_Sched_copy(tmpsend, false, (ptrdiff_t)rank * (ptrdiff_t)rcount, rdtype, + (char *)rbuf, false, (ptrdiff_t)rank * (ptrdiff_t)rcount, rdtype, schedule, false); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + + /* 3. copy blocks from shift buffer back to rbuf starting at block [rank]. */ + tmprecv = (char *)rbuf + (ptrdiff_t)rank * (ptrdiff_t)rcount * rext; + res = NBC_Sched_copy(shiftbuf, false, (ptrdiff_t)(comm_size - rank) * (ptrdiff_t)rcount, rdtype, + tmprecv, false, (ptrdiff_t)(comm_size - rank) * (ptrdiff_t)rcount, rdtype, schedule, false); + if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; } + } + + +cleanup_and_return: + return res; +} + int ompi_coll_libnbc_allgather_init(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, struct ompi_communicator_t *comm, MPI_Info info, ompi_request_t ** request, struct mca_coll_base_module_2_3_0_t *module) {