Skip to content

Commit 280f330

Browse files
xinzhao3Tomislav Janjusic
authored andcommitted
ompi/oshmem/spml/ucx: use lockfree array to optimize spml_ucx_progress/delete oshmem_barrier in shmem_ctx_destroy
ompi/oshmem/spml/ucx: optimize spml ucx progress Signed-off-by: Tomislav Janjusic <[email protected]>
1 parent 1b9eaa2 commit 280f330

File tree

4 files changed

+127
-94
lines changed

4 files changed

+127
-94
lines changed

opal/mca/common/ucx/common_ucx.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ void opal_common_ucx_mca_proc_added(void)
151151
}
152152
}
153153
#endif
154+
}
154155

155156
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced)
156157
{

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 67 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ mca_spml_ucx_t mca_spml_ucx = {
7777
.get_mkey_slow = NULL
7878
};
7979

80-
OBJ_CLASS_INSTANCE(mca_spml_ucx_ctx_list_item_t, opal_list_item_t, NULL, NULL);
81-
8280
mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
8381
.ucp_worker = NULL,
8482
.ucp_peers = NULL,
@@ -243,7 +241,7 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
243241
goto error;
244242
}
245243

246-
opal_progress_register(spml_ucx_progress);
244+
opal_progress_register(spml_ucx_default_progress);
247245

248246
mca_spml_ucx.remote_addrs_tbl = (char **)calloc(nprocs, sizeof(char *));
249247
memset(mca_spml_ucx.remote_addrs_tbl, 0, nprocs * sizeof(char *));
@@ -511,9 +509,46 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
511509
return OSHMEM_SUCCESS;
512510
}
513511

512+
static inline void _ctx_add(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
513+
{
514+
int i;
515+
516+
if (array->ctxs_count < array->ctxs_num) {
517+
array->ctxs[array->ctxs_count] = ctx;
518+
}
519+
else {
520+
array->ctxs = realloc(array->ctxs, (array->ctxs_num + 8) * sizeof(mca_spml_ucx_ctx_t *));
521+
opal_atomic_wmb ();
522+
for (i = array->ctxs_num; i < array->ctxs_num + 8; i++) {
523+
array->ctxs[i] = NULL;
524+
}
525+
array->ctxs[array->ctxs_num] = ctx;
526+
array->ctxs_num += 8;
527+
}
528+
529+
opal_atomic_wmb ();
530+
array->ctxs_count++;
531+
}
532+
533+
static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
534+
{
535+
int i;
536+
537+
for (i = 0; i < array->ctxs_count; i++) {
538+
if (array->ctxs[i] == ctx) {
539+
array->ctxs[i] = array->ctxs[array->ctxs_count-1];
540+
array->ctxs[array->ctxs_count-1] = NULL;
541+
break;
542+
}
543+
}
544+
545+
array->ctxs_count--;
546+
opal_atomic_wmb ();
547+
}
548+
514549
int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
515550
{
516-
mca_spml_ucx_ctx_list_item_t *ctx_item;
551+
mca_spml_ucx_ctx_t *ucx_ctx;
517552
ucp_worker_params_t params;
518553
ucp_ep_params_t ep_params;
519554
size_t i, j, nprocs = oshmem_num_procs();
@@ -524,8 +559,8 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
524559
sshmem_mkey_t *mkey;
525560
int rc = OSHMEM_ERROR;
526561

527-
ctx_item = OBJ_NEW(mca_spml_ucx_ctx_list_item_t);
528-
ctx_item->ctx.options = options;
562+
ucx_ctx = malloc(sizeof(mca_spml_ucx_ctx_t));
563+
ucx_ctx->options = options;
529564

530565
params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
531566
if (oshmem_mpi_thread_provided == SHMEM_THREAD_SINGLE || options & SHMEM_CTX_PRIVATE || options & SHMEM_CTX_SERIALIZED) {
@@ -535,22 +570,26 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
535570
}
536571

537572
err = ucp_worker_create(mca_spml_ucx.ucp_context, &params,
538-
&ctx_item->ctx.ucp_worker);
573+
&ucx_ctx->ucp_worker);
539574
if (UCS_OK != err) {
540-
OBJ_RELEASE(ctx_item);
575+
free(ucx_ctx);
541576
return OSHMEM_ERROR;
542577
}
543578

544-
ctx_item->ctx.ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ctx_item->ctx.ucp_peers)));
545-
if (NULL == ctx_item->ctx.ucp_peers) {
579+
ucx_ctx->ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ucx_ctx->ucp_peers)));
580+
if (NULL == ucx_ctx->ucp_peers) {
546581
goto error;
547582
}
548583

