Skip to content

Commit 69a80fc

Browse files
xinzhao3Tomislav Janjusic
authored andcommitted
ompi/oshmem/spml/ucx: use lockfree array to optimize spml_ucx_progress/delete oshmem_barrier in shmem_ctx_destroy
ompi/oshmem/spml/ucx: optimize spml ucx progress Signed-off-by: Tomislav Janjusic <[email protected]> (cherry picked from commit 9c3d00b)
1 parent 580b584 commit 69a80fc

File tree

4 files changed

+129
-94
lines changed

4 files changed

+129
-94
lines changed

opal/mca/common/ucx/common_ucx.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ void opal_common_ucx_mca_proc_added(void)
151151
}
152152
}
153153
#endif
154+
}
154155

155156
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced)
156157
{

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 66 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@ mca_spml_ucx_t mca_spml_ucx = {
8080
.get_mkey_slow = NULL
8181
};
8282

83-
OBJ_CLASS_INSTANCE(mca_spml_ucx_ctx_list_item_t, opal_list_item_t, NULL, NULL);
84-
8583
mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
8684
.ucp_worker = NULL,
8785
.ucp_peers = NULL,
@@ -246,7 +244,7 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
246244
goto error;
247245
}
248246

249-
opal_progress_register(spml_ucx_progress);
247+
opal_progress_register(spml_ucx_default_progress);
250248

251249
mca_spml_ucx.remote_addrs_tbl = (char **)calloc(nprocs, sizeof(char *));
252250
memset(mca_spml_ucx.remote_addrs_tbl, 0, nprocs * sizeof(char *));
@@ -514,9 +512,45 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
514512
return OSHMEM_SUCCESS;
515513
}
516514

