diff --git a/ompi/mca/coll/sync/Makefile.am b/ompi/mca/coll/sync/Makefile.am
index 61c2437e96e..8283ebbfd8a 100644
--- a/ompi/mca/coll/sync/Makefile.am
+++ b/ompi/mca/coll/sync/Makefile.am
@@ -32,7 +32,9 @@ sources = \
         coll_sync_reduce_scatter.c \
         coll_sync_scan.c \
         coll_sync_scatter.c \
-        coll_sync_scatterv.c
+        coll_sync_scatterv.c \
+        coll_sync_alltoallv.c \
+        coll_sync_alltoallw.c
 
 if MCA_BUILD_ompi_coll_sync_DSO
 component_noinst =
diff --git a/ompi/mca/coll/sync/coll_sync.h b/ompi/mca/coll/sync/coll_sync.h
index 081eda721b2..4c7333f1a6d 100644
--- a/ompi/mca/coll/sync/coll_sync.h
+++ b/ompi/mca/coll/sync/coll_sync.h
@@ -46,6 +46,20 @@ mca_coll_base_module_t
 int mca_coll_sync_module_enable(mca_coll_base_module_t *module,
                                 struct ompi_communicator_t *comm);
 
+int mca_coll_sync_alltoallv(const void *sbuf, const int *scounts, const int *sdisps,
+                            struct ompi_datatype_t *sdtype,
+                            void *rbuf, const int *rcounts, const int *rdisps,
+                            struct ompi_datatype_t *rdtype,
+                            struct ompi_communicator_t *comm,
+                            mca_coll_base_module_t *module);
+
+int mca_coll_sync_alltoallw(const void *sbuf, const int *scounts, const int *sdisps,
+                            struct ompi_datatype_t * const *sdtypes,
+                            void *rbuf, const int *rcounts, const int *rdisps,
+                            struct ompi_datatype_t * const *rdtypes,
+                            struct ompi_communicator_t *comm,
+                            mca_coll_base_module_t *module);
+
 int mca_coll_sync_barrier(struct ompi_communicator_t *comm,
                           mca_coll_base_module_t *module);
 
@@ -130,6 +144,14 @@ typedef struct mca_coll_sync_module_t {
     /* How many ops we've executed (it's easier to have 2) */
     int after_num_operations;
 
+    /* alltoallv / alltoallw calls counted toward the next "before" barrier */
+    int before_num_operations_alltoallv;
+    int before_num_operations_alltoallw;
+
+    /* alltoallv / alltoallw calls counted toward the next "after" barrier */
+    int after_num_operations_alltoallv;
+    int after_num_operations_alltoallw;
+
     /* Avoid recursion of syncs */
     bool in_operation;
 } mca_coll_sync_module_t;
@@ -146,9 +168,14 @@ typedef struct mca_coll_sync_component_t {
 
     /* Do a sync *before* each Nth collective */
     int barrier_before_nops;
+    int barrier_before_nops_alltoallv;  /* same, counted per alltoallv call */
+    int barrier_before_nops_alltoallw;  /* same, counted per alltoallw call */
 
     /* Do a sync *after* each Nth collective */
     int barrier_after_nops;
+    int barrier_after_nops_alltoallv;   /* same, counted per alltoallv call */
+    int barrier_after_nops_alltoallw;   /* same, counted per alltoallw call */
+
 } mca_coll_sync_component_t;
 
 /* Globally exported variables */
