Skip to content

Commit 4c28732

Browse files
xinzhao3Tomislav Janjusic
authored andcommitted
ompi/oshmem/spml/ucx: use lockfree array to optimize spml_ucx_progress/delete oshmem_barrier in shmem_ctx_destroy
ompi/oshmem/spml/ucx: optimize spml ucx progress Signed-off-by: Tomislav Janjusic <[email protected]> (cherry picked from commit 280f330)
1 parent 6f5a0a5 commit 4c28732

File tree

4 files changed

+127
-94
lines changed

4 files changed

+127
-94
lines changed

opal/mca/common/ucx/common_ucx.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ void opal_common_ucx_mca_proc_added(void)
151151
}
152152
}
153153
#endif
154+
}
154155

155156
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced)
156157
{

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 67 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@ mca_spml_ucx_t mca_spml_ucx = {
8080
.get_mkey_slow = NULL
8181
};
8282

83-
OBJ_CLASS_INSTANCE(mca_spml_ucx_ctx_list_item_t, opal_list_item_t, NULL, NULL);
84-
8583
mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
8684
.ucp_worker = NULL,
8785
.ucp_peers = NULL,
@@ -246,7 +244,7 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
246244
goto error;
247245
}
248246

249-
opal_progress_register(spml_ucx_progress);
247+
opal_progress_register(spml_ucx_default_progress);
250248

251249
mca_spml_ucx.remote_addrs_tbl = (char **)calloc(nprocs, sizeof(char *));
252250
memset(mca_spml_ucx.remote_addrs_tbl, 0, nprocs * sizeof(char *));
@@ -514,9 +512,46 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
514512
return OSHMEM_SUCCESS;
515513
}
516514

515+
static inline void _ctx_add(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
516+
{
517+
int i;
518+
519+
if (array->ctxs_count < array->ctxs_num) {
520+
array->ctxs[array->ctxs_count] = ctx;
521+
}
522+
else {
523+
array->ctxs = realloc(array->ctxs, (array->ctxs_num + 8) * sizeof(mca_spml_ucx_ctx_t *));
524+
opal_atomic_wmb ();
525+
for (i = array->ctxs_num; i < array->ctxs_num + 8; i++) {
526+
array->ctxs[i] = NULL;
527+
}
528+
array->ctxs[array->ctxs_num] = ctx;
529+
array->ctxs_num += 8;
530+
}
531+
532+
opal_atomic_wmb ();
533+
array->ctxs_count++;
534+
}
535+
536+
static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
537+
{
538+
int i;
539+
540+
for (i = 0; i < array->ctxs_count; i++) {
541+
if (array->ctxs[i] == ctx) {
542+
array->ctxs[i] = array->ctxs[array->ctxs_count-1];
543+
array->ctxs[array->ctxs_count-1] = NULL;
544+
break;
545+
}
546+
}
547+
548+
array->ctxs_count--;
549+
opal_atomic_wmb ();
550+
}
551+
517552
int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
518553
{
519-
mca_spml_ucx_ctx_list_item_t *ctx_item;
554+
mca_spml_ucx_ctx_t *ucx_ctx;
520555
ucp_worker_params_t params;
521556
ucp_ep_params_t ep_params;
522557
size_t i, j, nprocs = oshmem_num_procs();
@@ -527,8 +562,8 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
527562
sshmem_mkey_t *mkey;
528563
int rc = OSHMEM_ERROR;
529564

530-
ctx_item = OBJ_NEW(mca_spml_ucx_ctx_list_item_t);
531-
ctx_item->ctx.options = options;
565+
ucx_ctx = malloc(sizeof(mca_spml_ucx_ctx_t));
566+
ucx_ctx->options = options;
532567

533568
params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
534569
if (oshmem_mpi_thread_provided == SHMEM_THREAD_SINGLE || options & SHMEM_CTX_PRIVATE || options & SHMEM_CTX_SERIALIZED) {
@@ -538,22 +573,26 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
538573
}
539574

540575
err = ucp_worker_create(mca_spml_ucx.ucp_context, &params,
541-
&ctx_item->ctx.ucp_worker);
576+
&ucx_ctx->ucp_worker);
542577
if (UCS_OK != err) {
543-
OBJ_RELEASE(ctx_item);
578+
free(ucx_ctx);
544579
return OSHMEM_ERROR;
545580
}
546581

547-
ctx_item->ctx.ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ctx_item->ctx.ucp_peers)));
548-
if (NULL == ctx_item->ctx.ucp_peers) {
582+
ucx_ctx->ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ucx_ctx->ucp_peers)));
583+
if (NULL == ucx_ctx->ucp_peers) {
549584
goto error;
550585
}
551586