515+
static inline void _ctx_add(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
516+
{
517+
int i;
518+
519+
if (array->ctxs_count < array->ctxs_num) {
520+
array->ctxs[array->ctxs_count] = ctx;
521+
} else {
522+
array->ctxs = realloc(array->ctxs, (array->ctxs_num + MCA_SPML_UCX_CTXS_ARRAY_INC) * sizeof(mca_spml_ucx_ctx_t *));
523+
opal_atomic_wmb ();
524+
for (i = array->ctxs_num; i < array->ctxs_num + MCA_SPML_UCX_CTXS_ARRAY_INC; i++) {
525+
array->ctxs[i] = NULL;
526+
}
527+
array->ctxs[array->ctxs_num] = ctx;
528+
array->ctxs_num += MCA_SPML_UCX_CTXS_ARRAY_INC;
529+
}
530+
531+
opal_atomic_wmb ();
532+
array->ctxs_count++;
533+
}
534+
535+
static inline void _ctx_remove(mca_spml_ucx_ctx_array_t *array, mca_spml_ucx_ctx_t *ctx)
536+
{
537+
int i;
538+
539+
for (i = 0; i < array->ctxs_count; i++) {
540+
if (array->ctxs[i] == ctx) {
541+
array->ctxs[i] = array->ctxs[array->ctxs_count-1];
542+
array->ctxs[array->ctxs_count-1] = NULL;
543+
break;
544+
}
545+
}
546+
547+
array->ctxs_count--;
548+
opal_atomic_wmb ();
549+
}
550+
517551
int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
518552
{
519-
mca_spml_ucx_ctx_list_item_t *ctx_item;
553+
mca_spml_ucx_ctx_t *ucx_ctx;
520554
ucp_worker_params_t params;
521555
ucp_ep_params_t ep_params;
522556
size_t i, j, nprocs = oshmem_num_procs();
@@ -527,8 +561,8 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
527561
sshmem_mkey_t *mkey;
528562
int rc = OSHMEM_ERROR;
529563

530-
ctx_item = OBJ_NEW(mca_spml_ucx_ctx_list_item_t);
531-
ctx_item->ctx.options = options;
564+
ucx_ctx = malloc(sizeof(mca_spml_ucx_ctx_t));
565+
ucx_ctx->options = options;
532566

533567
params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
534568
if (oshmem_mpi_thread_provided == SHMEM_THREAD_SINGLE || options & SHMEM_CTX_PRIVATE || options & SHMEM_CTX_SERIALIZED) {
@@ -538,22 +572,26 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
538572
}
539573

540574
err = ucp_worker_create(mca_spml_ucx.ucp_context, &params,
541-
&ctx_item->ctx.ucp_worker);
575+
&ucx_ctx->ucp_worker);
542576
if (UCS_OK != err) {
543-
OBJ_RELEASE(ctx_item);
577+
free(ucx_ctx);
544578
return OSHMEM_ERROR;
545579
}
546580

547-
ctx_item->ctx.ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ctx_item->ctx.ucp_peers)));
548-
if (NULL == ctx_item->ctx.ucp_peers) {
581+
ucx_ctx->ucp_peers = (ucp_peer_t *) calloc(nprocs, sizeof(*(ucx_ctx->ucp_peers)));
582+
if (NULL == ucx_ctx->ucp_peers) {
549583
goto error;
550584
}
551585

586+
if (mca_spml_ucx.active_array.ctxs_count == 0) {
587+
opal_progress_register(spml_ucx_ctx_progress);
588+
}
589+
552590
for (i = 0; i < nprocs; i++) {
553591
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
554592
ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
555-
err = ucp_ep_create(ctx_item->ctx.ucp_worker, &ep_params,
556-
&ctx_item->ctx.ucp_peers[i].ucp_conn);
593+
err = ucp_ep_create(ucx_ctx->ucp_worker, &ep_params,
594+
&ucx_ctx->ucp_peers[i].ucp_conn);
557595
if (UCS_OK != err) {
558596
SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: %s", i, nprocs,
559597
ucs_status_string(err));
@@ -562,68 +600,55 @@ int mca_spml_ucx_ctx_create(long options, shmem_ctx_t *ctx)
562600

563601
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
564602
mkey = &memheap_map->mem_segs[j].mkeys_cache[i][0];
565-
ucx_mkey = &ctx_item->ctx.ucp_peers[i].mkeys[j].key;
566-
err = ucp_ep_rkey_unpack(ctx_item->ctx.ucp_peers[i].ucp_conn,
603+
ucx_mkey = &ucx_ctx->ucp_peers[i].mkeys[j].key;
604+
err = ucp_ep_rkey_unpack(ucx_ctx->ucp_peers[i].ucp_conn,
567605
mkey->u.data,
568606
&ucx_mkey->rkey);
569607
if (UCS_OK != err) {
570608
SPML_UCX_ERROR("failed to unpack rkey");
571609
goto error2;
572610
}
573-
mca_spml_ucx_cache_mkey(&ctx_item->ctx, mkey, j, i);
611+
mca_spml_ucx_cache_mkey(ucx_ctx, mkey, j, i);
574612
}
575613
}
576614

577615
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
578-
579-
opal_list_append(&(mca_spml_ucx.ctx_list), &ctx_item->super);
580-
616+
_ctx_add(&mca_spml_ucx.active_array, ucx_ctx);
581617
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
582618

583-
(*ctx) = (shmem_ctx_t)(&ctx_item->ctx);
584-
619+
(*ctx) = (shmem_ctx_t)ucx_ctx;
585620
return OSHMEM_SUCCESS;
586621

587622
error2:
588623
for (i = 0; i < nprocs; i++) {
589-
if (ctx_item->ctx.ucp_peers[i].ucp_conn) {
590-
ucp_ep_destroy(ctx_item->ctx.ucp_peers[i].ucp_conn);
624+
if (ucx_ctx->ucp_peers[i].ucp_conn) {
625+
ucp_ep_destroy(ucx_ctx->ucp_peers[i].ucp_conn);
591626
}
592627
}
593628

594-
if (ctx_item->ctx.ucp_peers)
595-
free(ctx_item->ctx.ucp_peers);
629+
if (ucx_ctx->ucp_peers)
630+
free(ucx_ctx->ucp_peers);
596631

597632
error:
598-
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
599-
OBJ_RELEASE(ctx_item);
633+
ucp_worker_destroy(ucx_ctx->ucp_worker);
634+
free(ucx_ctx);
600635
rc = OSHMEM_ERR_OUT_OF_RESOURCE;
601636
SPML_ERROR("ctx create FAILED rc=%d", rc);
602637
return rc;
603638
}
604639

605640
void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
606641
{
607-
mca_spml_ucx_ctx_list_item_t *ctx_item, *next;
608-
size_t i, j, nprocs = oshmem_num_procs();
609-
610642
MCA_SPML_CALL(quiet(ctx));
611643

612-
oshmem_shmem_barrier();
613-
614644
SHMEM_MUTEX_LOCK(mca_spml_ucx.internal_mutex);
645+
_ctx_remove(&mca_spml_ucx.active_array, (mca_spml_ucx_ctx_t *)ctx);
646+
_ctx_add(&mca_spml_ucx.idle_array, (mca_spml_ucx_ctx_t *)ctx);
647+
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
615648

616-
/* delete context object from list */
617-
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
618-
mca_spml_ucx_ctx_list_item_t) {
619-
if ((shmem_ctx_t)(&ctx_item->ctx) == ctx) {
620-
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
621-
opal_list_append(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super);
622-
break;
623-
}
649+
if (!mca_spml_ucx.active_array.ctxs_count) {
650+
opal_progress_unregister(spml_ucx_ctx_progress);
624651
}
625-
626-
SHMEM_MUTEX_UNLOCK(mca_spml_ucx.internal_mutex);
627652
}
628653

629654
int mca_spml_ucx_get(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src)

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,14 @@ typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t;
7575

7676
extern mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default;
7777

78-
struct mca_spml_ucx_ctx_list_item {
79-
opal_list_item_t super;
80-
mca_spml_ucx_ctx_t ctx;
81-
};
82-
typedef struct mca_spml_ucx_ctx_list_item mca_spml_ucx_ctx_list_item_t;
83-
8478
typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(shmem_ctx_t ctx, int pe, void *va, void **rva);
8579

80+
typedef struct mca_spml_ucx_ctx_array {
81+
int ctxs_count;
82+
int ctxs_num;
83+
mca_spml_ucx_ctx_t **ctxs;
84+
} mca_spml_ucx_ctx_array_t;
85+
8686
struct mca_spml_ucx {
8787
mca_spml_base_module_t super;
8888
ucp_context_h ucp_context;
@@ -91,8 +91,8 @@ struct mca_spml_ucx {
9191
bool enabled;
9292
mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow;
9393
char **remote_addrs_tbl;
94-
opal_list_t ctx_list;
95-
opal_list_t idle_ctx_list;
94+
mca_spml_ucx_ctx_array_t active_array;
95+
mca_spml_ucx_ctx_array_t idle_array;
9696
int priority; /* component priority */
9797
shmem_internal_mutex_t internal_mutex;
9898
};
@@ -152,7 +152,8 @@ extern int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs);
152152
extern int mca_spml_ucx_del_procs(ompi_proc_t** procs, size_t nprocs);
153153
extern int mca_spml_ucx_fence(shmem_ctx_t ctx);
154154
extern int mca_spml_ucx_quiet(shmem_ctx_t ctx);
155-
extern int spml_ucx_progress(void);
155+
extern int spml_ucx_default_progress(void);
156+
extern int spml_ucx_ctx_progress(void);
156157

157158
static void mca_spml_ucx_cache_mkey(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe)
158159
{
@@ -193,6 +194,9 @@ static inline int ucx_status_to_oshmem_nb(ucs_status_t status)
193194
#endif
194195
}
195196

197+
#define MCA_SPML_UCX_CTXS_ARRAY_SIZE 64
198+
#define MCA_SPML_UCX_CTXS_ARRAY_INC 64
199+
196200
END_C_DECLS
197201

198202
#endif

0 commit comments

Comments
 (0)