584+
if (mca_spml_ucx.active_array.ctxs_count == 0) {
585+
opal_progress_register(spml_ucx_ctx_progress);
586+
}
587+
549588
for (i = 0; i < nprocs; i++) {
550589
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
551590
ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
552-
err = ucp_ep_create(ctx_item->ctx.ucp_worker, &ep_params,
553-
&ctx_item->ctx.ucp_peers[i].ucp_conn);
591+
err = ucp_ep_create(ucx_ctx->ucp_worker, &ep_params,
592+
&ucx_ctx->ucp_peers[i].ucp_conn);
554593
if (UCS_OK != err) {
555594
SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: %s", i, nprocs,
556595
ucs_status_string(err));
@@ -559,68 +598,55 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
559598

560599
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
561600
mkey = &memheap_map->mem_segs[j].mkeys_cache[i][0];
562-
ucx_mkey = &ctx_item->ctx.ucp_peers[i].mkeys[j].key;
563-
err = ucp_ep_rkey_unpack(ctx_item->ctx.ucp_peers[i].ucp_conn,
601+
ucx_mkey = &ucx_ctx->ucp_peers[i].mkeys[j].key;
602+
err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[i].ucp_conn,
564603
mkey->u.data,
565604
&ucx_mkey->rkey);
566605
if (UCS_OK != err) {
567606
SPML_UCX_ERROR("failed to unpack rkey");
568607
goto error2;
569608
}
570-
mca_spml_ucx_cache_mkey(&ctx_item->ctx, mkey, j, i);
609+
mca_spml_ucx_cache_mkey(ucx_ctx, mkey, j, i);
571610
}
572611
}
573612

574613
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
575-
576-
opal_list_append(&(mca_spml_ucx.ctx_list), &ctx_item->super);
577-
614+
_ctx_add(&mca_spml_ucx.active_array, ucx_ctx);
578615
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
579616

580-
(*ctx) = (shmem_ctx_t)(&ctx_item->ctx);
581-
617+
(*ctx) = (shmem_ctx_t)ucx_ctx;
582618
return OSHMEM_SUCCESS;
583619

584620
error2:
585621
for (i = 0; i < nprocs; i++) {
586-
if (ctx_item->ctx.ucp_peers[i].ucp_conn) {
587-
ucp_ep_destroy(ctx_item->ctx.ucp_peers[i].ucp_conn);
622+
if (ucx_ctx->ucp_peers[i].ucp_conn) {
623+
ucp_ep_destroy(ucx_ctx->ucp_peers[i].ucp_conn);
588624
}
589625
}
590626

591-
if (ctx_item->ctx.ucp_peers)
592-
free(ctx_item->ctx.ucp_peers);
627+
if (ucx_ctx->ucp_peers)
628+
free(ucx_ctx->ucp_peers);
593629

594630
error:
595-
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
596-
OBJ_RELEASE(ctx_item);
631+
ucp_worker_destroy(ucx_ctx->ucp_worker);
632+
free(ucx_ctx);
597633
rc = OSHMEM_ERR_OUT_OF_RESOURCE;
598634
SPML_ERROR("ctx create FAILED rc=%d", rc);
599635
return rc;
600636
}
601637

602638
void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
603639
{
604-
mca_spml_ucx_ctx_list_item_t *ctx_item, *next;
605-
size_t i, j, nprocs = oshmem_num_procs();
606-
607640
MCA_SPML_CALL(quiet(ctx));
608641

609-
oshmem_shmem_barrier();
610-
611642
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
643+
_ctx_remove(&mca_spml_ucx.active_array, (mca_spml_ucx_ctx_t *)ctx);
644+
_ctx_add(&mca_spml_ucx.idle_array, (mca_spml_ucx_ctx_t *)ctx);
645+
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
612646

613-
/* delete context object from list */
614-
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
615-
mca_spml_ucx_ctx_list_item_t) {
616-
if ((shmem_ctx_t)(&ctx_item->ctx) == ctx) {
617-
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
618-
opal_list_append(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super);
619-
break;
620-
}
647+
if (!mca_spml_ucx.active_array.ctxs_count) {
648+
opal_progress_unregister(spml_ucx_ctx_progress);
621649
}
622-
623-
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
624650
}
625651

626652
int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src)

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t;
7474

7575
extern mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default;
7676

77-
struct mca_spml_ucx_ctx_list_item {
78-
opal_list_item_t super;
79-
mca_spml_ucx_ctx_t ctx;
80-
};
81-
typedef struct mca_spml_ucx_ctx_list_item mca_spml_ucx_ctx_list_item_t;
82-
8377
typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(shmem_ctx_t ctx, int pe, void *va, void **rva);
8478

79+
typedef struct mca_spml_ucx_ctx_array {
80+
int ctxs_count;
81+
int ctxs_num;
82+
mca_spml_ucx_ctx_t **ctxs;
83+
} mca_spml_ucx_ctx_array_t;
84+
8585
struct mca_spml_ucx {
8686
mca_spml_base_module_t super;
8787
ucp_context_h ucp_context;
@@ -90,8 +90,8 @@ struct mca_spml_ucx {
9090
bool enabled;
9191
mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow;
9292
char **remote_addrs_tbl;
93-
opal_list_t ctx_list;
94-
opal_list_t idle_ctx_list;
93+
mca_spml_ucx_ctx_array_t active_array;
94+
mca_spml_ucx_ctx_array_t idle_array;
9595
int priority; /* component priority */
9696
shmem_internal_mutex_t internal_mutex;
9797
};
@@ -151,7 +151,8 @@ extern int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs);
151151
extern int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs);
152152
extern int mca_spml_ucx_fence(shmem_ctx_t ctx);
153153
extern int mca_spml_ucx_quiet(shmem_ctx_t ctx);
154-
extern int spml_ucx_progress(void);
154+
extern int spml_ucx_default_progress(void);
155+
extern int spml_ucx_ctx_progress(void);
155156

156157
static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe)
157158
{

0 commit comments

Comments
 (0)