diff --git a/ompi/mca/coll/acoll/README b/ompi/mca/coll/acoll/README
index d5b5acae8f1..cef0a1a0f9f 100644
--- a/ompi/mca/coll/acoll/README
+++ b/ompi/mca/coll/acoll/README
@@ -8,9 +8,9 @@ $HEADER$
 ===========================================================================
 
-The collective component, AMD Coll (“acoll”), is a high-performant MPI collective component for the OpenMPI library that is optimized for AMD "Zen"-based processors. “acoll” is optimized for communications within a single node of AMD “Zen”-based processors and provides the following commonly used collective algorithms: boardcast (MPI_Bcast), allreduce (MPI_Allreduce), reduce (MPI_Reduce), gather (MPI_Gather), allgather (MPI_Allgather), and barrier (MPI_Barrier).
+The collective component, AMD Coll (“acoll”), is a high-performance MPI collective component for the Open MPI library that is optimized for AMD "Zen"-based processors. “acoll” is optimized for communications within a single node of AMD “Zen”-based processors and provides the following commonly used collective algorithms: broadcast (MPI_Bcast), allreduce (MPI_Allreduce), reduce (MPI_Reduce), gather (MPI_Gather), allgather (MPI_Allgather), alltoall (MPI_Alltoall), and barrier (MPI_Barrier).
 
-At present, “acoll” has been tested with OpenMPI v5.0.2 and can be built as part of OpenMPI.
+At present, “acoll” has been tested with the Open MPI main branch and can be built as part of Open MPI.
 
 To run an application with acoll, use the following command line parameters -
 mpirun --mca coll acoll,tuned,libnbc,basic --mca coll_acoll_priority 40
diff --git a/ompi/mca/coll/acoll/coll_acoll.h b/ompi/mca/coll/acoll/coll_acoll.h
index e5ec8a381a8..8c38f1cd057 100644
--- a/ompi/mca/coll/acoll/coll_acoll.h
+++ b/ompi/mca/coll/acoll/coll_acoll.h
@@ -26,6 +26,7 @@
 #include <xpmem.h>
 #endif
 
+#include "opal/mca/accelerator/accelerator.h"
 #include "opal/mca/shmem/base/base.h"
 #include "opal/mca/shmem/shmem.h"
 
@@ -40,6 +41,7 @@ extern int mca_coll_acoll_sg_scale;
 extern int mca_coll_acoll_node_size;
 extern int mca_coll_acoll_force_numa;
 extern int mca_coll_acoll_use_dynamic_rules;
+extern int mca_coll_acoll_disable_shmbcast;
 extern int mca_coll_acoll_mnode_enable;
 extern int mca_coll_acoll_bcast_lin0;
 extern int mca_coll_acoll_bcast_lin1;
@@ -201,6 +203,7 @@ typedef struct coll_acoll_subcomms {
     coll_acoll_data_t *data;
     bool initialized_data;
     bool initialized_shm_data;
+    int barrier_algo;
 #ifdef HAVE_XPMEM_H
     uint64_t xpmem_buf_size;
     int without_xpmem;
@@ -233,6 +236,7 @@ struct mca_coll_acoll_module_t {
     int log2_node_cnt;
     int force_numa;
     int use_dyn_rules;
+    int disable_shmbcast; // Todo: Use substructure for every API related ones
     int use_mnode;
     int use_lin0;
diff --git a/ompi/mca/coll/acoll/coll_acoll_allgather.c b/ompi/mca/coll/acoll/coll_acoll_allgather.c
index 3fc2167193f..d814d4d0a18 100644
--- a/ompi/mca/coll/acoll/coll_acoll_allgather.c
+++ b/ompi/mca/coll/acoll/coll_acoll_allgather.c
@@ -29,7 +29,7 @@ static inline int log_sg_bcast_intra(void *buff, size_t count, struct ompi_datat
                                      mca_coll_base_module_t *module, ompi_request_t **preq,
                                      int *nreqs)
 {
-    int msb_pos, sub_rank, peer, err;
+    int msb_pos, sub_rank, peer, err = MPI_SUCCESS;
     int i, mask;
     int end_sg, end_peer;
 
@@ -92,7 +92,7 @@ static inline int lin_sg_bcast_intra(void *buff, size_t count, struct ompi_datat
                                      int *nreqs)
 {
     int peer;
-    int err;
+    int err = MPI_SUCCESS;
     int sg_end;
 
     sg_end = sg_start + sg_size - 1;
diff --git a/ompi/mca/coll/acoll/coll_acoll_allreduce.c b/ompi/mca/coll/acoll/coll_acoll_allreduce.c
index 79ef9c4807e..173422845ee 100644
--- a/ompi/mca/coll/acoll/coll_acoll_allreduce.c
+++ b/ompi/mca/coll/acoll/coll_acoll_allreduce.c
@@ -450,7 +450,21 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
     ompi_datatype_type_size(dtype, &dsize);
     total_dsize = dsize * count;
 
-    if (1 == size) {
+    /* Disable shm/xpmem based optimizations if: */
+    /* - datatype is not a predefined type */
+    /* - it's a gpu buffer */
+    uint64_t flags = 0;
+    int dev_id;
+    bool is_opt = true;
+    if (!OMPI_COMM_CHECK_ASSERT_NO_ACCEL_BUF(comm)) {
+        if (!ompi_datatype_is_predefined(dtype)
+            || (0 < opal_accelerator.check_addr(sbuf, &dev_id, &flags))
+            || (0 < opal_accelerator.check_addr(rbuf, &dev_id, &flags))) {
+            is_opt = false;
+        }
+    }
+
+    if ((1 == size) && is_opt) {
         if (MPI_IN_PLACE != sbuf) {
             memcpy((char *) rbuf, sbuf, total_dsize);
         }
@@ -486,7 +500,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
         if (total_dsize < 32) {
             return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op,
                                                                     comm, module);
-        } else if (total_dsize < 512) {
+        } else if ((total_dsize < 512) && is_opt) {
             return mca_coll_acoll_allreduce_small_msgs_h(sbuf, rbuf, count, dtype, op, comm,
                                                          module, subc, 1);
         } else if (total_dsize <= 2048) {
@@ -505,7 +519,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
             }
         } else if (total_dsize < 4194304) {
 #ifdef HAVE_XPMEM_H
-            if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
+            if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
                 return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
             } else {
                 return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
@@ -517,7 +531,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
 #endif
         } else if (total_dsize <= 16777216) {
 #ifdef HAVE_XPMEM_H
-            if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
+            if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
                 mca_coll_acoll_reduce_xpmem_h(sbuf, rbuf, count, dtype, op, comm, module, subc);
                 return mca_coll_acoll_bcast(rbuf, count, dtype, 0, comm, module);
             } else {
@@ -530,7 +544,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
 #endif
         } else {
 #ifdef HAVE_XPMEM_H
-            if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
+            if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
                 return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
             } else {
                 return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
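The same predefined-datatype / device-buffer guard added here for allreduce reappears below for bcast and reduce. A minimal sketch, not part of the patch, of how it could be factored into one helper (the helper name acoll_buffers_allow_shm_opt is hypothetical; the macros and calls it uses are the ones already used in this diff):

#include <stdbool.h>
#include <stdint.h>

#include "ompi/communicator/communicator.h"
#include "ompi/datatype/ompi_datatype.h"
#include "opal/mca/accelerator/accelerator.h"

/* Hypothetical helper: true when the shm/xpmem fast paths are safe, i.e. the
 * datatype is predefined and neither buffer lives in device (GPU) memory. */
static inline bool acoll_buffers_allow_shm_opt(struct ompi_communicator_t *comm,
                                               struct ompi_datatype_t *dtype,
                                               const void *sbuf, void *rbuf)
{
    uint64_t flags = 0;
    int dev_id;

    if (OMPI_COMM_CHECK_ASSERT_NO_ACCEL_BUF(comm)) {
        return true;   /* app asserted host-only buffers on this communicator */
    }
    /* opal_accelerator.check_addr() returns > 0 for device memory */
    return ompi_datatype_is_predefined(dtype)
           && 0 >= opal_accelerator.check_addr(sbuf, &dev_id, &flags)
           && 0 >= opal_accelerator.check_addr(rbuf, &dev_id, &flags);
}

Each call site would then reduce to a single line such as bool is_opt = acoll_buffers_allow_shm_opt(comm, dtype, sbuf, rbuf);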
diff --git a/ompi/mca/coll/acoll/coll_acoll_alltoall.c b/ompi/mca/coll/acoll/coll_acoll_alltoall.c
index ab58a38d503..2d430eb37cb 100644
--- a/ompi/mca/coll/acoll/coll_acoll_alltoall.c
+++ b/ompi/mca/coll/acoll/coll_acoll_alltoall.c
@@ -529,14 +529,8 @@ int mca_coll_acoll_alltoall
         struct ompi_communicator_t *split_comm;
 
         /* Select the right split_comm. */
-        int pow2_idx = -2;
-        int tmp_grp_split_f = grp_split_f;
-        while (tmp_grp_split_f > 0)
-        {
-            pow2_idx += 1;
-            tmp_grp_split_f = tmp_grp_split_f / 2;
-        }
-        split_comm = subc->split_comm[pow2_idx];
+        int comm_idx = grp_split_f > 2 ? opal_cube_dim(grp_split_f/2) : 0;
+        split_comm = subc->split_comm[comm_idx];
 
         error = mca_coll_acoll_base_alltoall_dispatcher
                     (sbuf, (grp_split_f * scount), sdtype,
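For power-of-two grp_split_f the new opal_cube_dim() expression picks the same split_comm index as the removed loop (both come out to log2(grp_split_f) - 1). A small self-contained check, not part of the patch, with opal_cube_dim()'s behaviour re-implemented locally so it compiles on its own:

#include <assert.h>

/* The loop removed by the patch: returns floor(log2(grp_split_f)) - 1. */
static int old_pow2_idx(int grp_split_f)
{
    int pow2_idx = -2;
    int tmp_grp_split_f = grp_split_f;
    while (tmp_grp_split_f > 0) {
        pow2_idx += 1;
        tmp_grp_split_f = tmp_grp_split_f / 2;
    }
    return pow2_idx;
}

/* The replacement: opal_cube_dim(n) is the smallest d with (1 << d) >= n,
 * reproduced here so the check needs no OPAL headers. */
static int new_comm_idx(int grp_split_f)
{
    int dim = 0, sz = 1;
    while (sz < grp_split_f / 2) {
        dim++;
        sz <<= 1;
    }
    return grp_split_f > 2 ? dim : 0;
}

int main(void)
{
    for (int f = 2; f <= 256; f *= 2) {
        assert(old_pow2_idx(f) == new_comm_idx(f));
    }
    return 0;
}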
diff --git a/ompi/mca/coll/acoll/coll_acoll_barrier.c b/ompi/mca/coll/acoll/coll_acoll_barrier.c
index d57db48b91f..2398053191b 100644
--- a/ompi/mca/coll/acoll/coll_acoll_barrier.c
+++ b/ompi/mca/coll/acoll/coll_acoll_barrier.c
@@ -22,6 +22,13 @@
 #include "coll_acoll.h"
 #include "coll_acoll_utils.h"
 
+
+#define PROGRESS_COUNT 10000
+
+int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);
+int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);
+
 static int mca_coll_acoll_barrier_recv_subc(struct ompi_communicator_t *comm,
                                             mca_coll_base_module_t *module, ompi_request_t **reqs,
                                             int *nreqs, int root)
@@ -106,6 +113,170 @@ static int mca_coll_acoll_barrier_send_subc(struct ompi_communicator_t *comm,
     return err;
 }
 
+int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc)
+{
+    int err = MPI_SUCCESS;
+    int root = 0;
+    int rank = ompi_comm_rank(comm);
+    int size = ompi_comm_size(comm);
+    mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
+    coll_acoll_init(module, comm, subc->data, subc, root);
+    coll_acoll_data_t *data = subc->data;
+
+    if (NULL == data) {
+        return -1;
+    }
+
+    int l1_gp_size = data->l1_gp_size;
+    int *l1_gp = data->l1_gp;
+    int *l2_gp = data->l2_gp;
+    int l2_gp_size = data->l2_gp_size;
+    /* 16 * 1024 + 2 * 64 * size + 8 * 1024 * size */
+    int offset_barrier = LEADER_SHM_SIZE + 2 * CACHE_LINE_SIZE * size + PER_RANK_SHM_SIZE * size
+                         + CACHE_LINE_SIZE * size;
+
+    volatile int *root_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+                                              + CACHE_LINE_SIZE * rank);
+    volatile int *l1_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]]
+                                            + offset_barrier + CACHE_LINE_SIZE * rank);
+
+    volatile int *leader_shm;
+    volatile int *my_leader_shm;
+    leader_shm = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+                          + CACHE_LINE_SIZE * root);
+    my_leader_shm = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
+                             + CACHE_LINE_SIZE * l1_gp[0]);
+    int ready;
+    int count = 0;
+    if (rank == root) {
+        ready = *leader_shm;
+        for (int i = 0; i < l2_gp_size; i++) {
+            if (l2_gp[i] == root)
+                continue;
+            volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+                                         + CACHE_LINE_SIZE * l2_gp[i]);
+            while (*val != ready + 1) {
+                count++;
+                if (count == PROGRESS_COUNT) {
+                    count = 0;
+                    opal_progress();
+                }
+            }
+        }
+        ready++;
+        for (int i = 0; i < l1_gp_size; i++) {
+            if (l1_gp[i] == root)
+                continue;
+            volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+                                         + CACHE_LINE_SIZE * l1_gp[i]);
+            while (*val != ready) {
+                count++;
+                if (count == PROGRESS_COUNT) {
+                    count = 0;
+                    opal_progress();
+                }
+            }
+        }
+        *leader_shm = ready;
+    } else if (rank == l1_gp[0]) {
+        int val = *l1_rank_offset;
+        for (int i = 0; i < l1_gp_size; i++) {
+            if (l1_gp[i] == l1_gp[0])
+                continue;
+            volatile int *vali = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
+                                          + CACHE_LINE_SIZE
+                                            * l1_gp[i]); // do we need atomic_load here?
+            while (*vali != val + 1) {
+                count++;
+                if (PROGRESS_COUNT == count) {
+                    count = 0;
+                    opal_progress();
+                }
+            }
+        }
+        val++;
+        *root_rank_offset = val;
+        while (*leader_shm != val) {
+            count++;
+            if (PROGRESS_COUNT == count) {
+                count = 0;
+                opal_progress();
+            }
+        }
+        *l1_rank_offset = val;
+    } else {
+
+        int done = *l1_rank_offset;
+        done++;
+        *l1_rank_offset = done;
+        while (done != *my_leader_shm) {
+            count++;
+            if (PROGRESS_COUNT == count) {
+                count = 0;
+                opal_progress();
+            }
+        }
+    }
+    return err;
+}
+
+
+int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc)
+{
+    int err = MPI_SUCCESS;
+    int root = 0;
+    int rank = ompi_comm_rank(comm);
+    int size = ompi_comm_size(comm);
+    mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
+
+    coll_acoll_init(module, comm, subc->data, subc, root);
+    coll_acoll_data_t *data = subc->data;
+
+    if (NULL == data) {
+        return -1;
+    }
+
+    /* 16 * 1024 + 2 * 64 * size + 8 * 1024 * size */
+    int offset_barrier = LEADER_SHM_SIZE + 2 * CACHE_LINE_SIZE * size + PER_RANK_SHM_SIZE * size
+                         + CACHE_LINE_SIZE * size;
+
+    volatile int *root_rank_offset = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+                                              + CACHE_LINE_SIZE * rank);
+
+    volatile int *leader_shm;
+    leader_shm = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+                          + CACHE_LINE_SIZE * root);
+
+    int ready = *leader_shm;
+    int count = 0;
+    if (rank == root) {
+        for (int i = 0; i < size; i++) {
+            if (i == root)
+                continue;
+            volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+                                         + CACHE_LINE_SIZE * i);
+            while (*val != ready + 1) {
+                count++;
+                if (count == PROGRESS_COUNT) {
+                    count = 0;
+                    opal_progress();
+                }
+            }
+        }
+        (*leader_shm)++;
+    } else {
+        int val = ++(*root_rank_offset);
+        while (*leader_shm != val) {
+            count++;
+            if (PROGRESS_COUNT == count) {
+                count = 0;
+                opal_progress();
+            }
+        }
+    }
+    return err;
+}
+
 /*
  * mca_coll_acoll_barrier_intra
  *
@@ -152,6 +323,16 @@ int mca_coll_acoll_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base
     }
     num_nodes = size > 1 ? subc->num_nodes : 1;
 
+    /* Default barrier for intra-node case - shared memory hierarchical */
+    /* ToDo: Need to check how this works with the inter-node case */
+    if (1 == num_nodes) {
+        if (0 == subc->barrier_algo) {
+            return mca_coll_acoll_barrier_shm_h(comm, module, subc);
+        } else if (1 == subc->barrier_algo) {
+            return mca_coll_acoll_barrier_shm_f(comm, module, subc);
+        }
+    }
+
     reqs = ompi_coll_base_comm_get_reqs(module->base_data, size);
     if (NULL == reqs) {
         return OMPI_ERR_OUT_OF_RESOURCE;
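Both new barrier paths spin on per-rank, cache-line-sized flags in the shared segment and fall back to opal_progress() every PROGRESS_COUNT iterations; barrier_shm_h synchronizes within l1 groups first and then across l2 leaders, while barrier_shm_f uses a single flat epoch counter owned by the root. A minimal thread-based sketch of the flat scheme, for illustration only (threads stand in for ranks, and the cache-line spacing and opal_progress() calls of the real code are omitted):

#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define NRANKS 4

/* One epoch counter per rank; index 0 belongs to the root. */
static atomic_int flag[NRANKS];

static void shm_barrier_flat(int rank)
{
    if (0 == rank) {
        int epoch = atomic_load(&flag[0]);
        for (int i = 1; i < NRANKS; i++) {
            while (atomic_load(&flag[i]) != epoch + 1) {
                /* the real code yields to opal_progress() here */
            }
        }
        atomic_store(&flag[0], epoch + 1);   /* release all waiters */
    } else {
        int epoch = atomic_fetch_add(&flag[rank], 1) + 1;  /* announce arrival */
        while (atomic_load(&flag[0]) != epoch) {
            /* spin until the root publishes the new epoch */
        }
    }
}

static void *worker(void *arg)
{
    int rank = (int) (intptr_t) arg;
    for (int iter = 0; iter < 3; iter++) {
        shm_barrier_flat(rank);
    }
    return NULL;
}

int main(void)
{
    pthread_t tid[NRANKS];
    for (intptr_t r = 0; r < NRANKS; r++) {
        pthread_create(&tid[r], NULL, worker, (void *) r);
    }
    for (int r = 0; r < NRANKS; r++) {
        pthread_join(tid[r], NULL);
    }
    printf("all ranks passed 3 barriers\n");
    return 0;
}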
diff --git a/ompi/mca/coll/acoll/coll_acoll_bcast.c b/ompi/mca/coll/acoll/coll_acoll_bcast.c
index 9a9d447a167..2b06692fe98 100644
--- a/ompi/mca/coll/acoll/coll_acoll_bcast.c
+++ b/ompi/mca/coll/acoll/coll_acoll_bcast.c
@@ -24,6 +24,8 @@ typedef int (*bcast_subc_func)(void *buff, size_t count, struct ompi_datatype_t
                                *datatype, int root, struct ompi_communicator_t *comm,
                                ompi_request_t **preq, int *nreqs, int world_rank);
 
+int mca_coll_acoll_bcast_shm(void *buff, size_t count, struct ompi_datatype_t *dtype, int root,
+                             struct ompi_communicator_t *comm, mca_coll_base_module_t *module);
 
 /*
  * bcast_binomial
@@ -127,7 +129,7 @@ static int bcast_flat_tree(void *buff, size_t count, struct ompi_datatype_t *dat
 
 static inline void coll_bcast_decision_fixed(int size, size_t total_dsize, int node_size,
                                              int *sg_cnt, int *use_0, int *use_numa,
-                                             int *use_socket, int *lin_0,
+                                             int *use_socket, int *use_shm, int *lin_0,
                                              int *lin_1, int *lin_2, int num_nodes,
                                              mca_coll_acoll_module_t *acoll_module,
                                              coll_acoll_subcomms_t *subc)
@@ -137,7 +139,12 @@ static inline void coll_bcast_decision_fixed(int size, size_t total_dsize, int n
     *lin_0 = 0;
     *use_numa = 0;
     *use_socket = 0;
+    *use_shm = 0;
     if (size <= node_size) {
+        if (total_dsize <= 8192 && size >= 16 && !acoll_module->disable_shmbcast) {
+            *use_shm = 1;
+            return;
+        }
         if (acoll_module->use_dyn_rules) {
             *sg_cnt = (acoll_module->mnode_sg_size == acoll_module->sg_cnt) ? acoll_module->sg_cnt : node_size;
             *use_0 = 0;
@@ -237,12 +244,19 @@ static inline void coll_bcast_decision_fixed(int size, size_t total_dsize, int n
         *sg_cnt = sg_size;
         if (2 == num_nodes) {
             SET_BCAST_PARAMS(1, 1, 1)
-            *use_socket = 1;
-            *use_numa = (total_dsize <= 2097152) ? 0 : 1;
+            if (total_dsize <= 8192) {
+                *use_shm = 1;
+            } else {
+                *use_socket = 1;
+                *use_numa = (total_dsize <= 2097152) ? 0 : 1;
+            }
         } else if (num_nodes <= 4) {
-            if (total_dsize <= 512) {
+            if (total_dsize <= 64) {
                 *use_socket = 1;
                 SET_BCAST_PARAMS(1, 1, 0)
+            } else if (total_dsize <= 512) {
+                *use_shm = 1;
+                SET_BCAST_PARAMS(1, 1, 0)
             } else if (total_dsize <= 2097152) {
                 *use_socket = 1;
                 SET_BCAST_PARAMS(1, 1, 1)
@@ -253,7 +267,9 @@ static inline void coll_bcast_decision_fixed(int size, size_t total_dsize, int n
             }
         } else if (num_nodes <= 6) {
             SET_BCAST_PARAMS(1, 1, 1)
-            if (total_dsize <= 524288) {
+            if (total_dsize <= 4096) {
+                *use_shm = 1;
+            } else if (total_dsize <= 524288) {
                 *use_socket = 1;
             } else {
                 *use_numa = 1;
@@ -261,7 +277,7 @@ static inline void coll_bcast_decision_fixed(int size, size_t total_dsize, int n
         } else if (num_nodes <= 8) {
             SET_BCAST_PARAMS(1, 1, 1)
             if (total_dsize <= 8192) {
-                *use_numa = 0;
+                *use_shm = 1;
             } else {
                 *use_numa = 1;
             }
@@ -294,6 +310,9 @@ static inline void coll_bcast_decision_fixed(int size, size_t total_dsize, int n
     if (-1 != acoll_module->use_socket) {
         *use_socket = acoll_module->use_socket;
     }
+    if (1 == acoll_module->disable_shmbcast) {
+        *use_shm = 0;
+    }
 }
 
 static inline void coll_acoll_bcast_subcomms(struct ompi_communicator_t *comm,
@@ -353,7 +372,7 @@ static int mca_coll_acoll_bcast_intra_node(void *buff, size_t count, struct ompi
                                            coll_acoll_subcomms_t *subc,
                                            struct ompi_communicator_t **subcomms,
                                            int *subc_roots, int lin_1, int lin_2, int no_sg, int use_numa,
-                                           int use_socket, int world_rank)
+                                           int use_socket, int use_shm, int world_rank)
 {
     int size;
     int rank;
@@ -368,6 +387,9 @@ static int mca_coll_acoll_bcast_intra_node(void *buff, size_t count, struct ompi
     rank = ompi_comm_rank(comm);
     size = ompi_comm_size(comm);
 
+    if (use_shm && subc_roots[MCA_COLL_ACOLL_INTRA] == 0 && !use_socket) {
+        return mca_coll_acoll_bcast_shm(buff, count, datatype, 0, comm, module);
+    }
     reqs = ompi_coll_base_comm_get_reqs(module->base_data, size);
     if (NULL == reqs) {
         return OMPI_ERR_OUT_OF_RESOURCE;
@@ -427,6 +449,152 @@ static int mca_coll_acoll_bcast_intra_node(void *buff, size_t count, struct ompi
     return err;
 }
 
+/*
+ * mca_coll_acoll_bcast_shm
+ *
+ * Function:    Broadcast operation for small messages ( <= 8K) using shared memory
+ * Accepts:     Same arguments as MPI_Bcast()
+ * Returns:     MPI_SUCCESS or error code
+ *
+ * Description: Broadcast is performed across and within subgroups.
+ *
+ * Memory:      Additional memory is allocated for group leaders
+ *              (around 2MB for comm size of 256).
+ */
+// 0) all flags are initialized to 0 and increment with each bcast call
+// 1) root sets the ready flag and waits for
+//    - all "done" from l2 members
+//    - all "done" from its l1 members
+// 2) l2 members wait on root's ready flag
+// 3) l1 members wait on l1 leader's ready flag
+
+int mca_coll_acoll_bcast_shm(void *buff, size_t count, struct ompi_datatype_t *dtype, int root,
+                             struct ompi_communicator_t *comm, mca_coll_base_module_t *module)
+{
+    size_t dsize;
+    int err = MPI_SUCCESS;
+    int rank = ompi_comm_rank(comm);
+    int size = ompi_comm_size(comm);
+    mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
+    coll_acoll_subcomms_t *subc = NULL;
+
+    err = check_and_create_subc(comm, acoll_module, &subc);
+    if (!subc->initialized) {
+        err = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, root);
+        if (MPI_SUCCESS != err) {
+            return err;
+        }
+    }
+    coll_acoll_init(module, comm, subc->data, subc, root);
+    coll_acoll_data_t *data = subc->data;
+
+    if (NULL == data) {
+        return -1;
+    }
+    ompi_datatype_type_size(dtype, &dsize);
+
+    int l1_gp_size = data->l1_gp_size;
+    int *l1_gp = data->l1_gp;
+    int *l2_gp = data->l2_gp;
+    int l2_gp_size = data->l2_gp_size;
+    /* 16 * 1024 + 2 * 64 * size + 8 * 1024 * size */
+    int offset_bcast = LEADER_SHM_SIZE + 2*CACHE_LINE_SIZE*size + PER_RANK_SHM_SIZE*size;
+
+    volatile int *leader_shm;
+    if (rank == l1_gp[0]) {
+        leader_shm = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_bcast + CACHE_LINE_SIZE * root);
+    } else {
+        leader_shm = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_bcast
+                              + CACHE_LINE_SIZE * l1_gp[0]);
+    }
+
+    /*
+     * 0) all flags are initialized to 0 and increment with each bcast call
+     * 1) root sets the ready flag and waits for
+     *    - all "done" from l2 members
+     *    - all "done" from its l1 members
+     * 2) l2 members wait on root's ready flag
+     *    - copy data from root to its buffer
+     *    - increment its ready flag
+     *    - wait for all l1 members to finish
+     * 3) l1 members wait on l1 leader's ready flag
+     *    - copy data from l1 leader's buffer to its buffer
+     *    - increment its ready flag
+     */
+    int ready;
+    if (rank == root) {
+        memcpy((char *) data->allshmmmap_sbuf[root], buff, count * dsize);
+        ready = __atomic_load_n(leader_shm, __ATOMIC_RELAXED); // we don't need atomic here!
+        ready++;
+        __atomic_store_n(leader_shm, ready, __ATOMIC_RELAXED);
+        for (int i = 0; i < l2_gp_size; i++) {
+            if (l2_gp[i] == root)
+                continue;
+            volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_bcast
+                                         + CACHE_LINE_SIZE * l2_gp[i]);
+            while (*val != ready) {
+                ;
+            }
+        }
+        for (int i = 0; i < l1_gp_size; i++) {
+            if (l1_gp[i] == root)
+                continue;
+            volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_bcast
+                                         + CACHE_LINE_SIZE * l1_gp[i]);
+            while (*val != ready) {
+                ;
+            }
+        }
+    } else if (rank == l1_gp[0]) {
+        volatile int leader_ready = __atomic_load_n(leader_shm, __ATOMIC_RELAXED);
+        int done = __atomic_load_n((int *) ((char *) data->allshmmmap_sbuf[root] + offset_bcast
+                                            + CACHE_LINE_SIZE * rank),
+                                   __ATOMIC_RELAXED);
+        while (done == leader_ready) {
+            leader_ready = __atomic_load_n(leader_shm, __ATOMIC_RELAXED);
+        }
+        memcpy(buff, (char *) data->allshmmmap_sbuf[root], count * dsize);
+        memcpy((char *) data->allshmmmap_sbuf[rank], (char *) data->allshmmmap_sbuf[root],
+               count * dsize);
+        int val = __atomic_load_n((int *) ((char *) data->allshmmmap_sbuf[rank] + offset_bcast
+                                           + CACHE_LINE_SIZE * rank),
+                                  __ATOMIC_RELAXED); // do we need atomic load?
+        val++;
+        int local_val = val;
+        __atomic_store_n((int *) ((char *) data->allshmmmap_sbuf[root] + offset_bcast + CACHE_LINE_SIZE * rank),
+                         val, __ATOMIC_RELAXED); // do we need atomic store?
+        __atomic_store_n((int *) ((char *) data->allshmmmap_sbuf[rank] + offset_bcast + CACHE_LINE_SIZE * rank),
+                         val, __ATOMIC_RELAXED); // do we need atomic store?
+        // do we need wmb() here?
+        for (int i = 0; i < l1_gp_size; i++) {
+            if (l1_gp[i] == l1_gp[0])
+                continue;
+            volatile int *vali = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_bcast
+                                          + CACHE_LINE_SIZE * l1_gp[i]); // do we need atomic_load here?
+            while (*vali != local_val) {
+                ; // can we use a more specific condition than "!=" ?
+            }
+        }
+    } else {
+        int done = __atomic_load_n((int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_bcast
+                                            + CACHE_LINE_SIZE * rank),
+                                   __ATOMIC_RELAXED);
+        while (done == *leader_shm) {
+            ;
+        }
+        memcpy(buff, (char *) data->allshmmmap_sbuf[l1_gp[0]], count * dsize);
+        int val = __atomic_load_n((int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_bcast
+                                           + CACHE_LINE_SIZE * rank),
+                                  __ATOMIC_RELAXED); // do we need atomic load?
+        val++;
+        __atomic_store_n((int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_bcast
+                                  + CACHE_LINE_SIZE * rank),
+                         val, __ATOMIC_RELAXED); // do we need atomic store?
+        // do we need wmb() here?
+    }
+    return err;
+}
+
 /*
  * mca_coll_acoll_bcast
  *
@@ -455,7 +623,7 @@ int mca_coll_acoll_bcast(void *buff, size_t count, struct ompi_datatype_t *datat
     int num_nodes;
     int use_0 = 0;
     int lin_0 = 0, lin_1 = 0, lin_2 = 0;
-    int use_numa = 0, use_socket = 0;
+    int use_numa = 0, use_socket = 0, use_shm = 0;
     int no_sg;
     size_t total_dsize, dsize;
     mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
@@ -511,12 +679,25 @@ int mca_coll_acoll_bcast(void *buff, size_t count, struct ompi_datatype_t *datat
     /* sg_cnt determines subgroup based communication */
     /* lin_1 and lin_2 indicate whether to use linear or log based
        sends/receives across and within subgroups respectively. */
-    coll_bcast_decision_fixed(size, total_dsize, node_size, &sg_cnt, &use_0, &use_numa, &use_socket, &lin_0,
+    coll_bcast_decision_fixed(size, total_dsize, node_size, &sg_cnt, &use_0,
+                              &use_numa, &use_socket, &use_shm, &lin_0,
                               &lin_1, &lin_2, num_nodes, acoll_module, subc);
     no_sg = (sg_cnt == node_size) ? 1 : 0;
     if (size <= 2)
         no_sg = 1;
 
+    /* Disable shm based bcast if: */
+    /* - datatype is not a predefined type */
+    /* - it's a gpu buffer */
+    uint64_t flags = 0;
+    int dev_id;
+    if (!OMPI_COMM_CHECK_ASSERT_NO_ACCEL_BUF(comm)) {
+        if (!ompi_datatype_is_predefined(datatype)
+            || (0 < opal_accelerator.check_addr(buff, &dev_id, &flags))) {
+            use_shm = 0;
+        }
+    }
+
     coll_acoll_bcast_subcomms(comm, subc, subcomms, subc_roots, root, num_nodes, use_0, no_sg,
                               use_numa, use_socket);
 
@@ -549,7 +730,7 @@ int mca_coll_acoll_bcast(void *buff, size_t count, struct ompi_datatype_t *datat
     }
     err = mca_coll_acoll_bcast_intra_node(buff, count, datatype, module, subc, subcomms, subc_roots,
-                                          lin_1, lin_2, no_sg, use_numa, use_socket, rank);
+                                          lin_1, lin_2, no_sg, use_numa, use_socket, use_shm, rank);
 
     if (MPI_SUCCESS != err) {
         ompi_coll_base_free_reqs(reqs, nreqs);
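The offsets used by mca_coll_acoll_bcast_shm() and by the two barrier functions above assume the enlarged shared segment laid out in coll_acoll_utils.h further below. A standalone sketch of that layout, using the sizes implied by the patch comments (LEADER_SHM_SIZE = 16K, CACHE_LINE_SIZE = 64, PER_RANK_SHM_SIZE = 8K; these macro values are read off the comments, not this diff's definitions):

#include <stdio.h>

#define LEADER_SHM_SIZE   (16 * 1024)
#define CACHE_LINE_SIZE   64
#define PER_RANK_SHM_SIZE (8 * 1024)

int main(void)
{
    long size = 256;   /* communicator size used in the "around 2MB" comment */

    long off_l1_sync   = LEADER_SHM_SIZE;                          /* sync variables, l1 group     */
    long off_l2_sync   = off_l1_sync + CACHE_LINE_SIZE * size;     /* sync variables, l2 group     */
    long off_rank_data = off_l2_sync + CACHE_LINE_SIZE * size;     /* per-rank data buffers        */
    long off_bcast     = off_rank_data + PER_RANK_SHM_SIZE * size; /* bcast flags, one line/rank   */
    long off_barrier   = off_bcast + CACHE_LINE_SIZE * size;       /* barrier flags, one line/rank */
    long memsize       = off_barrier + CACHE_LINE_SIZE * size;

    /* off_bcast equals LEADER_SHM_SIZE + 2*CACHE_LINE_SIZE*size + PER_RANK_SHM_SIZE*size as in
     * mca_coll_acoll_bcast_shm(), off_barrier adds one more CACHE_LINE_SIZE*size as in the
     * barrier functions, and memsize matches the enlarged opal_shmem_segment_create() size. */
    printf("bcast flags at %ld, barrier flags at %ld, segment size %ld bytes\n",
           off_bcast, off_barrier, memsize);
    return 0;
}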
diff --git a/ompi/mca/coll/acoll/coll_acoll_component.c b/ompi/mca/coll/acoll/coll_acoll_component.c
index d3c8d2469b1..eb16c68eeb9 100644
--- a/ompi/mca/coll/acoll/coll_acoll_component.c
+++ b/ompi/mca/coll/acoll/coll_acoll_component.c
@@ -31,6 +31,7 @@ int mca_coll_acoll_sg_scale = 1;
 int mca_coll_acoll_node_size = 128;
 int mca_coll_acoll_force_numa = -1;
 int mca_coll_acoll_use_dynamic_rules = 0;
+int mca_coll_acoll_disable_shmbcast = 0;
 int mca_coll_acoll_mnode_enable = 1;
 int mca_coll_acoll_bcast_lin0 = 0;
 int mca_coll_acoll_bcast_lin1 = 0;
@@ -49,6 +50,9 @@ size_t mca_coll_acoll_alltoall_xpmem_msg_thres = 0;
 /* By default utilize xpmem based algorithms applicable when built with xpmem. */
 int mca_coll_acoll_without_xpmem = 0;
 int mca_coll_acoll_xpmem_use_sr_buf = 1;
+/* Default barrier algorithm - hierarchical algorithm using shared memory */
+/* ToDo: check how this works with inter-node */
+int mca_coll_acoll_barrier_algo = 0;
 
 /*
  * Local function
@@ -128,6 +132,11 @@ static int acoll_register(void)
                                            MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9,
                                            MCA_BASE_VAR_SCOPE_READONLY,
                                            &mca_coll_acoll_use_dynamic_rules);
+    (void) mca_base_component_var_register(&mca_coll_acoll_component.collm_version, "disable_shmbcast",
+                                           "Disable shared memory based broadcast",
+                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9,
+                                           MCA_BASE_VAR_SCOPE_READONLY,
+                                           &mca_coll_acoll_disable_shmbcast);
     (void) mca_base_component_var_register(&mca_coll_acoll_component.collm_version, "mnode_enable",
                                            "Enable separate algorithm for multinode cases",
                                            MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9,
@@ -176,6 +185,11 @@ static int acoll_register(void)
         "memory inside collective algorithms.",
         MCA_BASE_VAR_TYPE_UINT64_T, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
         &mca_coll_acoll_reserve_memory_size_for_algo);
+    (void) mca_base_component_var_register(
+        &mca_coll_acoll_component.collm_version, "barrier_algo",
+        "Barrier algorithm selection (0: shared-memory hierarchical, 1: shared-memory flat)",
+        MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
+        &mca_coll_acoll_barrier_algo);
     (void) mca_base_component_var_register(
         &mca_coll_acoll_component.collm_version, "without_xpmem",
         "By default, xpmem-based algorithms are used when applicable. "
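Assuming a build of this branch, the two parameters registered above follow the usual coll_acoll_* MCA naming, so they can be set like the existing ones, e.g.:

mpirun --mca coll acoll,tuned,libnbc,basic --mca coll_acoll_priority 40 --mca coll_acoll_barrier_algo 1 --mca coll_acoll_disable_shmbcast 1 <executable>

ompi_info --param coll acoll --level 9 should list them along with the defaults set in this file.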
diff --git a/ompi/mca/coll/acoll/coll_acoll_module.c b/ompi/mca/coll/acoll/coll_acoll_module.c
index 697f47bb8d0..45c1890a4f3 100644
--- a/ompi/mca/coll/acoll/coll_acoll_module.c
+++ b/ompi/mca/coll/acoll/coll_acoll_module.c
@@ -134,6 +134,7 @@ mca_coll_base_module_t *mca_coll_acoll_comm_query(struct ompi_communicator_t *co
 
     acoll_module->force_numa = mca_coll_acoll_force_numa;
     acoll_module->use_dyn_rules = mca_coll_acoll_use_dynamic_rules;
+    acoll_module->disable_shmbcast = mca_coll_acoll_disable_shmbcast;
     acoll_module->use_mnode = mca_coll_acoll_mnode_enable;
     /* Value of 0 is currently unsupported for mnode_enable */
     acoll_module->use_mnode = 1;
" diff --git a/ompi/mca/coll/acoll/coll_acoll_module.c b/ompi/mca/coll/acoll/coll_acoll_module.c index 697f47bb8d0..45c1890a4f3 100644 --- a/ompi/mca/coll/acoll/coll_acoll_module.c +++ b/ompi/mca/coll/acoll/coll_acoll_module.c @@ -134,6 +134,7 @@ mca_coll_base_module_t *mca_coll_acoll_comm_query(struct ompi_communicator_t *co acoll_module->force_numa = mca_coll_acoll_force_numa; acoll_module->use_dyn_rules = mca_coll_acoll_use_dynamic_rules; + acoll_module->disable_shmbcast = mca_coll_acoll_disable_shmbcast; acoll_module->use_mnode = mca_coll_acoll_mnode_enable; /* Value of 0 is currently unsupported for mnode_enable */ acoll_module->use_mnode = 1; diff --git a/ompi/mca/coll/acoll/coll_acoll_reduce.c b/ompi/mca/coll/acoll/coll_acoll_reduce.c index 32442fc4889..c79db9525a1 100644 --- a/ompi/mca/coll/acoll/coll_acoll_reduce.c +++ b/ompi/mca/coll/acoll/coll_acoll_reduce.c @@ -119,7 +119,7 @@ static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t co MCA_COLL_BASE_TAG_REDUCE, MCA_PML_BASE_SEND_STANDARD, subc->base_comm[ind1][ind2])); if (ret != MPI_SUCCESS) { - free(pml_buffer); + free(free_buffer); if (NULL != tmp_rbuf) { coll_acoll_buf_free(reserve_mem_rbuf_reduce, tmp_rbuf); } @@ -134,7 +134,7 @@ static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t co ret = MCA_PML_CALL(recv(pml_buffer, count, dtype, i, MCA_COLL_BASE_TAG_REDUCE, subc->base_comm[ind1][ind2], MPI_STATUS_IGNORE)); if (ret != MPI_SUCCESS) { - free(pml_buffer); + free(free_buffer); return ret; } ompi_op_reduce(op, pml_buffer, rbuf, count, dtype); @@ -143,8 +143,8 @@ static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t co } /* if local root, reduce at root */ - if (is_base && (sz > 1)) { - free(pml_buffer); + if (is_base) { + free(free_buffer); if (rank != root && NULL != tmp_rbuf) { coll_acoll_buf_free(reserve_mem_rbuf_reduce, tmp_rbuf); } @@ -329,6 +329,20 @@ int mca_coll_acoll_reduce_intra(const void *sbuf, void *rbuf, size_t count, module, 0, 0); } + /* Disable shm/xpmem based optimizations if: */ + /* - datatype is not a predefined type */ + /* - it's a gpu buffer */ + uint64_t flags = 0; + int dev_id; + bool is_opt = true; + if (!OMPI_COMM_CHECK_ASSERT_NO_ACCEL_BUF(comm)) { + if (!ompi_datatype_is_predefined(dtype) + || (0 < opal_accelerator.check_addr(sbuf, &dev_id, &flags)) + || (0 < opal_accelerator.check_addr(rbuf, &dev_id, &flags))) { + is_opt = false; + } + } + ompi_datatype_type_size(dtype, &dsize); total_dsize = dsize * count; @@ -374,7 +388,7 @@ int mca_coll_acoll_reduce_intra(const void *sbuf, void *rbuf, size_t count, && (acoll_module->reserve_mem_s).reserve_mem_allocate && ((acoll_module->reserve_mem_s).reserve_mem_size >= total_dsize)) || ((0 == subc->xpmem_use_sr_buf) && (subc->xpmem_buf_size > 2 * total_dsize))) - && (subc->without_xpmem != 1)) { + && (subc->without_xpmem != 1) && is_opt) { return mca_coll_acoll_reduce_xpmem(sbuf, rbuf, count, dtype, op, root, comm, module, subc); } else { diff --git a/ompi/mca/coll/acoll/coll_acoll_utils.h b/ompi/mca/coll/acoll/coll_acoll_utils.h index 20b1f26df42..96855a71947 100644 --- a/ompi/mca/coll/acoll/coll_acoll_utils.h +++ b/ompi/mca/coll/acoll/coll_acoll_utils.h @@ -33,6 +33,7 @@ extern uint64_t mca_coll_acoll_xpmem_buffer_size; extern int mca_coll_acoll_without_xpmem; extern int mca_coll_acoll_xpmem_use_sr_buf; +extern int mca_coll_acoll_barrier_algo; /* Function to allocate scratch buffer */ @@ -100,13 +101,15 @@ static inline int check_and_create_subc(ompi_communicator_t *comm, if (NULL == 
diff --git a/ompi/mca/coll/acoll/coll_acoll_utils.h b/ompi/mca/coll/acoll/coll_acoll_utils.h
index 20b1f26df42..96855a71947 100644
--- a/ompi/mca/coll/acoll/coll_acoll_utils.h
+++ b/ompi/mca/coll/acoll/coll_acoll_utils.h
@@ -33,6 +33,7 @@ extern uint64_t mca_coll_acoll_xpmem_buffer_size;
 extern int mca_coll_acoll_without_xpmem;
 extern int mca_coll_acoll_xpmem_use_sr_buf;
+extern int mca_coll_acoll_barrier_algo;
 
 
 /* Function to allocate scratch buffer */
@@ -100,13 +101,15 @@ static inline int check_and_create_subc(ompi_communicator_t *comm,
         if (NULL == acoll_module->subc) {
             return OMPI_ERR_OUT_OF_RESOURCE;
         }
-    }
-
-    /* Check if subcomms structure is already created for the communicator */
-    for (int i = 0; i < num_subc; i++) {
-        if (acoll_module->subc[i]->cid == cid) {
-            *subc_ptr = acoll_module->subc[i];
-            return MPI_SUCCESS;
+    } else {
+        /* Check if subcomms structure is already created for the communicator */
+        for (int i = 0; i < num_subc; i++) {
+            if (NULL != acoll_module->subc[i]) {
+                if (acoll_module->subc[i]->cid == cid) {
+                    *subc_ptr = acoll_module->subc[i];
+                    return MPI_SUCCESS;
+                }
+            }
         }
     }
 
@@ -165,6 +168,7 @@ static inline int check_and_create_subc(ompi_communicator_t *comm,
     subc->initialized_data = false;
     subc->initialized_shm_data = false;
     subc->data = NULL;
+    subc->barrier_algo = mca_coll_acoll_barrier_algo;
 #ifdef HAVE_XPMEM_H
     subc->xpmem_buf_size = mca_coll_acoll_xpmem_buffer_size;
     subc->without_xpmem = mca_coll_acoll_without_xpmem;
@@ -942,7 +946,7 @@ static inline int coll_acoll_init(mca_coll_base_module_t *module, ompi_communica
             /* Assuming cacheline size is 64 */
             long memsize = (LEADER_SHM_SIZE /* scratch leader */
                             + CACHE_LINE_SIZE * size /* sync variables l1 group*/
-                            + CACHE_LINE_SIZE * size /* sync variables l2 group*/ + PER_RANK_SHM_SIZE * size /*data from ranks*/);
+                            + CACHE_LINE_SIZE * size /* sync variables l2 group*/ + PER_RANK_SHM_SIZE * size /*data from ranks*/ + 2 * CACHE_LINE_SIZE * size /* sync variables for bcast and barrier*/);
             ret = opal_shmem_segment_create(&seg_ds, shfn, memsize);
             free(shfn);
         }
@@ -971,7 +975,19 @@ static inline int coll_acoll_init(mca_coll_base_module_t *module, ompi_communica
     data->allshmmmap_sbuf[root] = opal_shmem_segment_attach(&data->allshmseg_id[0]);
 
     int offset = LEADER_SHM_SIZE;
-    memset(((char *) data->allshmmmap_sbuf[data->l1_gp[0]]) + offset + CACHE_LINE_SIZE * rank, 0, CACHE_LINE_SIZE);
+    memset(((char *) data->allshmmmap_sbuf[data->l1_gp[0]]) + offset + CACHE_LINE_SIZE * rank, 0,
+           CACHE_LINE_SIZE);
+    int offset_bcast = LEADER_SHM_SIZE + 2 * CACHE_LINE_SIZE * size + PER_RANK_SHM_SIZE * size;
+    int offset_barrier = offset_bcast + CACHE_LINE_SIZE * size;
+    memset(((char *) data->allshmmmap_sbuf[data->l1_gp[0]])
+               + offset_bcast /*16K + 16k + 16k + 2M */ + CACHE_LINE_SIZE * rank,
+           0, CACHE_LINE_SIZE);
+    memset(((char *) data->allshmmmap_sbuf[data->l1_gp[0]])
+               + offset_barrier /*16K + 16k + 16k + 2M + 16k*/ + CACHE_LINE_SIZE * rank,
+           0, CACHE_LINE_SIZE);
+    memset(((char *) data->allshmmmap_sbuf[root])
+               + offset_barrier /*16K + 16k + 16k + 2M + 16k*/ + CACHE_LINE_SIZE * rank,
+           0, CACHE_LINE_SIZE);
     if (data->l1_gp[0] == rank) {
         memset(((char *) data->allshmmmap_sbuf[data->l2_gp[0]]) + (offset + CACHE_LINE_SIZE * size)
                    + CACHE_LINE_SIZE * rank, 0, CACHE_LINE_SIZE);