587+
if (mca_spml_ucx.active_array.ctxs_count == 0) {
588+
opal_progress_register(spml_ucx_ctx_progress);
589+
}
590+
552591
for (i = 0; i < nprocs; i++) {
553592
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
554593
ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
555-
err = ucp_ep_create(ctx_item->ctx.ucp_worker, &ep_params,
556-
&ctx_item->ctx.ucp_peers[i].ucp_conn);
594+
err = ucp_ep_create(ucx_ctx->ucp_worker, &ep_params,
595+
&ucx_ctx->ucp_peers[i].ucp_conn);
557596
if (UCS_OK != err) {
558597
SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: %s", i, nprocs,
559598
ucs_status_string(err));
@@ -562,68 +601,55 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
562601

563602
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
564603
mkey = &memheap_map->mem_segs[j].mkeys_cache[i][0];
565-
ucx_mkey = &ctx_item->ctx.ucp_peers[i].mkeys[j].key;
566-
err = ucp_ep_rkey_unpack(ctx_item->ctx.ucp_peers[i].ucp_conn,
604+
ucx_mkey = &ucx_ctx->ucp_peers[i].mkeys[j].key;
605+
err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[i].ucp_conn,
567606
mkey->u.data,
568607
&ucx_mkey->rkey);
569608
if (UCS_OK != err) {
570609
SPML_UCX_ERROR("failed to unpack rkey");
571610
goto error2;
572611
}
573-
mca_spml_ucx_cache_mkey(&ctx_item->ctx, mkey, j, i);
612+
mca_spml_ucx_cache_mkey(ucx_ctx, mkey, j, i);
574613
}
575614
}
576615

577616
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
578-
579-
opal_list_append(&(mca_spml_ucx.ctx_list), &ctx_item->super);
580-
617+
_ctx_add(&mca_spml_ucx.active_array, ucx_ctx);
581618
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
582619

583-
(*ctx) = (shmem_ctx_t)(&ctx_item->ctx);
584-
620+
(*ctx) = (shmem_ctx_t)ucx_ctx;
585621
return OSHMEM_SUCCESS;
586622

587623
error2:
588624
for (i = 0; i < nprocs; i++) {
589-
if (ctx_item->ctx.ucp_peers[i].ucp_conn) {
590-
ucp_ep_destroy(ctx_item->ctx.ucp_peers[i].ucp_conn);
625+
if (ucx_ctx->ucp_peers[i].ucp_conn) {
626+
ucp_ep_destroy(ucx_ctx->ucp_peers[i].ucp_conn);
591627
}
592628
}
593629

594-
if (ctx_item->ctx.ucp_peers)
595-
free(ctx_item->ctx.ucp_peers);
630+
if (ucx_ctx->ucp_peers)
631+
free(ucx_ctx->ucp_peers);
596632

597633
error:
598-
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
599-
OBJ_RELEASE(ctx_item);
634+
ucp_worker_destroy(ucx_ctx->ucp_worker);
635+
free(ucx_ctx);
600636
rc = OSHMEM_ERR_OUT_OF_RESOURCE;
601637
SPML_ERROR("ctx create FAILED rc=%d", rc);
602638
return rc;
603639
}
604640

605641
void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
606642
{
607-
mca_spml_ucx_ctx_list_item_t *ctx_item, *next;
608-
size_t i, j, nprocs = oshmem_num_procs();
609-
610643
MCA_SPML_CALL(quiet(ctx));
611644

612-
oshmem_shmem_barrier();
613-
614645
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
646+
_ctx_remove(&mca_spml_ucx.active_array, (mca_spml_ucx_ctx_t *)ctx);
647+
_ctx_add(&mca_spml_ucx.idle_array, (mca_spml_ucx_ctx_t *)ctx);
648+
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
615649

616-
/* delete context object from list */
617-
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
618-
mca_spml_ucx_ctx_list_item_t) {
619-
if ((shmem_ctx_t)(&ctx_item->ctx) == ctx) {
620-
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
621-
opal_list_append(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super);
622-
break;
623-
}
650+
if (!mca_spml_ucx.active_array.ctxs_count) {
651+
opal_progress_unregister(spml_ucx_ctx_progress);
624652
}
625-
626-
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
627653
}
628654

629655
int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src)

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,14 @@ typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t;
7575

7676
extern mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default;
7777

78-
struct mca_spml_ucx_ctx_list_item {
79-
opal_list_item_t super;
80-
mca_spml_ucx_ctx_t ctx;
81-
};
82-
typedef struct mca_spml_ucx_ctx_list_item mca_spml_ucx_ctx_list_item_t;
83-
8478
typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(shmem_ctx_t ctx, int pe, void *va, void **rva);
8579

80+
typedef struct mca_spml_ucx_ctx_array {
81+
int ctxs_count;
82+
int ctxs_num;
83+
mca_spml_ucx_ctx_t **ctxs;
84+
} mca_spml_ucx_ctx_array_t;
85+
8686
struct mca_spml_ucx {
8787
mca_spml_base_module_t super;
8888
ucp_context_h ucp_context;
@@ -91,8 +91,8 @@ struct mca_spml_ucx {
9191
bool enabled;
9292
mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow;
9393
char **remote_addrs_tbl;
94-
opal_list_t ctx_list;
95-
opal_list_t idle_ctx_list;
94+
mca_spml_ucx_ctx_array_t active_array;
95+
mca_spml_ucx_ctx_array_t idle_array;
9696
int priority; /* component priority */
9797
shmem_internal_mutex_t internal_mutex;
9898
};
@@ -152,7 +152,8 @@ extern int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs);
152152
extern int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs);
153153
extern int mca_spml_ucx_fence(shmem_ctx_t ctx);
154154
extern int mca_spml_ucx_quiet(shmem_ctx_t ctx);
155-
extern int spml_ucx_progress(void);
155+
extern int spml_ucx_default_progress(void);
156+
extern int spml_ucx_ctx_progress(void);
156157

157158
static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe)
158159
{

0 commit comments

Comments
 (0)