diff --git a/ompi/mca/coll/sync/coll_sync_alltoallv.c b/ompi/mca/coll/sync/coll_sync_alltoallv.c
new file mode 100644
index 00000000000..b1e5f67bcf5
--- /dev/null
+++ b/ompi/mca/coll/sync/coll_sync_alltoallv.c
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2005 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2009      Cisco Systems, Inc.  All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+
+#include "coll_sync.h"
+
+
+/*
+ *  alltoallv
+ *
+ *  Function:   - alltoallv, with an optional barrier before/after every Nth call
+ *  Accepts:    - same arguments as MPI_Alltoallv()
+ *  Returns:    - MPI_SUCCESS or error code
+ */
+int mca_coll_sync_alltoallv(const void *sbuf, const int *scounts, const int *sdisps,
+                            struct ompi_datatype_t *sdtype,
+                            void *rbuf, const int *rcounts, const int *rdisps,
+                            struct ompi_datatype_t *rdtype,
+                            struct ompi_communicator_t *comm,
+                            mca_coll_base_module_t *module)
+{
+    mca_coll_sync_module_t *s = (mca_coll_sync_module_t*) module;
+    int err = MPI_SUCCESS;
+
+    if (s->in_operation) {
+        return s->c_coll.coll_alltoallv(sbuf, scounts, sdisps, sdtype,
+                                        rbuf, rcounts, rdisps, rdtype, comm,
+                                        s->c_coll.coll_alltoallv_module);
+    }
+    s->in_operation = true;
+    if (OPAL_UNLIKELY(++(s->before_num_operations_alltoallv) ==
+                      mca_coll_sync_component.barrier_before_nops_alltoallv ||
+                      ++(s->before_num_operations) ==
+                      mca_coll_sync_component.barrier_before_nops)) {
+        s->before_num_operations = 0;
+        s->before_num_operations_alltoallv = 0;
+        err = s->c_coll.coll_barrier(comm, s->c_coll.coll_barrier_module);
+    }
+    if (OPAL_LIKELY(MPI_SUCCESS == err)) {
+        err = s->c_coll.coll_alltoallv(sbuf, scounts, sdisps, sdtype,
+                                       rbuf, rcounts, rdisps, rdtype, comm,
+                                       s->c_coll.coll_alltoallv_module);
+    }
+    /* Skip the trailing barrier on failure so the first error is what gets returned. */
+    if (OPAL_LIKELY(MPI_SUCCESS == err) &&
+        OPAL_UNLIKELY(++(s->after_num_operations_alltoallv) ==
+                      mca_coll_sync_component.barrier_after_nops_alltoallv ||
+                      ++(s->after_num_operations) ==
+                      mca_coll_sync_component.barrier_after_nops)) {
+        s->after_num_operations = 0;
+        s->after_num_operations_alltoallv = 0;
+        err = s->c_coll.coll_barrier(comm, s->c_coll.coll_barrier_module);
+    }
+    s->in_operation = false;
+    return err;
+}
diff --git a/ompi/mca/coll/sync/coll_sync_alltoallw.c b/ompi/mca/coll/sync/coll_sync_alltoallw.c
new file mode 100644
index 00000000000..36cb25b7ecd
--- /dev/null
+++ 
b/ompi/mca/coll/sync/coll_sync_alltoallw.c
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2005 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2009      Cisco Systems, Inc.  All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+
+#include "coll_sync.h"
+
+
+/*
+ *  alltoallw
+ *
+ *  Function:   - alltoallw, with an optional barrier before/after every Nth call
+ *  Accepts:    - same arguments as MPI_Alltoallw()
+ *  Returns:    - MPI_SUCCESS or error code
+ */
+int mca_coll_sync_alltoallw(const void *sbuf, const int *scounts, const int *sdisps,
+                            struct ompi_datatype_t * const *sdtypes,
+                            void *rbuf, const int *rcounts, const int *rdisps,
+                            struct ompi_datatype_t * const *rdtypes,
+                            struct ompi_communicator_t *comm,
+                            mca_coll_base_module_t *module)
+{
+    mca_coll_sync_module_t *s = (mca_coll_sync_module_t*) module;
+    int err = MPI_SUCCESS;
+
+    if (s->in_operation) {
+        return s->c_coll.coll_alltoallw(sbuf, scounts, sdisps, sdtypes,
+                                        rbuf, rcounts, rdisps, rdtypes, comm,
+                                        s->c_coll.coll_alltoallw_module);
+    }
+    s->in_operation = true;
+    if (OPAL_UNLIKELY(++(s->before_num_operations_alltoallw) ==
+                      mca_coll_sync_component.barrier_before_nops_alltoallw ||
+                      ++(s->before_num_operations) ==
+                      mca_coll_sync_component.barrier_before_nops)) {
+        s->before_num_operations = 0;
+        s->before_num_operations_alltoallw = 0;
+        err = s->c_coll.coll_barrier(comm, s->c_coll.coll_barrier_module);
+    }
+    if (OPAL_LIKELY(MPI_SUCCESS == err)) {
+        err = s->c_coll.coll_alltoallw(sbuf, scounts, sdisps, sdtypes,
+                                       rbuf, rcounts, rdisps, rdtypes, comm,
+                                       s->c_coll.coll_alltoallw_module);
+    }
+    /* Skip the trailing barrier on failure so the first error is what gets returned. */
+    if (OPAL_LIKELY(MPI_SUCCESS == err) &&
+        OPAL_UNLIKELY(++(s->after_num_operations_alltoallw) ==
+                      mca_coll_sync_component.barrier_after_nops_alltoallw ||
+                      ++(s->after_num_operations) ==
+                      mca_coll_sync_component.barrier_after_nops)) {
+        s->after_num_operations = 0;
+        s->after_num_operations_alltoallw = 0;
+        err = s->c_coll.coll_barrier(comm, s->c_coll.coll_barrier_module);
+    }
+    s->in_operation = false;
+    return err;
+}
diff --git a/ompi/mca/coll/sync/coll_sync_component.c b/ompi/mca/coll/sync/coll_sync_component.c
index 46243f0c91e..bfc172981b7 100644
--- a/ompi/mca/coll/sync/coll_sync_component.c
+++ b/ompi/mca/coll/sync/coll_sync_component.c
@@ -92,6 +92,23 @@ static int sync_register(void)
                                            MCA_BASE_VAR_SCOPE_READONLY,
                                            &mca_coll_sync_component.barrier_before_nops);
 
