Skip to content

Commit 489dbcf

Browse files
committed
spml/ucx: shuffle EPs creation
1 parent afc970c commit 489dbcf

File tree

1 file changed

+28
-9
lines changed

1 file changed

+28
-9
lines changed

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -634,16 +634,16 @@ int mca_spml_ucx_clear_put_op_mask(mca_spml_ucx_ctx_t *ctx)
634634
int mca_spml_ucx_add_procs(oshmem_group_t* group, size_t nprocs)
635635
{
636636
int rc = OSHMEM_ERROR;
637-
int my_rank = oshmem_my_proc_id();
638637
size_t ucp_workers = mca_spml_ucx.ucp_workers;
639638
unsigned int *wk_roffs = NULL;
640639
unsigned int *wk_rsizes = NULL;
641640
char *wk_raddrs = NULL;
642-
size_t i, w, n;
641+
size_t i, j, w, n, temp;
643642
ucs_status_t err;
644643
ucp_address_t **wk_local_addr;
645644
unsigned int *wk_addr_len;
646645
ucp_ep_params_t ep_params;
646+
int *indices;
647647

648648
wk_local_addr = calloc(mca_spml_ucx.ucp_workers, sizeof(ucp_address_t *));
649649
wk_addr_len = calloc(mca_spml_ucx.ucp_workers, sizeof(size_t));
@@ -691,23 +691,40 @@ int mca_spml_ucx_add_procs(oshmem_group_t* group, size_t nprocs)
691691
}
692692
}
693693

694+
indices = malloc(nprocs * sizeof(int));
695+
if (!indices) {
696+
goto error;
697+
}
698+
699+
for (i = 0; i < nprocs; i++) {
700+
indices[i] = i;
701+
}
702+
703+
srand((unsigned int)time(NULL));
704+
694705
/* Get the EP connection requests for all the processes from modex */
695-
for (n = 0; n < nprocs; ++n) {
696-
i = (my_rank + n) % nprocs;
706+
for (i = nprocs - 1; i >= 0; --i) {
707+
/* Fisher-Yates shuffle algorithm */
708+
if (i > 0) {
709+
j = rand() % (i + 1);
710+
temp = indices[i];
711+
indices[i] = indices[j];
712+
indices[j] = temp;
713+
}
697714

698715
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
699-
ep_params.address = (ucp_address_t *)mca_spml_ucx.remote_addrs_tbl[0][i];
716+
ep_params.address = (ucp_address_t *) mca_spml_ucx.remote_addrs_tbl[0][indices[i]];
700717

701718
err = ucp_ep_create(mca_spml_ucx_ctx_default.ucp_worker[0], &ep_params,
702-
&mca_spml_ucx_ctx_default.ucp_peers[i].ucp_conn);
719+
&mca_spml_ucx_ctx_default.ucp_peers[indices[i]].ucp_conn);
703720
if (UCS_OK != err) {
704721
SPML_UCX_ERROR("ucp_ep_create(proc=%zu/%zu) failed: %s", n, nprocs,
705722
ucs_status_string(err));
706723
goto error2;
707724
}
708725

709726
/* Initialize mkeys as NULL for all processes */
710-
mca_spml_ucx_peer_mkey_cache_init(&mca_spml_ucx_ctx_default, i);
727+
mca_spml_ucx_peer_mkey_cache_init(&mca_spml_ucx_ctx_default, indices[i]);
711728
}
712729

713730
for (i = 0; i < mca_spml_ucx.ucp_workers; i++) {
@@ -719,6 +736,7 @@ int mca_spml_ucx_add_procs(oshmem_group_t* group, size_t nprocs)
719736
free(wk_roffs);
720737
free(wk_addr_len);
721738
free(wk_local_addr);
739+
free(indices);
722740

723741
SPML_UCX_VERBOSE(50, "*** ADDED PROCS ***");
724742

@@ -753,6 +771,7 @@ int mca_spml_ucx_add_procs(oshmem_group_t* group, size_t nprocs)
753771
free(wk_raddrs);
754772
free(wk_rsizes);
755773
free(wk_roffs);
774+
free(indices);
756775
error:
757776
free(wk_addr_len);
758777
free(wk_local_addr);
@@ -1025,7 +1044,7 @@ static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx
10251044

10261045
opal_atomic_wmb ();
10271046
}
1028-
1047+
10291048
static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx_ctx_p)
10301049
{
10311050
ucp_worker_params_t params;
@@ -1044,7 +1063,7 @@ static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx
10441063
ucx_ctx->ucp_worker = calloc(1, sizeof(ucp_worker_h));
10451064
ucx_ctx->ucp_workers = 1;
10461065
ucx_ctx->synchronized_quiet = mca_spml_ucx_ctx_default.synchronized_quiet;
1047-
ucx_ctx->strong_sync = mca_spml_ucx_ctx_default.strong_sync;
1066+
ucx_ctx->strong_sync = mca_spml_ucx_ctx_default.strong_sync;
10481067

10491068
params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
10501069
if (oshmem_mpi_thread_provided == SHMEM_THREAD_SINGLE ||

0 commit comments

Comments
 (0)