Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ompi/mca/coll/tuned/coll_tuned.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ extern int ompi_coll_tuned_scatter_large_msg;
extern int ompi_coll_tuned_scatter_min_procs;
extern int ompi_coll_tuned_scatter_blocking_send_ratio;

/* Congestion variables */
/*
 * Alltoall-only congestion knobs:
 *  - ompi_coll_tuned_alltoall_congest_algorithm: value handed back by
 *    ompi_coll_tuned_get_congest_algo(); it replaces the selected alltoall
 *    algorithm when ompi_coll_tuned_isCongested() reports congestion.
 *  - ompi_coll_tuned_alltoall_congest_threshold: limit that the
 *    OMPI_SPC_TIME_ALLTOALL diff must exceed to count as "congested".
 */
extern int ompi_coll_tuned_alltoall_congest_algorithm;
extern int ompi_coll_tuned_alltoall_congest_threshold;

/* forced algorithm choices */
/* this structure is for storing the indexes to the forced algorithm mca params... */
/* we get these at component query (so that registered values appear in ompi_infoi) */
Expand Down Expand Up @@ -184,6 +188,10 @@ int ompi_coll_tuned_scan_intra_check_forced_init (coll_tuned_force_algorithm_mca

int mca_coll_tuned_ft_event(int state);

/* Congestion functions */
/*
 * ompi_coll_tuned_get_congest_algo(): returns the current value of
 *   ompi_coll_tuned_alltoall_congest_algorithm. The alltoall caller only
 *   applies the override when the returned value is >= 0.
 *
 * ompi_coll_tuned_isCongested(comm): returns 1 when congestion is reported
 *   for the alltoall SPC timer on 'comm', 0 otherwise.
 */
int ompi_coll_tuned_get_congest_algo(void);
int ompi_coll_tuned_isCongested(struct ompi_communicator_t *comm);

struct mca_coll_tuned_component_t {
/** Base coll component */
mca_coll_base_component_2_0_0_t super;
Expand Down
25 changes: 25 additions & 0 deletions ompi/mca/coll/tuned/coll_tuned_alltoall_decision.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,34 @@ int ompi_coll_tuned_alltoall_intra_do_this(const void *sbuf, int scount,
int algorithm, int faninout, int segsize,
int max_requests)
{

/*
* TJN: Check if congested, and if so change the algorithm.
* TODO: Should likely make this coll-type specific,
* i.e., ompi_coll_tuned_alltoall_isCongested()
* They can share code, but likely different heuristics
* for different coll types.
*/
if ( ompi_coll_tuned_isCongested(comm) ) {
int new_alg;
int comm_rank = 0;
comm_rank = ompi_comm_rank(comm);

new_alg = ompi_coll_tuned_get_congest_algo();

if (new_alg >= 0) {
OPAL_OUTPUT((ompi_coll_tuned_stream, " # (Rank %d) DBG: intra_do_this CONGESTED OVERRIDE algorithm = %d with new_alg = %d\n", comm_rank, algorithm, new_alg));
//fprintf(stderr, " # (Rank %d) DBG: intra_do_this CONGESTED OVERRIDE algorithm = %d with new_alg = %d\n", comm_rank, algorithm, new_alg);
algorithm = new_alg;
}
}

OPAL_OUTPUT((ompi_coll_tuned_stream,"coll:tuned:alltoall_intra_do_this selected algorithm %d topo faninout %d segsize %d",
algorithm, faninout, segsize));

//fprintf(stderr, " # (Rank %d) DBG: coll:tuned:alltoall_intra_do_this selected algorithm %d topo faninout %d segsize %d\n",
// comm_rank, algorithm, faninout, segsize);

switch (algorithm) {
case (0):
return ompi_coll_tuned_alltoall_intra_dec_fixed(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm, module);
Expand Down
24 changes: 24 additions & 0 deletions ompi/mca/coll/tuned/coll_tuned_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ int ompi_coll_tuned_alltoall_large_msg = 3000;
int ompi_coll_tuned_alltoall_min_procs = 0; /* disable by default */
int ompi_coll_tuned_alltoall_max_requests = 0; /* no limit for alltoall by default */


/* Congestion variables */
/*
 * Both values are registered as MCA variables in tuned_register()
 * ("alltoall_congest_algorithm" and "alltoall_congest_threshold"), which
 * re-assigns the same defaults (3 and 100) just before registering them.
 * The threshold is compared against the OMPI_SPC_TIME_ALLTOALL diff in
 * ompi_coll_tuned_isCongested().
 */
int ompi_coll_tuned_alltoall_congest_algorithm = 3; /* Default algo during congestion */
int ompi_coll_tuned_alltoall_congest_threshold = 100; /* Threshold to decide on congestion */


/* Disable by default */
int ompi_coll_tuned_scatter_intermediate_msg = 0;
int ompi_coll_tuned_scatter_large_msg = 0;
Expand Down Expand Up @@ -191,6 +197,24 @@ static int tuned_register(void)
MCA_BASE_VAR_SCOPE_READONLY,
&ompi_coll_tuned_dynamic_rules_filename);

ompi_coll_tuned_alltoall_congest_threshold = 100;
(void) mca_base_component_var_register(&mca_coll_tuned_component.super.collm_version,
"alltoall_congest_threshold",
"Threshold (if SPCs enabled) to decide if congestion present in alltoall algorithm (integer: num cycles diff)",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_6,
MCA_BASE_VAR_SCOPE_READONLY,
&ompi_coll_tuned_alltoall_congest_threshold);

ompi_coll_tuned_alltoall_congest_algorithm = 3;
(void) mca_base_component_var_register(&mca_coll_tuned_component.super.collm_version,
"alltoall_congest_algorithm",
"Algorithm to use when congestion is present in alltoall operation (integer: 0-5)",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_6,
MCA_BASE_VAR_SCOPE_READONLY,
&ompi_coll_tuned_alltoall_congest_algorithm);

/* register forced params */
ompi_coll_tuned_allreduce_intra_check_forced_init(&ompi_coll_tuned_forced_params[ALLREDUCE]);
ompi_coll_tuned_alltoall_intra_check_forced_init(&ompi_coll_tuned_forced_params[ALLTOALL]);
Expand Down
175 changes: 175 additions & 0 deletions ompi/mca/coll/tuned/coll_tuned_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "mpi.h"
#include "ompi/communicator/communicator.h"
#include "ompi/runtime/ompi_spc.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/base.h"
#include "ompi/mca/coll/base/coll_base_topo.h"
Expand Down Expand Up @@ -119,6 +120,156 @@ ompi_coll_tuned_comm_query(struct ompi_communicator_t *comm, int *priority)
/* We put all routines that handle the MCA user forced algorithm and parameter choices here */
/* recheck the setting of forced, called on module create (i.e. for each new comm) */


/*
 * Congestion - past value for alltoall
 *
 * Last OMPI_SPC_TIME_ALLTOALL reading stored by
 * ompi_coll_tuned_isCongested(). It is a long long, the same width as the
 * new_value/diff results the function works with, so a large counter
 * reading is not truncated when it is saved between calls.
 */
static long long _congest_spc_time_alltoall_past_value = 0;



/*
 * Congestion - detection function
 *
 * TJN: This is a very simplistic congestion detection method.
 *      It has several flaws, but shows a basic threshold based
 *      method for determining "congested".
 *      The threshold can be adjusted via an MCA parameter.
 *      There are also a few magic envvars to override things
 *      for testing.
 *
 * NOTE: This is specific to Alltoall for now.
 *
 * @param comm  Communicator over which the per-rank diff is reduced
 *              (allreduce with MPI_MAX, followed by a barrier).
 *
 * @return 1 if congestion is reported, 0 otherwise. Errors from
 *         ompi_spc_value_diff() and the very first measurement are
 *         both reported as 0 (not congested).
 *
 *  - MCA: coll_tuned_alltoall_congest_threshold
 *      The threshold at which point we decide the difference
 *      in current/past value of SPC indicates "congested".
 *
 *  - EnvVar: 'OMPIX_SHOW_CONGEST_INFO'
 *      Print this rank's threshold/past/new/diff values to stderr
 *      on every call (without needing full verbose output).
 *
 *  - EnvVar: 'OMPIX_SKIP_CONGESTED_ALLREDUCE'
 *      Only do the local check; no consensus across the comm is
 *      computed. A local diff above the threshold is printed and
 *      "not congested" is returned. The measurement itself never
 *      yields "congested" in this mode.
 *
 *  - EnvVar: 'OMPIX_FORCE_CONGESTED'
 *      Hardcoded flag to force the check to return true regardless
 *      of the actual status of the network. It is only consulted
 *      when none of the earlier return paths was taken.
 */
int
ompi_coll_tuned_isCongested(struct ompi_communicator_t *comm)
{
    long long new_value = 0;
    long long diff = 0;
    long long diff_max = 0;
    int comm_rank;
    int rc;

    OPAL_OUTPUT((ompi_coll_tuned_stream, "coll:tuned: alltoall_congest_threshold = %d\n", ompi_coll_tuned_alltoall_congest_threshold));

    /* Get our local congestion info */
    rc = ompi_spc_value_diff("OMPI_SPC_TIME_ALLTOALL",
                             _congest_spc_time_alltoall_past_value,
                             &new_value,
                             &diff);
    if (0 != rc) {
        /*
         * Ignore error for now (treat as not congested).
         * NOTE(review): this exit (and the first-measurement exit below)
         * is decided locally, before the allreduce; confirm every rank of
         * 'comm' takes the same path, otherwise the collective calls
         * further down would not be matched on all ranks.
         */
        return 0;
    }

    comm_rank = ompi_comm_rank(comm);

    /* The counter values are long long, so they are printed with %lld */
    OPAL_OUTPUT((ompi_coll_tuned_stream, " #-- DBG: (Rank %d) MY-CONGESTION-INFO (thresh=%d, past_value=%lld, new_value=%lld, diff=%lld)\n", comm_rank, ompi_coll_tuned_alltoall_congest_threshold, _congest_spc_time_alltoall_past_value, new_value, diff));
    /* TJN: quick hack to see congest diff info per rank (w/o full verbose) */
    if (NULL != getenv("OMPIX_SHOW_CONGEST_INFO")) {
        fprintf(stderr, " #-- DBG: (Rank %d) MY-CONGESTION-INFO (thresh=%d, past_value=%lld, new_value=%lld, diff=%lld)\n", comm_rank, ompi_coll_tuned_alltoall_congest_threshold, _congest_spc_time_alltoall_past_value, new_value, diff);
    }

    /* Check if this is our first measurement */
    if ((diff == new_value) && (0 == _congest_spc_time_alltoall_past_value)) {
        _congest_spc_time_alltoall_past_value = new_value;

        return 0; /* Ignore first measurement as not congested */
    }

    /* Remember this reading for the next call */
    _congest_spc_time_alltoall_past_value = new_value;

    /*
     * TJN: Skip the allreduce and do *only* the local diff check,
     *      but in this case we will not adjust the algorithm, just
     *      report the difference and move on.
     */
    if (NULL != getenv("OMPIX_SKIP_CONGESTED_ALLREDUCE")) {

        /* diff: (local-only) Decide how different from past */
        if ((0 != diff) && (diff > ompi_coll_tuned_alltoall_congest_threshold)) {
            OPAL_OUTPUT((ompi_coll_tuned_stream, " #-- DBG: (Rank %d) LOCAL-ONLY CONGESTION SKIP-ALLREDUCE (thresh=%d, new_value=%lld, diff=%lld)!\n", comm_rank, ompi_coll_tuned_alltoall_congest_threshold, new_value, diff));
            fprintf(stderr, " #-- DBG: (Rank %d) LOCAL-ONLY CONGESTION SKIP-ALLREDUCE (thresh=%d, new_value=%lld, diff=%lld)!\n", comm_rank, ompi_coll_tuned_alltoall_congest_threshold, new_value, diff);

            return 0; /* Always return 'not congested' for this case */
        }

    } else {
        /*
         * Aggregate all of the information using MPI_Allreduce(MAX)
         * on diff value to see if any rank in comm exceeded the
         * max threshold.
         *
         * The return codes are deliberately ignored (best effort);
         * diff_max starts at 0.
         *
         * TODO TJN: Change this when we add congestion checks for MPI_Reduce()!
         */
        (void)comm->c_coll->coll_allreduce(&diff, &diff_max,
                                           1, MPI_LONG_LONG, MPI_MAX,
                                           comm,
                                           comm->c_coll->coll_allreduce_module);

        (void)comm->c_coll->coll_barrier(comm, comm->c_coll->coll_barrier_module);

        /* diff_max: (global max) Decide how different from past */
        if ((0 != diff_max) && (diff_max > ompi_coll_tuned_alltoall_congest_threshold)) {
            OPAL_OUTPUT((ompi_coll_tuned_stream, " #-- DBG: (Rank %d) EXCEED CONGESTION THRESHOLD -- CONGESTED (thresh=%d, new_value=%lld, diff=%lld, diff_max=%lld)!\n", comm_rank, ompi_coll_tuned_alltoall_congest_threshold, new_value, diff, diff_max));
            return 1; /* Yes congested */
        }
    }

    /* XXX: for now if have env var set we call it congested */
    if (NULL != getenv("OMPIX_FORCE_CONGESTED")) {
        fprintf(stderr, " #-- DBG: TJN_HACK_CONGESTED CONGESTION FORCED -- CONGESTED!\n");
        return 1; /* Yes congested */
    }

    return 0; /* Not congested */
}

/*
 * Congestion - algorithm to switch to while congested
 *
 * Hands back whatever is currently stored in
 * ompi_coll_tuned_alltoall_congest_algorithm and logs it on the
 * tuned output stream. No range check is applied to the value.
 *
 * NOTE: Alltoall only, for now.
 *
 *  - MCA: coll_tuned_alltoall_congest_algorithm
 *      The alltoall algorithm used when congestion is detected,
 *      i.e., when ompi_coll_tuned_isCongested() is true. Being an
 *      MCA parameter, its default can be overridden at runtime.
 *
 * @return the configured congestion algorithm number.
 */
int
ompi_coll_tuned_get_congest_algo(void)
{
    /* TODO: Should check this is a valid alltoall algo */
    const int congest_alg = ompi_coll_tuned_alltoall_congest_algorithm;

    OPAL_OUTPUT((ompi_coll_tuned_stream, "coll:tuned: alltoall_congest_algorithm = %d\n", congest_alg));

    return congest_alg;
}


static int
ompi_coll_tuned_forced_getvalues( enum COLLTYPE type,
coll_tuned_force_algorithm_params_t *forced_values )
Expand All @@ -136,6 +287,30 @@ ompi_coll_tuned_forced_getvalues( enum COLLTYPE type,
mca_base_var_get_value(mca_params->algorithm_param_index, &tmp, NULL, NULL);
forced_values->algorithm = tmp ? tmp[0] : 0;

#if 0
/* Congestion stuff (likely cut this) */
/* TJN: We are only changing the algorithm for ALLTOALL (if congested) */
if( ALLTOALL == type ) {

//fprintf(stderr, " #-- DBG: FORCED_GETVALUES (cur) algorithm = %d\n",
// forced_values->algorithm);

if ( ompi_coll_tuned_isCongested() ) {
int new_alg;
new_alg = ompi_coll_tuned_get_congest_algo();
if (new_alg >= 0) {
forced_values->algorithm = new_alg;

//fprintf(stderr, " #-- DBG: HACK OVERRIDE ALLTOALL FORCED_GETVALUES algorithm = %d (new_alg=%d)\n",
// forced_values->algorithm, new_alg);
}
}

//fprintf(stderr, " #-- DBG: FORCED_GETVALUES (new) algorithm = %d\n",
// forced_values->algorithm);
}
#endif

if( BARRIER != type ) {
mca_base_var_get_value(mca_params->segsize_param_index, &tmp, NULL, NULL);
if (tmp) forced_values->segsize = tmp[0];
Expand Down
6 changes: 6 additions & 0 deletions ompi/mpi/c/alltoall.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ int MPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
{
int err;
size_t recvtype_size;
opal_timer_t timer = 0; /* SPC */

SPC_RECORD(OMPI_SPC_ALLTOALL, 1);

Expand Down Expand Up @@ -106,10 +107,15 @@ int MPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype,

OPAL_CR_ENTER_LIBRARY();

SPC_TIMER_START(OMPI_SPC_TIME_ALLTOALL, &timer);

/* Invoke the coll component to perform the back-end operation */
err = comm->c_coll->coll_alltoall(sendbuf, sendcount, sendtype,
recvbuf, recvcount, recvtype,
comm, comm->c_coll->coll_alltoall_module);

SPC_TIMER_STOP(OMPI_SPC_TIME_ALLTOALL, &timer);

OMPI_ERRHANDLER_RETURN(err, comm, err, FUNC_NAME);
}

Loading