+    mca_coll_sync_component.barrier_before_nops_alltoallv = 0;
+    (void) mca_base_component_var_register(c, "barrier_before_alltoallv",
+                                           "Do a synchronization before each Nth alltoallv",
+                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
+                                           OPAL_INFO_LVL_9,
+                                           MCA_BASE_VAR_SCOPE_READONLY,
+                                           &mca_coll_sync_component.barrier_before_nops_alltoallv);
+
+    mca_coll_sync_component.barrier_before_nops_alltoallw = 0;
+    (void) mca_base_component_var_register(c, "barrier_before_alltoallw",
+                                           "Do a synchronization before each Nth alltoallw",
+                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
+                                           OPAL_INFO_LVL_9,
+                                           MCA_BASE_VAR_SCOPE_READONLY,
+                                           &mca_coll_sync_component.barrier_before_nops_alltoallw);
+
+
     mca_coll_sync_component.barrier_after_nops = 0;
     (void) mca_base_component_var_register(c, "barrier_after",
                                            "Do a synchronization after each Nth collective",
@@ -100,5 +117,21 @@ static int sync_register(void)
                                            MCA_BASE_VAR_SCOPE_READONLY,
                                            &mca_coll_sync_component.barrier_after_nops);
 
+    mca_coll_sync_component.barrier_after_nops_alltoallv = 0;
+    (void) mca_base_component_var_register(c, "barrier_after_alltoallv",
+                                           "Do a synchronization after each Nth alltoallv",
+                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
+                                           OPAL_INFO_LVL_9,
+                                           MCA_BASE_VAR_SCOPE_READONLY,
+                                           &mca_coll_sync_component.barrier_after_nops_alltoallv);
+
+    mca_coll_sync_component.barrier_after_nops_alltoallw = 0;
+    (void) mca_base_component_var_register(c, "barrier_after_alltoallw",
+                                           "Do a synchronization after each Nth alltoallw",
+                                           MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
+                                           OPAL_INFO_LVL_9,
+                                           MCA_BASE_VAR_SCOPE_READONLY,
+                                           &mca_coll_sync_component.barrier_after_nops_alltoallw);
+
     return OMPI_SUCCESS;
 }
diff --git a/ompi/mca/coll/sync/coll_sync_module.c b/ompi/mca/coll/sync/coll_sync_module.c
index 2e99c9925c0..d8fc268d35d 100644
--- a/ompi/mca/coll/sync/coll_sync_module.c
+++ b/ompi/mca/coll/sync/coll_sync_module.c
@@ -45,18 +45,34 @@ static void mca_coll_sync_module_construct(mca_coll_sync_module_t *module)
     memset(&(module->c_coll), 0, sizeof(module->c_coll));
     module->before_num_operations = 0;
     module->after_num_operations = 0;
