From 81bd1b97815312a3325436483cbbf87650bc37ff Mon Sep 17 00:00:00 2001 From: Manu Shantharam Date: Tue, 7 Oct 2025 13:47:19 -0700 Subject: [PATCH] Updates to the acoll component This commit has the following updates: - converts the xpmem-based collectives to SMSC-based for collective operations. - fixes coverity issues - misc bug-fixes Signed-off-by: Manu Shantharam --- ompi/mca/coll/acoll/coll_acoll.h | 46 ++-- ompi/mca/coll/acoll/coll_acoll_allgather.c | 6 + ompi/mca/coll/acoll/coll_acoll_allreduce.c | 90 +++--- ompi/mca/coll/acoll/coll_acoll_component.c | 98 ++----- ompi/mca/coll/acoll/coll_acoll_module.c | 13 +- ompi/mca/coll/acoll/coll_acoll_reduce.c | 52 ++-- ompi/mca/coll/acoll/coll_acoll_utils.h | 303 ++++++++------------- 7 files changed, 249 insertions(+), 359 deletions(-) diff --git a/ompi/mca/coll/acoll/coll_acoll.h b/ompi/mca/coll/acoll/coll_acoll.h index 8c38f1cd057..2aaa7537c1b 100644 --- a/ompi/mca/coll/acoll/coll_acoll.h +++ b/ompi/mca/coll/acoll/coll_acoll.h @@ -20,22 +20,20 @@ #include "ompi/mca/mca.h" #include "ompi/request/request.h" -#ifdef HAVE_XPMEM_H -#include "opal/mca/rcache/base/base.h" -#include "opal/class/opal_hash_table.h" -#include -#endif - #include "opal/mca/accelerator/accelerator.h" #include "opal/mca/shmem/base/base.h" #include "opal/mca/shmem/shmem.h" +// For smsc +#include "opal/mca/smsc/smsc.h" + BEGIN_C_DECLS /* Globally exported variables */ OMPI_DECLSPEC extern const mca_coll_base_component_3_0_0_t mca_coll_acoll_component; extern int mca_coll_acoll_priority; extern int mca_coll_acoll_max_comms; +extern int mca_coll_acoll_comm_size_thresh; extern int mca_coll_acoll_sg_size; extern int mca_coll_acoll_sg_scale; extern int mca_coll_acoll_node_size; @@ -127,20 +125,22 @@ typedef enum MCA_COLL_ACOLL_BASE_LYRS { MCA_COLL_ACOLL_NUM_BASE_LYRS } MCA_COLL_ACOLL_BASE_LYRS; +typedef struct coll_acoll_smsc_info { + mca_smsc_endpoint_t **ep; + void **rreg; + void **sreg; +} coll_acoll_smsc_info_t; + typedef struct coll_acoll_data { -#ifdef HAVE_XPMEM_H - xpmem_segid_t *allseg_id; - xpmem_apid_t *all_apid; void **allshm_sbuf; void **allshm_rbuf; - void **xpmem_saddr; - void **xpmem_raddr; - mca_rcache_base_module_t **rcache; void *scratch; - opal_hash_table_t **xpmem_reg_tracker_ht; -#endif + void **smsc_saddr; + void **smsc_raddr; + opal_shmem_ds_t *allshmseg_id; void **allshmmmap_sbuf; + coll_acoll_smsc_info_t smsc_info; int comm_size; int l1_local_rank; @@ -204,11 +204,9 @@ typedef struct coll_acoll_subcomms { bool initialized_data; bool initialized_shm_data; int barrier_algo; -#ifdef HAVE_XPMEM_H - uint64_t xpmem_buf_size; - int without_xpmem; - int xpmem_use_sr_buf; -#endif + uint64_t smsc_buf_size; + int without_smsc; + int smsc_use_sr_buf; } coll_acoll_subcomms_t; @@ -222,7 +220,6 @@ typedef struct coll_acoll_reserve_mem { typedef struct { int split_factor; size_t psplit_msg_thresh; - size_t xpmem_msg_thresh; } coll_acoll_alltoall_attr_t; struct mca_coll_acoll_module_t { @@ -252,15 +249,10 @@ struct mca_coll_acoll_module_t { coll_acoll_reserve_mem_t reserve_mem_s; int num_subc; coll_acoll_alltoall_attr_t alltoall_attr; + // 1 if SMSC, in particular xpmem is available, 0 otherwise + int has_smsc; }; -#ifdef HAVE_XPMEM_H -struct acoll_xpmem_rcache_reg_t { - mca_rcache_base_registration_t base; - void *xpmem_vaddr; -}; -#endif - typedef struct mca_coll_acoll_module_t mca_coll_acoll_module_t; OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_coll_acoll_module_t); diff --git a/ompi/mca/coll/acoll/coll_acoll_allgather.c b/ompi/mca/coll/acoll/coll_acoll_allgather.c index 
d814d4d0a18..8ae0aa0c9fe 100644 --- a/ompi/mca/coll/acoll/coll_acoll_allgather.c +++ b/ompi/mca/coll/acoll/coll_acoll_allgather.c @@ -537,6 +537,9 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty /* Return if intra-node communicator */ if ((1 == num_nodes) || (size <= 2)) { + /* Call barrier to ensure that the data is copied properly before returning */ + ompi_coll_base_barrier_intra_basic_linear(comm, module); + /* All done */ return err; } @@ -620,6 +623,9 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty } } + /* Call barrier to ensure that the data is copied properly before returning */ + ompi_coll_base_barrier_intra_basic_linear(comm, module); + /* All done */ return err; } diff --git a/ompi/mca/coll/acoll/coll_acoll_allreduce.c b/ompi/mca/coll/acoll/coll_acoll_allreduce.c index 173422845ee..133b01af0f9 100644 --- a/ompi/mca/coll/acoll/coll_acoll_allreduce.c +++ b/ompi/mca/coll/acoll/coll_acoll_allreduce.c @@ -49,8 +49,7 @@ static inline int coll_allreduce_decision_fixed(int comm_size, size_t msg_size) return alg; } -#ifdef HAVE_XPMEM_H -static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, size_t count, +static inline int mca_coll_acoll_reduce_smsc_h(const void *sbuf, void *rbuf, size_t count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, struct ompi_communicator_t *comm, mca_coll_base_module_t *module, @@ -79,9 +78,9 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si int l2_local_rank = data->l2_local_rank; char *tmp_sbuf = NULL; char *tmp_rbuf = NULL; - if (!subc->xpmem_use_sr_buf) { + if (!subc->smsc_use_sr_buf) { tmp_rbuf = (char *) data->scratch; - tmp_sbuf = (char *) data->scratch + (subc->xpmem_buf_size) / 2; + tmp_sbuf = (char *) data->scratch + (subc->smsc_buf_size) / 2; if ((MPI_IN_PLACE == sbuf)) { memcpy(tmp_sbuf, rbuf, total_dsize); } else { @@ -112,7 +111,10 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si return err; } - register_and_cache(size, total_dsize, rank, data); + err = register_mem_with_smsc(rank, size, total_dsize, data, comm); + if (err != MPI_SUCCESS) { + return err; + } /* reduce to the local group leader */ size_t chunk = count / l1_gp_size; @@ -123,21 +125,21 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si memcpy(tmp_rbuf, sbuf, my_count_size * dsize); for (int i = 1; i < l1_gp_size; i++) { - ompi_op_reduce(op, (char *) data->xpmem_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize, + ompi_op_reduce(op, (char *) data->smsc_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize, (char *) tmp_rbuf + chunk * l1_local_rank * dsize, my_count_size, dtype); } } else { ompi_3buff_op_reduce(op, - (char *) data->xpmem_saddr[l1_gp[0]] + chunk * l1_local_rank * dsize, + (char *) data->smsc_saddr[l1_gp[0]] + chunk * l1_local_rank * dsize, (char *) tmp_sbuf + chunk * l1_local_rank * dsize, - (char *) data->xpmem_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize, + (char *) data->smsc_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize, my_count_size, dtype); for (int i = 1; i < l1_gp_size; i++) { if (i == l1_local_rank) { continue; } - ompi_op_reduce(op, (char *) data->xpmem_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize, - (char *) data->xpmem_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize, + ompi_op_reduce(op, (char *) data->smsc_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize, + (char *) data->smsc_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize, my_count_size, dtype); } } @@ 
-155,7 +157,7 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si if (0 == l2_local_rank) { for (int i = 1; i < local_size; i++) { - ompi_op_reduce(op, (char *) data->xpmem_raddr[l2_gp[i]], (char *) tmp_rbuf, + ompi_op_reduce(op, (char *) data->smsc_raddr[l2_gp[i]], (char *) tmp_rbuf, my_count_size, dtype); } } else { @@ -165,24 +167,26 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si } ompi_op_reduce(op, - (char *) data->xpmem_raddr[l2_gp[i]] + chunk * l2_local_rank * dsize, - (char *) data->xpmem_raddr[0] + chunk * l2_local_rank * dsize, + (char *) data->smsc_raddr[l2_gp[i]] + chunk * l2_local_rank * dsize, + (char *) data->smsc_raddr[0] + chunk * l2_local_rank * dsize, my_count_size, dtype); } ompi_op_reduce(op, (char *) tmp_rbuf + chunk * l2_local_rank * dsize, - (char *) data->xpmem_raddr[0] + chunk * l2_local_rank * dsize, + (char *) data->smsc_raddr[0] + chunk * l2_local_rank * dsize, my_count_size, dtype); } } err = ompi_coll_base_barrier_intra_tree(comm, module); - if (!subc->xpmem_use_sr_buf) { + if (!subc->smsc_use_sr_buf) { memcpy(rbuf, tmp_rbuf, total_dsize); } + // Note: neither unmap nor deregister will have any effect here, just having it for consistency + unmap_mem_with_smsc(rank, size, data); return err; } -static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf, size_t count, +static inline int mca_coll_acoll_allreduce_smsc_f(const void *sbuf, void *rbuf, size_t count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, struct ompi_communicator_t *comm, @@ -204,9 +208,9 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf, char *tmp_sbuf = NULL; char *tmp_rbuf = NULL; - if (!subc->xpmem_use_sr_buf) { + if (!subc->smsc_use_sr_buf) { tmp_rbuf = (char *) data->scratch; - tmp_sbuf = (char *) data->scratch + (subc->xpmem_buf_size) / 2; + tmp_sbuf = (char *) data->scratch + (subc->smsc_buf_size) / 2; if ((MPI_IN_PLACE == sbuf)) { memcpy(tmp_sbuf, rbuf, total_dsize); } else { @@ -238,7 +242,10 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf, return err; } - register_and_cache(size, total_dsize, rank, data); + err = register_mem_with_smsc(rank, size, total_dsize, data, comm); + if (err != MPI_SUCCESS) { + return err; + } size_t chunk = count / size; size_t my_count_size = (rank == (size - 1)) ? 
(count / size) + count % size : count / size; @@ -246,7 +253,7 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf, if (sbuf != MPI_IN_PLACE) memcpy(tmp_rbuf, sbuf, my_count_size * dsize); } else { - ompi_3buff_op_reduce(op, (char *) data->xpmem_saddr[0] + chunk * rank * dsize, + ompi_3buff_op_reduce(op, (char *) data->smsc_saddr[0] + chunk * rank * dsize, (char *) tmp_sbuf + chunk * rank * dsize, (char *) tmp_rbuf + chunk * rank * dsize, my_count_size, dtype); } @@ -260,7 +267,7 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf, if (rank == i) { continue; } - ompi_op_reduce(op, (char *) data->xpmem_saddr[i] + chunk * rank * dsize, + ompi_op_reduce(op, (char *) data->smsc_saddr[i] + chunk * rank * dsize, (char *) tmp_rbuf + chunk * rank * dsize, my_count_size, dtype); } err = ompi_coll_base_barrier_intra_tree(comm, module); @@ -270,21 +277,23 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf, size_t tmp = chunk * dsize; for (int i = 0; i < size; i++) { - if (subc->xpmem_use_sr_buf && (rank == i)) { + if (subc->smsc_use_sr_buf && (rank == i)) { continue; } my_count_size = (i == (size - 1)) ? (count / size) + count % size : count / size; size_t tmp1 = i * tmp; char *dst = (char *) rbuf + tmp1; - char *src = (char *) data->xpmem_raddr[i] + tmp1; + char *src = (char *) data->smsc_raddr[i] + tmp1; memcpy(dst, src, my_count_size * dsize); } err = ompi_coll_base_barrier_intra_tree(comm, module); + // Note: neither unmap nor deregister will have any effect here, just having it for consistency + unmap_mem_with_smsc(rank, size, data); + return err; } -#endif void mca_coll_acoll_sync(coll_acoll_data_t *data, int offset, int *group, int gp_size, int rank, int up) @@ -450,7 +459,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count, ompi_datatype_type_size(dtype, &dsize); total_dsize = dsize * count; - /* Disable shm/xpmem based optimizations if: */ + /* Disable smsc/shm/xpmem based optimizations if: */ /* - datatype is not a predefined type */ /* - it's a gpu buffer */ uint64_t flags = 0; @@ -481,6 +490,10 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count, coll_acoll_subcomms_t *subc = NULL; err = check_and_create_subc(comm, acoll_module, &subc); + if (MPI_SUCCESS != err) { + return err; + } + /* Fallback to knomial if subc is not obtained */ if (NULL == subc) { return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, comm, @@ -518,42 +531,27 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count, comm, module, 0); } } else if (total_dsize < 4194304) { -#ifdef HAVE_XPMEM_H - if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) { - return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc); + if (((subc->smsc_use_sr_buf != 0) || (subc->smsc_buf_size > 2 * total_dsize)) && (subc->without_smsc != 1) && is_opt) { + return mca_coll_acoll_allreduce_smsc_f(sbuf, rbuf, count, dtype, op, comm, module, subc); } else { return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, comm, module); } -#else - return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, - comm, module); -#endif } else if (total_dsize <= 16777216) { -#ifdef HAVE_XPMEM_H - if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) 
&& is_opt) { - mca_coll_acoll_reduce_xpmem_h(sbuf, rbuf, count, dtype, op, comm, module, subc); + if (((subc->smsc_use_sr_buf != 0) || (subc->smsc_buf_size > 2 * total_dsize)) && (subc->without_smsc != 1) && is_opt) { + mca_coll_acoll_reduce_smsc_h(sbuf, rbuf, count, dtype, op, comm, module, subc); return mca_coll_acoll_bcast(rbuf, count, dtype, 0, comm, module); } else { return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, comm, module); } -#else - return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, - comm, module); -#endif } else { -#ifdef HAVE_XPMEM_H - if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) { - return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc); + if (((subc->smsc_use_sr_buf != 0) || (subc->smsc_buf_size > 2 * total_dsize)) && (subc->without_smsc != 1) && is_opt) { + return mca_coll_acoll_allreduce_smsc_f(sbuf, rbuf, count, dtype, op, comm, module, subc); } else { return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, comm, module); } -#else - return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, - comm, module); -#endif } } else { diff --git a/ompi/mca/coll/acoll/coll_acoll_component.c b/ompi/mca/coll/acoll/coll_acoll_component.c index c493e66867c..d364058990c 100644 --- a/ompi/mca/coll/acoll/coll_acoll_component.c +++ b/ompi/mca/coll/acoll/coll_acoll_component.c @@ -26,6 +26,7 @@ const char *mca_coll_acoll_component_version_string */ int mca_coll_acoll_priority = 0; int mca_coll_acoll_max_comms = 10; +int mca_coll_acoll_comm_size_thresh = 16; int mca_coll_acoll_sg_size = 8; int mca_coll_acoll_sg_scale = 1; int mca_coll_acoll_node_size = 128; @@ -42,14 +43,13 @@ int mca_coll_acoll_allgather_lin = 0; int mca_coll_acoll_allgather_ring_1 = 0; int mca_coll_acoll_reserve_memory_for_algo = 0; uint64_t mca_coll_acoll_reserve_memory_size_for_algo = 128 * 32768; // 4 MB -uint64_t mca_coll_acoll_xpmem_buffer_size = 128 * 32768; +uint64_t mca_coll_acoll_smsc_buffer_size = 128 * 32768; int mca_coll_acoll_alltoall_split_factor = 0; size_t mca_coll_acoll_alltoall_psplit_msg_thres = 0; -size_t mca_coll_acoll_alltoall_xpmem_msg_thres = 0; -/* By default utilize xpmem based algorithms applicable when built with xpmem. */ -int mca_coll_acoll_without_xpmem = 0; -int mca_coll_acoll_xpmem_use_sr_buf = 1; +/* By default utilize smsc based algorithms applicable when built with smsc. */ +int mca_coll_acoll_without_smsc = 0; +int mca_coll_acoll_smsc_use_sr_buf = 1; /* Default barrier algorithm - hierarchical algorithm using shared memory */ /* ToDo: check how this works with inter-node*/ int mca_coll_acoll_barrier_algo = 0; @@ -102,6 +102,12 @@ static int acoll_register(void) MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_acoll_priority); /* Defaults on topology */ + (void) + mca_base_component_var_register(&mca_coll_acoll_component.collm_version, "comm_size_thresh", + "Disable acoll below this communicator size threshold", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_acoll_comm_size_thresh); + (void) mca_base_component_var_register(&mca_coll_acoll_component.collm_version, "max_comms", "Maximum no. 
of communicators using subgroup based algorithms", @@ -192,25 +198,25 @@ static int acoll_register(void) MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_acoll_barrier_algo); (void) mca_base_component_var_register( - &mca_coll_acoll_component.collm_version, "without_xpmem", - "By default, xpmem-based algorithms are used when applicable. " - "When this flag is set to 1, xpmem-based algorithms are disabled.", + &mca_coll_acoll_component.collm_version, "without_smsc", + "By default, smsc (xpmem)-based algorithms are used when applicable. " + "When this flag is set to 1, smsc-based algorithms are disabled.", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, - &mca_coll_acoll_without_xpmem); + &mca_coll_acoll_without_smsc); (void) mca_base_component_var_register( - &mca_coll_acoll_component.collm_version, "xpmem_buffer_size", + &mca_coll_acoll_component.collm_version, "smsc_buffer_size", "Maximum size of memory that can be used for temporary buffers for " - "xpmem-based algorithms. By default these buffers are not created or " - "used unless xpmem_use_sr_buf is set to 0.", + "smsc-based algorithms. By default these buffers are not created or " + "used unless smsc_use_sr_buf is set to 0.", MCA_BASE_VAR_TYPE_UINT64_T, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, - &mca_coll_acoll_xpmem_buffer_size); + &mca_coll_acoll_smsc_buffer_size); (void) mca_base_component_var_register( - &mca_coll_acoll_component.collm_version, "xpmem_use_sr_buf", - "Uses application provided send/recv buffers during xpmem registration " + &mca_coll_acoll_component.collm_version, "smsc_use_sr_buf", + "Uses application provided send/recv buffers during smsc registration " "when set to 1 instead of temporary buffers. The send/recv buffers are " "assumed to persist for the duration of the application.", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, - &mca_coll_acoll_xpmem_use_sr_buf); + &mca_coll_acoll_smsc_use_sr_buf); (void) mca_base_component_var_register( &mca_coll_acoll_component.collm_version, "alltoall_split_factor", "Split factor value to be used in alltoall parallel split algorithm," @@ -223,12 +229,6 @@ static int acoll_register(void) "should not be used.", MCA_BASE_VAR_TYPE_SIZE_T, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_acoll_alltoall_psplit_msg_thres); - (void) mca_base_component_var_register( - &mca_coll_acoll_component.collm_version, "alltoall_xpmem_msg_thresh", - "Message threshold above which xpmem based linear alltoall algorithm " - "should be used for intra node cases.", - MCA_BASE_VAR_TYPE_SIZE_T, NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, - &mca_coll_acoll_alltoall_xpmem_msg_thres); return OMPI_SUCCESS; } @@ -275,12 +275,6 @@ static void mca_coll_acoll_module_construct(mca_coll_acoll_module_t *module) (module->alltoall_attr).psplit_msg_thresh = mca_coll_acoll_alltoall_psplit_msg_thres; } - - (module->alltoall_attr).xpmem_msg_thresh = 0; - if (0 < mca_coll_acoll_alltoall_xpmem_msg_thres) { - (module->alltoall_attr).xpmem_msg_thresh = - mca_coll_acoll_alltoall_xpmem_msg_thres; - } } /* @@ -301,51 +295,20 @@ static void mca_coll_acoll_module_destruct(mca_coll_acoll_module_t *module) } coll_acoll_data_t *data = subc->data; if (NULL != data) { -#ifdef HAVE_XPMEM_H - for (int j = 0; j < data->comm_size; j++) { - if (ompi_comm_rank(subc->orig_comm) == j) { - continue; - } - // Dereg all rcache regs. 
- uint64_t key = 0; - uint64_t value = 0; - uint64_t zero_value = 0; - OPAL_HASH_TABLE_FOREACH(key,uint64,value,(data->xpmem_reg_tracker_ht[j])) { - mca_rcache_base_registration_t* reg = - (mca_rcache_base_registration_t*) key; - - for (uint64_t d_i = 0; d_i < value; ++d_i) { - (data->rcache[j])->rcache_deregister(data->rcache[j], reg); - } - opal_hash_table_set_value_uint64(data->xpmem_reg_tracker_ht[j], - key, (void*)(zero_value)); - } - xpmem_release(data->all_apid[j]); - mca_rcache_base_module_destroy(data->rcache[j]); - opal_hash_table_remove_all(data->xpmem_reg_tracker_ht[j]); - OBJ_RELEASE(data->xpmem_reg_tracker_ht[j]); - } - xpmem_remove(data->allseg_id[ompi_comm_rank(subc->orig_comm)]); - - free(data->allseg_id); - data->allseg_id = NULL; - free(data->all_apid); - data->all_apid = NULL; + free(data->smsc_info.sreg); + data->smsc_info.sreg = NULL; + free(data->smsc_info.rreg); + data->smsc_info.rreg = NULL; + free(data->smsc_saddr); + data->smsc_saddr = NULL; + free(data->smsc_raddr); + data->smsc_raddr = NULL; free(data->allshm_sbuf); data->allshm_sbuf = NULL; free(data->allshm_rbuf); data->allshm_rbuf = NULL; - free(data->xpmem_saddr); - data->xpmem_saddr = NULL; - free(data->xpmem_raddr); - data->xpmem_raddr = NULL; free(data->scratch); data->scratch = NULL; - free(data->xpmem_reg_tracker_ht); - data->xpmem_reg_tracker_ht = NULL; - free(data->rcache); - data->rcache = NULL; -#endif free(data->allshmseg_id); data->allshmseg_id = NULL; free(data->allshmmmap_sbuf); @@ -418,7 +381,6 @@ static void mca_coll_acoll_module_destruct(mca_coll_acoll_module_t *module) (module->alltoall_attr).split_factor = 0; (module->alltoall_attr).psplit_msg_thresh = 0; - (module->alltoall_attr).xpmem_msg_thresh = 0; } OBJ_CLASS_INSTANCE(mca_coll_acoll_module_t, mca_coll_base_module_t, mca_coll_acoll_module_construct, diff --git a/ompi/mca/coll/acoll/coll_acoll_module.c b/ompi/mca/coll/acoll/coll_acoll_module.c index 45c1890a4f3..8d54830085d 100644 --- a/ompi/mca/coll/acoll/coll_acoll_module.c +++ b/ompi/mca/coll/acoll/coll_acoll_module.c @@ -64,7 +64,7 @@ mca_coll_base_module_t *mca_coll_acoll_comm_query(struct ompi_communicator_t *co *priority = 0; return NULL; } - if (OMPI_COMM_IS_INTRA(comm) && ompi_comm_size(comm) < 2) { + if (ompi_comm_size(comm) < mca_coll_acoll_comm_size_thresh) { *priority = 0; return NULL; } @@ -132,6 +132,17 @@ mca_coll_base_module_t *mca_coll_acoll_comm_query(struct ompi_communicator_t *co break; } + // Check SMSC availability (currently only for XPMEM) + if (!mca_smsc_base_has_feature(MCA_SMSC_FEATURE_CAN_MAP)) { + opal_output_verbose(MCA_BASE_VERBOSE_ERROR, ompi_coll_base_framework.framework_output, + "coll:acoll: Error: SMSC's MAP feature is not available. 
" + "SMSC will be disabled for this communicator irrespective of " + "the mca parameters."); + acoll_module->has_smsc = 0; + } else { + acoll_module->has_smsc = 1; + } + acoll_module->force_numa = mca_coll_acoll_force_numa; acoll_module->use_dyn_rules = mca_coll_acoll_use_dynamic_rules; acoll_module->disable_shmbcast = mca_coll_acoll_disable_shmbcast; diff --git a/ompi/mca/coll/acoll/coll_acoll_reduce.c b/ompi/mca/coll/acoll/coll_acoll_reduce.c index c79db9525a1..7913af1678d 100644 --- a/ompi/mca/coll/acoll/coll_acoll_reduce.c +++ b/ompi/mca/coll/acoll/coll_acoll_reduce.c @@ -153,8 +153,7 @@ static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t co return ret; } -#ifdef HAVE_XPMEM_H -static inline int mca_coll_acoll_reduce_xpmem(const void *sbuf, void *rbuf, size_t count, +static inline int mca_coll_acoll_reduce_smsc(const void *sbuf, void *rbuf, size_t count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, struct ompi_communicator_t *comm, mca_coll_base_module_t *module, @@ -168,7 +167,7 @@ static inline int mca_coll_acoll_reduce_xpmem(const void *sbuf, void *rbuf, size coll_acoll_init(module, comm, subc->data, subc, 0); coll_acoll_reserve_mem_t *reserve_mem_rbuf_reduce = NULL; - if (subc->xpmem_use_sr_buf != 0) { + if (subc->smsc_use_sr_buf != 0) { reserve_mem_rbuf_reduce = &(acoll_module->reserve_mem_s); } coll_acoll_data_t *data = subc->data; @@ -192,9 +191,9 @@ static inline int mca_coll_acoll_reduce_xpmem(const void *sbuf, void *rbuf, size char *tmp_sbuf = NULL; char *tmp_rbuf = NULL; - if (0 == subc->xpmem_use_sr_buf) { + if (0 == subc->smsc_use_sr_buf) { tmp_rbuf = (char *) data->scratch; - tmp_sbuf = (char *) data->scratch + (subc->xpmem_buf_size) / 2; + tmp_sbuf = (char *) data->scratch + (subc->smsc_buf_size) / 2; if ((MPI_IN_PLACE == sbuf) && (rank == root)) { memcpy(tmp_sbuf, rbuf, total_dsize); } else { @@ -234,7 +233,10 @@ static inline int mca_coll_acoll_reduce_xpmem(const void *sbuf, void *rbuf, size return ret; } - register_and_cache(size, total_dsize, rank, data); + ret = register_mem_with_smsc(rank, size, total_dsize, data, comm); + if (ret != MPI_SUCCESS) { + return ret; + } /* reduce to the group leader */ size_t chunk = count / l1_gp_size; @@ -244,21 +246,21 @@ static inline int mca_coll_acoll_reduce_xpmem(const void *sbuf, void *rbuf, size if (sbuf != MPI_IN_PLACE) memcpy(tmp_rbuf, sbuf, my_count_size * dsize); for (int i = 1; i < l1_gp_size; i++) { - ompi_op_reduce(op, (char *) data->xpmem_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize, + ompi_op_reduce(op, (char *) data->smsc_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize, (char *) tmp_rbuf + chunk * l1_local_rank * dsize, my_count_size, dtype); } } else { ompi_3buff_op_reduce(op, - (char *) data->xpmem_saddr[l1_gp[0]] + chunk * l1_local_rank * dsize, + (char *) data->smsc_saddr[l1_gp[0]] + chunk * l1_local_rank * dsize, (char *) tmp_sbuf + chunk * l1_local_rank * dsize, - (char *) data->xpmem_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize, + (char *) data->smsc_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize, my_count_size, dtype); for (int i = 1; i < l1_gp_size; i++) { if (i == l1_local_rank) { continue; } - ompi_op_reduce(op, (char *) data->xpmem_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize, - (char *) data->xpmem_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize, + ompi_op_reduce(op, (char *) data->smsc_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize, + (char *) data->smsc_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize, my_count_size, dtype); } } @@ -272,7 +274,7 @@ 
static inline int mca_coll_acoll_reduce_xpmem(const void *sbuf, void *rbuf, size if (0 == l2_local_rank) { for (int i = 1; i < local_size; i++) { - ompi_op_reduce(op, (char *) data->xpmem_raddr[l2_gp[i]], (char *) tmp_rbuf, + ompi_op_reduce(op, (char *) data->smsc_raddr[l2_gp[i]], (char *) tmp_rbuf, my_count_size, dtype); } } else { @@ -281,29 +283,28 @@ static inline int mca_coll_acoll_reduce_xpmem(const void *sbuf, void *rbuf, size continue; } ompi_op_reduce(op, - (char *) data->xpmem_raddr[l2_gp[i]] + chunk * l2_local_rank * dsize, - (char *) data->xpmem_raddr[0] + chunk * l2_local_rank * dsize, + (char *) data->smsc_raddr[l2_gp[i]] + chunk * l2_local_rank * dsize, + (char *) data->smsc_raddr[0] + chunk * l2_local_rank * dsize, my_count_size, dtype); } ompi_op_reduce(op, (char *) tmp_rbuf + chunk * l2_local_rank * dsize, - (char *) data->xpmem_raddr[0] + chunk * l2_local_rank * dsize, + (char *) data->smsc_raddr[0] + chunk * l2_local_rank * dsize, my_count_size, dtype); } } ompi_coll_base_barrier_intra_tree(comm, module); - if (0 == subc->xpmem_use_sr_buf) { + if (0 == subc->smsc_use_sr_buf) { if (rank == root) { memcpy(rbuf, tmp_rbuf, total_dsize); } } else { - if ((rank != root) && (subc->xpmem_use_sr_buf != 0)) { + if ((rank != root) && (subc->smsc_use_sr_buf != 0)) { coll_acoll_buf_free(reserve_mem_rbuf_reduce, tmp_rbuf); } } - + unmap_mem_with_smsc(rank, size, data); return MPI_SUCCESS; } -#endif int mca_coll_acoll_reduce_intra(const void *sbuf, void *rbuf, size_t count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, @@ -383,22 +384,17 @@ int mca_coll_acoll_reduce_intra(const void *sbuf, void *rbuf, size_t count, root, comm, module, 0, 0); } } else { -#ifdef HAVE_XPMEM_H - if ((((subc->xpmem_use_sr_buf != 0) + if ((((subc->smsc_use_sr_buf != 0) && (acoll_module->reserve_mem_s).reserve_mem_allocate && ((acoll_module->reserve_mem_s).reserve_mem_size >= total_dsize)) - || ((0 == subc->xpmem_use_sr_buf) && (subc->xpmem_buf_size > 2 * total_dsize))) - && (subc->without_xpmem != 1) && is_opt) { - return mca_coll_acoll_reduce_xpmem(sbuf, rbuf, count, dtype, op, root, comm, + || ((0 == subc->smsc_use_sr_buf) && (subc->smsc_buf_size > 2 * total_dsize))) + && (subc->without_smsc != 1) && is_opt) { + return mca_coll_acoll_reduce_smsc(sbuf, rbuf, count, dtype, op, root, comm, module, subc); } else { return ompi_coll_base_reduce_intra_binomial(sbuf, rbuf, count, dtype, op, root, comm, module, 0, 0); } -#else - return ompi_coll_base_reduce_intra_binomial(sbuf, rbuf, count, dtype, op, root, - comm, module, 0, 0); -#endif } } else { return ompi_coll_base_reduce_intra_binomial(sbuf, rbuf, count, dtype, op, root, comm, diff --git a/ompi/mca/coll/acoll/coll_acoll_utils.h b/ompi/mca/coll/acoll/coll_acoll_utils.h index 96855a71947..5707165c0a1 100644 --- a/ompi/mca/coll/acoll/coll_acoll_utils.h +++ b/ompi/mca/coll/acoll/coll_acoll_utils.h @@ -14,11 +14,8 @@ #include "ompi/communicator/communicator.h" #include "ompi/mca/coll/base/coll_base_functions.h" #include "opal/include/opal/align.h" - -#ifdef HAVE_XPMEM_H #include "opal/mca/rcache/base/base.h" -#include -#endif + /* shared memory structure */ @@ -30,9 +27,9 @@ #define LEADER_SHM_SIZE 16384 #define PER_RANK_SHM_SIZE 8192 -extern uint64_t mca_coll_acoll_xpmem_buffer_size; -extern int mca_coll_acoll_without_xpmem; -extern int mca_coll_acoll_xpmem_use_sr_buf; +extern uint64_t mca_coll_acoll_smsc_buffer_size; +extern int mca_coll_acoll_without_smsc; +extern int mca_coll_acoll_smsc_use_sr_buf; extern int mca_coll_acoll_barrier_algo; @@ 
-169,11 +166,16 @@ static inline int check_and_create_subc(ompi_communicator_t *comm, subc->initialized_shm_data = false; subc->data = NULL; subc->barrier_algo = mca_coll_acoll_barrier_algo; -#ifdef HAVE_XPMEM_H - subc->xpmem_buf_size = mca_coll_acoll_xpmem_buffer_size; - subc->without_xpmem = mca_coll_acoll_without_xpmem; - subc->xpmem_use_sr_buf = mca_coll_acoll_xpmem_use_sr_buf; -#endif + + if (acoll_module->has_smsc) { + subc->smsc_buf_size = mca_coll_acoll_smsc_buffer_size; + subc->smsc_use_sr_buf = mca_coll_acoll_smsc_use_sr_buf; + subc->without_smsc = mca_coll_acoll_without_smsc; + } else { + subc->smsc_buf_size = 0; + subc->smsc_use_sr_buf = 0; + subc->without_smsc = 1; + } return MPI_SUCCESS; } @@ -229,8 +231,6 @@ static inline int comm_grp_ranks_local(ompi_communicator_t *comm, ompi_communica } } - err = ompi_group_free(&grp); - err = ompi_group_free(&local_grp); free(ranks); free(local_ranks); @@ -755,36 +755,11 @@ static inline int mca_coll_acoll_comm_split_init(ompi_communicator_t *comm, return err; } -#ifdef HAVE_XPMEM_H -static inline int mca_coll_acoll_xpmem_register(void *xpmem_apid, void *base, size_t size, - mca_rcache_base_registration_t *reg) -{ - struct xpmem_addr xpmem_addr; - xpmem_addr.apid = *((xpmem_apid_t *) xpmem_apid); - xpmem_addr.offset = (uintptr_t) base; - struct acoll_xpmem_rcache_reg_t *xpmem_reg = (struct acoll_xpmem_rcache_reg_t *) reg; - xpmem_reg->xpmem_vaddr = xpmem_attach(xpmem_addr, size, NULL); - - if ((void *) -1 == xpmem_reg->xpmem_vaddr) { - return -1; - } - return 0; -} - -static inline int mca_coll_acoll_xpmem_deregister(void *xpmem_apid, - mca_rcache_base_registration_t *reg) -{ - int status = xpmem_detach(((struct acoll_xpmem_rcache_reg_t *) reg)->xpmem_vaddr); - return status; -} -#endif - static inline int coll_acoll_init(mca_coll_base_module_t *module, ompi_communicator_t *comm, coll_acoll_data_t *data, coll_acoll_subcomms_t *subc, int root) { int size, ret = 0, rank, line; - int cid = ompi_comm_get_local_cid(comm); if (subc->initialized_data) { return ret; } @@ -794,13 +769,47 @@ static inline int coll_acoll_init(mca_coll_base_module_t *module, ompi_communica ret = OMPI_ERR_OUT_OF_RESOURCE; goto error_hndl; } + + // initialize data + data->allshm_rbuf = NULL; + data->allshm_sbuf = NULL; + data->allshmmmap_sbuf = NULL; + data->scratch = NULL; + data->smsc_info.sreg = NULL; + data->smsc_info.rreg = NULL; + data->smsc_info.ep = NULL; + data->smsc_saddr = NULL; + data->smsc_raddr = NULL; + data->l1_gp = NULL; + data->l2_gp = NULL; + data->allshmseg_id = NULL; + + size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); data->comm_size = size; -#ifdef HAVE_XPMEM_H - if (0 == subc->xpmem_use_sr_buf) { - data->scratch = (char *) malloc(subc->xpmem_buf_size); + data->smsc_info.sreg = (void **) malloc(sizeof(void *) * size); + data->smsc_info.rreg = (void **) malloc(sizeof(void *) * size); + if (NULL == data->smsc_info.sreg || NULL == data->smsc_info.rreg) { + line = __LINE__; + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto error_hndl; + } + data->smsc_info.ep = (mca_smsc_endpoint_t **) malloc(sizeof(mca_smsc_endpoint_t *) * size); + if (NULL == data->smsc_info.ep) { + line = __LINE__; + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto error_hndl; + } + for (int i = 0 ; i < size; i++) { + data->smsc_info.ep[i] = NULL; + data->smsc_info.sreg[i] = NULL; + data->smsc_info.rreg[i] = NULL; + } + + if (0 == subc->smsc_use_sr_buf) { + data->scratch = (char *) malloc(subc->smsc_buf_size); if (NULL == data->scratch) { line = __LINE__; ret = OMPI_ERR_OUT_OF_RESOURCE; @@ 
-810,19 +819,6 @@ static inline int coll_acoll_init(mca_coll_base_module_t *module, ompi_communica data->scratch = NULL; } - xpmem_segid_t seg_id; - data->allseg_id = (xpmem_segid_t *) malloc(sizeof(xpmem_segid_t) * size); - if (NULL == data->allseg_id) { - line = __LINE__; - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto error_hndl; - } - data->all_apid = (xpmem_apid_t *) malloc(sizeof(xpmem_apid_t) * size); - if (NULL == data->all_apid) { - line = __LINE__; - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto error_hndl; - } data->allshm_sbuf = (void **) malloc(sizeof(void *) * size); if (NULL == data->allshm_sbuf) { line = __LINE__; @@ -835,86 +831,38 @@ static inline int coll_acoll_init(mca_coll_base_module_t *module, ompi_communica ret = OMPI_ERR_OUT_OF_RESOURCE; goto error_hndl; } - data->xpmem_saddr = (void **) malloc(sizeof(void *) * size); - if (NULL == data->xpmem_saddr) { - line = __LINE__; - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto error_hndl; - } - data->xpmem_raddr = (void **) malloc(sizeof(void *) * size); - if (NULL == data->xpmem_raddr) { - line = __LINE__; - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto error_hndl; - } - data->rcache = (mca_rcache_base_module_t **) malloc(sizeof(mca_rcache_base_module_t *) * size); - if (NULL == data->rcache) { + + data->smsc_saddr = (void **) malloc(sizeof(void *) * size); + if (NULL == data->smsc_saddr) { line = __LINE__; ret = OMPI_ERR_OUT_OF_RESOURCE; goto error_hndl; } - data->xpmem_reg_tracker_ht = NULL; - data->xpmem_reg_tracker_ht = (opal_hash_table_t **) malloc(sizeof(opal_hash_table_t*) * size); - if (NULL == data->xpmem_reg_tracker_ht) { + data->smsc_raddr = (void **) malloc(sizeof(void *) * size); + if (NULL == data->smsc_raddr) { line = __LINE__; ret = OMPI_ERR_OUT_OF_RESOURCE; goto error_hndl; } - seg_id = xpmem_make(0, XPMEM_MAXADDR_SIZE, XPMEM_PERMIT_MODE, (void *) 0666); - if (-1 == seg_id) { - line = __LINE__; - ret = -1; - goto error_hndl; - } - - ret = comm->c_coll->coll_allgather(&seg_id, sizeof(xpmem_segid_t), MPI_BYTE, data->allseg_id, - sizeof(xpmem_segid_t), MPI_BYTE, comm, - comm->c_coll->coll_allgather_module); - - /* Assuming the length of rcache name is less than 50 characters */ - char rc_name[50]; - for (int i = 0; i < size; i++) { - if (rank != i) { - data->all_apid[i] = xpmem_get(data->allseg_id[i], XPMEM_RDWR, XPMEM_PERMIT_MODE, - (void *) 0666); - if (-1 == data->all_apid[i]) { - line = __LINE__; - ret = -1; - goto error_hndl; - } - if (-1 == data->all_apid[i]) { - line = __LINE__; - ret = -1; - goto error_hndl; - } - sprintf(rc_name, "acoll_%d_%d_%d", cid, rank, i); - mca_rcache_base_resources_t rcache_element - = {.cache_name = rc_name, - .reg_data = &data->all_apid[i], - .sizeof_reg = sizeof(struct acoll_xpmem_rcache_reg_t), - .register_mem = mca_coll_acoll_xpmem_register, - .deregister_mem = mca_coll_acoll_xpmem_deregister}; - - data->rcache[i] = mca_rcache_base_module_create("grdma", NULL, &rcache_element); - if (NULL == data->rcache[i]) { - ret = -1; - line = __LINE__; - goto error_hndl; - } - data->xpmem_reg_tracker_ht[i] = OBJ_NEW(opal_hash_table_t); - opal_hash_table_init(data->xpmem_reg_tracker_ht[i], 2048); - } - } -#endif - /* temporary variables */ int tmp1, tmp2, tmp3 = root; comm_grp_ranks_local(comm, subc->numa_comm, &tmp1, &tmp2, &data->l1_gp, tmp3); + if (NULL == data->l1_gp) { + line = __LINE__; + ret = OMPI_ERROR; + goto error_hndl; + } data->l1_gp_size = ompi_comm_size(subc->numa_comm); data->l1_local_rank = ompi_comm_rank(subc->numa_comm); comm_grp_ranks_local(comm, subc->numa_comm_ldrs, &tmp1, &tmp2, 
&data->l2_gp, tmp3); + if (NULL == data->l2_gp) { + line = __LINE__; + ret = OMPI_ERROR; + goto error_hndl; + } + data->l2_gp_size = ompi_comm_size(subc->numa_comm_ldrs); data->l2_local_rank = ompi_comm_rank(subc->numa_comm_ldrs); data->offset[0] = LEADER_SHM_SIZE; @@ -923,6 +871,13 @@ static inline int coll_acoll_init(mca_coll_base_module_t *module, ompi_communica data->offset[3] = data->offset[2] + rank * PER_RANK_SHM_SIZE; data->allshmseg_id = (opal_shmem_ds_t *) malloc(sizeof(opal_shmem_ds_t) * size); data->allshmmmap_sbuf = (void **) malloc(sizeof(void *) * size); + + if (NULL == data->allshmseg_id || NULL == data->allshmmmap_sbuf) { + line = __LINE__; + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto error_hndl; + } + data->sync[0] = 0; data->sync[1] = 0; char *shfn; @@ -1001,26 +956,22 @@ static inline int coll_acoll_init(mca_coll_base_module_t *module, ompi_communica error_hndl: (void) line; if (NULL != data) { -#ifdef HAVE_XPMEM_H - free(data->allseg_id); - data->allseg_id = NULL; - free(data->all_apid); - data->all_apid = NULL; free(data->allshm_sbuf); data->allshm_sbuf = NULL; free(data->allshm_rbuf); data->allshm_rbuf = NULL; - free(data->xpmem_saddr); - data->xpmem_saddr = NULL; - free(data->xpmem_raddr); - data->xpmem_raddr = NULL; - free(data->xpmem_reg_tracker_ht); - data->xpmem_reg_tracker_ht = NULL; - free(data->rcache); - data->rcache = NULL; + free(data->smsc_saddr); + data->smsc_saddr = NULL; + free(data->smsc_raddr); + data->smsc_raddr = NULL; free(data->scratch); data->scratch = NULL; -#endif + free(data->smsc_info.ep); + data->smsc_info.ep = NULL; + free(data->smsc_info.sreg); + data->smsc_info.sreg = NULL; + free(data->smsc_info.rreg); + data->smsc_info.rreg = NULL; free(data->allshmseg_id); data->allshmseg_id = NULL; free(data->allshmmmap_sbuf); @@ -1035,70 +986,44 @@ static inline int coll_acoll_init(mca_coll_base_module_t *module, ompi_communica return ret; } -#ifdef HAVE_XPMEM_H -static inline void update_rcache_reg_hashtable_entry - (struct acoll_xpmem_rcache_reg_t *reg, - opal_hash_table_t* ht) -{ - // Converting pointer to uint64 to use as key. - uint64_t key = (uint64_t)reg; - // Converting uint64_t to pointer type to use for value. 
- uint64_t value = 1; - int ht_ret = opal_hash_table_get_value_uint64(ht, key, (void**)(&value)); - - if (OPAL_ERR_NOT_FOUND == ht_ret) { - value = 1; - opal_hash_table_set_value_uint64(ht, key, (void*)(value)); - } else if (OPAL_SUCCESS == ht_ret) { - value += 1; - opal_hash_table_set_value_uint64(ht, key, (void*)(value)); +static inline int register_mem_with_smsc(int rank, int size, size_t total_dsize, coll_acoll_data_t *data, struct ompi_communicator_t *comm) { + ompi_proc_t *proc = NULL; + mca_smsc_endpoint_t *smsc_ep = NULL; + if (NULL == data->smsc_info.ep) { + return MPI_ERR_OTHER; } -} -static inline void register_and_cache(int size, size_t total_dsize, int rank, - coll_acoll_data_t *data) -{ - uintptr_t base, bound; for (int i = 0; i < size; i++) { if (rank != i) { - mca_rcache_base_module_t *rcache_i = data->rcache[i]; - int access_flags = 0; - struct acoll_xpmem_rcache_reg_t *sbuf_reg = NULL, *rbuf_reg = NULL; - base = OPAL_DOWN_ALIGN((uintptr_t) data->allshm_sbuf[i], 4096, uintptr_t); - bound = OPAL_ALIGN((uintptr_t) data->allshm_sbuf[i] + total_dsize, 4096, uintptr_t); - int ret = rcache_i->rcache_register(rcache_i, (void *) base, bound - base, access_flags, - MCA_RCACHE_ACCESS_ANY, - (mca_rcache_base_registration_t **) &sbuf_reg); - - if (ret != 0) { - sbuf_reg = NULL; - return; + if (NULL == data->smsc_info.ep[i]) { + proc = ompi_comm_peer_lookup(comm, i); + data->smsc_info.ep[i] = MCA_SMSC_CALL(get_endpoint, &proc->super); } - update_rcache_reg_hashtable_entry(sbuf_reg, data->xpmem_reg_tracker_ht[i]); - - data->xpmem_saddr[i] = (void *) ((uintptr_t) sbuf_reg->xpmem_vaddr - + ((uintptr_t) data->allshm_sbuf[i] - - (uintptr_t) sbuf_reg->base.base)); - - base = OPAL_DOWN_ALIGN((uintptr_t) data->allshm_rbuf[i], 4096, uintptr_t); - bound = OPAL_ALIGN((uintptr_t) data->allshm_rbuf[i] + total_dsize, 4096, uintptr_t); - ret = rcache_i->rcache_register(rcache_i, (void *) base, bound - base, access_flags, - MCA_RCACHE_ACCESS_ANY, - (mca_rcache_base_registration_t **) &rbuf_reg); - - if (ret != 0) { - rbuf_reg = NULL; - return; + if (NULL == data->smsc_info.ep[i]) { + opal_output_verbose(MCA_BASE_VERBOSE_ERROR, ompi_coll_base_framework.framework_output, + "coll:acoll: SMSC endpoint not available for processes.\n"); + return MPI_ERR_OTHER; } - update_rcache_reg_hashtable_entry(rbuf_reg, data->xpmem_reg_tracker_ht[i]); + smsc_ep = data->smsc_info.ep[i]; + data->smsc_info.sreg[i] = MCA_SMSC_CALL(map_peer_region, smsc_ep, + MCA_RCACHE_FLAGS_PERSIST, data->allshm_sbuf[i], total_dsize, &data->smsc_saddr[i]); + data->smsc_info.rreg[i] = MCA_SMSC_CALL(map_peer_region, smsc_ep, + MCA_RCACHE_FLAGS_PERSIST, data->allshm_rbuf[i], total_dsize, &data->smsc_raddr[i]); - data->xpmem_raddr[i] = (void *) ((uintptr_t) rbuf_reg->xpmem_vaddr - + ((uintptr_t) data->allshm_rbuf[i] - - (uintptr_t) rbuf_reg->base.base)); } else { - data->xpmem_saddr[i] = data->allshm_sbuf[i]; - data->xpmem_raddr[i] = data->allshm_rbuf[i]; + data->smsc_saddr[i] = data->allshm_sbuf[i]; + data->smsc_raddr[i] = data->allshm_rbuf[i]; + } + } + return MPI_SUCCESS; +} + +static inline void unmap_mem_with_smsc(int rank, int size, coll_acoll_data_t *data) +{ + for (int i = 0; i < size; i++) { + if (rank != i && NULL != data->smsc_info.sreg[i] && NULL != data->smsc_info.rreg[i]) { + MCA_SMSC_CALL(unmap_peer_region, data->smsc_info.sreg[i]); + MCA_SMSC_CALL(unmap_peer_region, data->smsc_info.rreg[i]); } } } -#endif
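
Note for reviewers less familiar with the opal/mca/smsc framework this patch moves to: the sketch below shows the map/unmap pattern that the new register_mem_with_smsc()/unmap_mem_with_smsc() helpers follow — look up the peer's endpoint, map the peer's buffer into the local address space, use the returned local pointer for the reduction, then release the mapping. This is a minimal illustrative sketch, not part of the patch; the helper names map_peer_buf()/unmap_peer_buf() are hypothetical, the include list is approximate, and error handling is reduced to NULL checks. The MCA_SMSC_CALL invocations, MCA_RCACHE_FLAGS_PERSIST flag, and the CAN_MAP feature check simply mirror the calls visible in coll_acoll_utils.h and coll_acoll_module.c above.

    /* Illustrative sketch only (not part of the patch). */
    #include "opal/mca/smsc/smsc.h"
    #include "opal/mca/rcache/base/base.h"     /* MCA_RCACHE_FLAGS_PERSIST */
    #include "ompi/communicator/communicator.h"
    #include "ompi/proc/proc.h"

    /* Map `size` bytes of peer `peer_rank`'s buffer `remote_ptr` into this
     * process' address space. Returns the registration handle needed for the
     * later unmap and stores the locally usable pointer in *local_ptr. */
    static void *map_peer_buf(struct ompi_communicator_t *comm, int peer_rank,
                              void *remote_ptr, size_t size, void **local_ptr)
    {
        /* Same single-copy feature check as in mca_coll_acoll_comm_query(). */
        if (!mca_smsc_base_has_feature(MCA_SMSC_FEATURE_CAN_MAP)) {
            return NULL;
        }

        ompi_proc_t *proc = ompi_comm_peer_lookup(comm, peer_rank);
        mca_smsc_endpoint_t *ep = MCA_SMSC_CALL(get_endpoint, &proc->super);
        if (NULL == ep) {
            return NULL;
        }

        /* PERSIST keeps the mapping cached across collective calls, matching
         * the usage in register_mem_with_smsc() above. */
        return MCA_SMSC_CALL(map_peer_region, ep, MCA_RCACHE_FLAGS_PERSIST,
                             remote_ptr, size, local_ptr);
    }

    /* Release a mapping obtained from map_peer_buf(). */
    static void unmap_peer_buf(void *reg)
    {
        if (NULL != reg) {
            MCA_SMSC_CALL(unmap_peer_region, reg);
        }
    }

In the patch itself the endpoint and the two registration handles per peer are cached in coll_acoll_data_t's smsc_info so that repeated allreduce/reduce calls reuse them rather than re-mapping on every invocation.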