+    module->before_num_operations_alltoallv = 0;
+    module->after_num_operations_alltoallv = 0;
+    module->before_num_operations_alltoallw = 0;
+    module->after_num_operations_alltoallw = 0;
     module->in_operation = false;
 }
 
 static void mca_coll_sync_module_destruct(mca_coll_sync_module_t *module)
 {
-    OBJ_RELEASE(module->c_coll.coll_bcast_module);
-    OBJ_RELEASE(module->c_coll.coll_gather_module);
-    OBJ_RELEASE(module->c_coll.coll_gatherv_module);
-    OBJ_RELEASE(module->c_coll.coll_reduce_module);
-    OBJ_RELEASE(module->c_coll.coll_reduce_scatter_module);
-    OBJ_RELEASE(module->c_coll.coll_scatter_module);
-    OBJ_RELEASE(module->c_coll.coll_scatterv_module);
+    if (0 != mca_coll_sync_component.barrier_before_nops ||
+        0 != mca_coll_sync_component.barrier_after_nops) {
+        OBJ_RELEASE(module->c_coll.coll_alltoallv_module);
+        OBJ_RELEASE(module->c_coll.coll_alltoallw_module);
+        OBJ_RELEASE(module->c_coll.coll_bcast_module);
+        OBJ_RELEASE(module->c_coll.coll_gather_module);
+        OBJ_RELEASE(module->c_coll.coll_gatherv_module);
+        OBJ_RELEASE(module->c_coll.coll_reduce_module);
+        OBJ_RELEASE(module->c_coll.coll_reduce_scatter_module);
+        OBJ_RELEASE(module->c_coll.coll_scatter_module);
+        OBJ_RELEASE(module->c_coll.coll_scatterv_module);
+    } else {
+        if (0 != mca_coll_sync_component.barrier_before_nops_alltoallv ||
+            0 != mca_coll_sync_component.barrier_after_nops_alltoallv)
+            { OBJ_RELEASE(module->c_coll.coll_alltoallv_module); }
+        if (0 != mca_coll_sync_component.barrier_before_nops_alltoallw ||
+            0 != mca_coll_sync_component.barrier_after_nops_alltoallw)
+            { OBJ_RELEASE(module->c_coll.coll_alltoallw_module); }
+    }
     /* If the exscan module is not NULL, then this was an
        intracommunicator, and therefore scan will have a module as
        well. */
@@ -97,7 +113,11 @@ mca_coll_sync_comm_query(struct ompi_communicator_t *comm,
 
     /* If both MCA params are 0, then disqualify us */
     if (0 == mca_coll_sync_component.barrier_before_nops &&
-        0 == mca_coll_sync_component.barrier_after_nops) {
+        0 == mca_coll_sync_component.barrier_after_nops &&
+        0 == mca_coll_sync_component.barrier_before_nops_alltoallv &&
+        0 == mca_coll_sync_component.barrier_after_nops_alltoallv &&
+        0 == mca_coll_sync_component.barrier_before_nops_alltoallw &&
+        0 == mca_coll_sync_component.barrier_after_nops_alltoallw) {
         return NULL;
     }
 
@@ -114,23 +134,50 @@ mca_coll_sync_comm_query(struct ompi_communicator_t *comm,
 
     /* The "all" versions are already synchronous.  So no need for an
        additional barrier there. */
-    sync_module->super.coll_allgather  = NULL;
-    sync_module->super.coll_allgatherv = NULL;
-    sync_module->super.coll_allreduce  = NULL;
-    sync_module->super.coll_alltoall   = NULL;
-    sync_module->super.coll_alltoallv  = NULL;
-    sync_module->super.coll_alltoallw  = NULL;
-    sync_module->super.coll_barrier    = NULL;
-    sync_module->super.coll_bcast      = mca_coll_sync_bcast;
-    sync_module->super.coll_exscan     = mca_coll_sync_exscan;
-    sync_module->super.coll_gather     = mca_coll_sync_gather;
-    sync_module->super.coll_gatherv    = mca_coll_sync_gatherv;
-    sync_module->super.coll_reduce     = mca_coll_sync_reduce;
-    sync_module->super.coll_reduce_scatter = mca_coll_sync_reduce_scatter;
-    sync_module->super.coll_scan       = mca_coll_sync_scan;
-    sync_module->super.coll_scatter    = mca_coll_sync_scatter;
-    sync_module->super.coll_scatterv   = mca_coll_sync_scatterv;
-
+    if (0 == mca_coll_sync_component.barrier_before_nops &&
+        0 == mca_coll_sync_component.barrier_after_nops) {
+        sync_module->super.coll_allgather  = NULL;
+        sync_module->super.coll_allgatherv = NULL;
+        sync_module->super.coll_allreduce  = NULL;
+        sync_module->super.coll_alltoall   = NULL;
+        if (0 != mca_coll_sync_component.barrier_before_nops_alltoallv ||
+            0 != mca_coll_sync_component.barrier_after_nops_alltoallv)
+            sync_module->super.coll_alltoallv = mca_coll_sync_alltoallv;
+        else
+            sync_module->super.coll_alltoallv = NULL;
+        if (0 != mca_coll_sync_component.barrier_before_nops_alltoallw ||
+            0 != mca_coll_sync_component.barrier_after_nops_alltoallw)
+            sync_module->super.coll_alltoallw = mca_coll_sync_alltoallw;
+        else
+            sync_module->super.coll_alltoallw = NULL;
+        sync_module->super.coll_barrier    = NULL;
+        sync_module->super.coll_bcast      = NULL;
+        sync_module->super.coll_exscan     = NULL;
+        sync_module->super.coll_gather     = NULL;
+        sync_module->super.coll_gatherv    = NULL;
+        sync_module->super.coll_reduce     = NULL;
+        sync_module->super.coll_reduce_scatter = NULL;
+        sync_module->super.coll_scan       = NULL;
+        sync_module->super.coll_scatter    = NULL;
+        sync_module->super.coll_scatterv   = NULL;
+    } else {
+        sync_module->super.coll_allgather  = NULL;
+        sync_module->super.coll_allgatherv = NULL;
+        sync_module->super.coll_allreduce  = NULL;
+        sync_module->super.coll_alltoall   = NULL;
+        sync_module->super.coll_alltoallv  = mca_coll_sync_alltoallv;
+        sync_module->super.coll_alltoallw  = mca_coll_sync_alltoallw;
+        sync_module->super.coll_barrier    = NULL;
+        sync_module->super.coll_bcast      = mca_coll_sync_bcast;
+        sync_module->super.coll_exscan     = mca_coll_sync_exscan;
+        sync_module->super.coll_gather     = mca_coll_sync_gather;
+        sync_module->super.coll_gatherv    = mca_coll_sync_gatherv;
+        sync_module->super.coll_reduce     = mca_coll_sync_reduce;
+        sync_module->super.coll_reduce_scatter = mca_coll_sync_reduce_scatter;
+        sync_module->super.coll_scan       = mca_coll_sync_scan;
+        sync_module->super.coll_scatter    = mca_coll_sync_scatter;
+        sync_module->super.coll_scatterv   = mca_coll_sync_scatterv;
+    }
     return &(sync_module->super);
 }
 
@@ -156,13 +203,26 @@ int mca_coll_sync_module_enable(mca_coll_base_module_t *module,
         OBJ_RETAIN(s->c_coll.coll_ ## name ## _module);         \
     }
 
-    CHECK_AND_RETAIN(bcast);
-    CHECK_AND_RETAIN(gather);
-    CHECK_AND_RETAIN(gatherv);
-    CHECK_AND_RETAIN(reduce);
-    CHECK_AND_RETAIN(reduce_scatter);
-    CHECK_AND_RETAIN(scatter);
-    CHECK_AND_RETAIN(scatterv);
+    if (0 != mca_coll_sync_component.barrier_before_nops ||
+        0 != mca_coll_sync_component.barrier_after_nops) {
+        CHECK_AND_RETAIN(alltoallv);
+        CHECK_AND_RETAIN(alltoallw);
+        CHECK_AND_RETAIN(bcast);
+        CHECK_AND_RETAIN(gather);
+        CHECK_AND_RETAIN(gatherv);
+        CHECK_AND_RETAIN(reduce);
+        CHECK_AND_RETAIN(reduce_scatter);
+        CHECK_AND_RETAIN(scatter);
+        CHECK_AND_RETAIN(scatterv);
+    } else {
+        if (0 != mca_coll_sync_component.barrier_before_nops_alltoallv ||
+            0 != mca_coll_sync_component.barrier_after_nops_alltoallv)
+            { CHECK_AND_RETAIN(alltoallv); }  /* braced: macro is not do/while(0)-wrapped */
+        if (0 != mca_coll_sync_component.barrier_before_nops_alltoallw ||
+            0 != mca_coll_sync_component.barrier_after_nops_alltoallw)
+            { CHECK_AND_RETAIN(alltoallw); }  /* braced: macro is not do/while(0)-wrapped */
+    }
+
     if (!OMPI_COMM_IS_INTER(comm)) {
         /* MPI does not define scan/exscan on intercommunicators */
         CHECK_AND_RETAIN(exscan);