Skip to content

Commit 83cf146

Browse files
xinzhao3Tomislav Janjusic
authored andcommitted
ompi/oshmem/spml/ucx: defer clean up shmem_ctx to shmem_finalize
Signed-off-by: Tomislav Janjusic <[email protected]> (cherry picked from commit 2baf464)
1 parent ce54b63 commit 83cf146

File tree

5 files changed

+91
-49
lines changed

5 files changed

+91
-49
lines changed

opal/mca/common/ucx/common_ucx.c

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,17 @@ void opal_common_ucx_mca_proc_added(void)
151151
}
152152
}
153153
#endif
154+
155+
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced)
156+
{
157+
int ret = OPAL_SUCCESS;
158+
159+
if (OPAL_SUCCESS != (ret = opal_pmix.fence_nb(NULL, 0,
160+
opal_common_ucx_mca_fence_complete_cb, (void*)fenced))){
161+
return ret;
162+
}
163+
164+
return ret;
154165
}
155166

156167
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker)
@@ -181,9 +192,8 @@ static void opal_common_ucx_wait_all_requests(void **reqs, int count, ucp_worker
181192
}
182193
}
183194

184-
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
185-
size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
186-
{
195+
OPAL_DECLSPEC int opal_common_ucx_del_procs_nb(opal_common_ucx_del_proc_t *procs, size_t count,
196+
size_t my_rank, size_t max_disconnect, ucp_worker_h worker) {
187197
size_t num_reqs;
188198
size_t max_reqs;
189199
void *dreq, **dreqs;
@@ -230,7 +240,17 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, s
230240
opal_common_ucx_wait_all_requests(dreqs, num_reqs, worker);
231241
free(dreqs);
232242

233-
opal_common_ucx_mca_pmix_fence(worker);
243+
return OPAL_SUCCESS;
244+
}
245+
246+
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
247+
size_t my_rank, size_t max_disconnect, ucp_worker_h worker)
248+
{
249+
int ret = OPAL_SUCCESS;
250+
opal_common_ucx_del_procs_nb(procs, count, my_rank, max_disconnect, worker);
251+
if (OPAL_SUCCESS != (ret = opal_common_ucx_mca_pmix_fence(worker))) {
252+
return ret;
253+
}
234254

235255
return OPAL_SUCCESS;
236256
}

opal/mca/common/ucx/common_ucx.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,12 @@ OPAL_DECLSPEC void opal_common_ucx_mca_deregister(void);
100100
OPAL_DECLSPEC void opal_common_ucx_mca_proc_added(void);
101101
OPAL_DECLSPEC void opal_common_ucx_empty_complete_cb(void *request, ucs_status_t status);
102102
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence(ucp_worker_h worker);
103-
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
103+
OPAL_DECLSPEC int opal_common_ucx_mca_pmix_fence_nb(int *fenced);
104104
OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, size_t count,
105105
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
106+
OPAL_DECLSPEC int opal_common_ucx_del_procs_nb(opal_common_ucx_del_proc_t *procs, size_t count,
107+
size_t my_rank, size_t max_disconnect, ucp_worker_h worker);
108+
OPAL_DECLSPEC void opal_common_ucx_mca_var_register(const mca_base_component_t *component);
106109

107110
static inline
108111
ucs_status_t opal_common_ucx_request_status(ucs_status_ptr_t request)

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -657,30 +657,7 @@ void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx)
657657
mca_spml_ucx_ctx_list_item_t) {
658658
if ((shmem_ctx_t)(&ctx_item->ctx) == ctx) {
659659
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
660-
661-
opal_common_ucx_del_proc_t *del_procs;
662-
del_procs = malloc(sizeof(*del_procs) * nprocs);
663-
664-
for (i = 0; i < nprocs; ++i) {
665-
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
666-
if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) {
667-
ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey);
668-
}
669-
}
670-
671-
del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn;
672-
del_procs[i].vpid = i;
673-
ctx_item->ctx.ucp_peers[i].ucp_conn = NULL;
674-
}
675-
676-
opal_common_ucx_del_procs(del_procs, nprocs, oshmem_my_proc_id(),
677-
mca_spml_ucx.num_disconnect,
678-
ctx_item->ctx.ucp_worker);
679-
free(del_procs);
680-
free(ctx_item->ctx.ucp_peers);
681-
682-
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
683-
OBJ_RELEASE(ctx_item);
660+
opal_list_append(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super);
684661
break;
685662
}
686663
}

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ struct mca_spml_ucx {
9292
mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow;
9393
char **remote_addrs_tbl;
9494
opal_list_t ctx_list;
95+
opal_list_t idle_ctx_list;
9596
int priority; /* component priority */
9697
shmem_internal_mutex_t internal_mutex;
9798
};

oshmem/mca/spml/ucx/spml_ucx_component.c

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ static int spml_ucx_init(void)
176176
}
177177

178178
OBJ_CONSTRUCT(&(mca_spml_ucx.ctx_list), opal_list_t);
179+
OBJ_CONSTRUCT(&(mca_spml_ucx.idle_ctx_list), opal_list_t);
179180
SHMEM_MUTEX_INIT(mca_spml_ucx.internal_mutex);
180181

181182
wkr_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
@@ -224,42 +225,81 @@ mca_spml_ucx_component_init(int* priority,
224225
return &mca_spml_ucx.super;
225226
}
226227

228+
static void _ctx_cleanup(mca_spml_ucx_ctx_list_item_t *ctx_item)
229+
{
230+
int i, j, nprocs = oshmem_num_procs();
231+
opal_common_ucx_del_proc_t *del_procs;
232+
233+
del_procs = malloc(sizeof(*del_procs) * nprocs);
234+
235+
for (i = 0; i < nprocs; ++i) {
236+
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
237+
if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) {
238+
ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey);
239+
}
240+
}
241+
242+
del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn;
243+
del_procs[i].vpid = i;
244+
ctx_item->ctx.ucp_peers[i].ucp_conn = NULL;
245+
}
246+
247+
opal_common_ucx_del_procs_nb(del_procs, nprocs, oshmem_my_proc_id(),
248+
mca_spml_ucx.num_disconnect,
249+
ctx_item->ctx.ucp_worker);
250+
free(del_procs);
251+
free(ctx_item->ctx.ucp_peers);
252+
}
253+
227254
static int mca_spml_ucx_component_fini(void)
228255
{
229256
mca_spml_ucx_ctx_list_item_t *ctx_item, *next;
230-
size_t i, j, nprocs = oshmem_num_procs();
257+
int fenced = 0;
258+
int ret = OSHMEM_SUCCESS;
231259

232260
opal_progress_unregister(spml_ucx_progress);
233261

234262
if(!mca_spml_ucx.enabled)
235263
return OSHMEM_SUCCESS; /* never selected.. return success.. */
236264

237265
/* delete context objects from list */
238-
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
266+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list),
239267
mca_spml_ucx_ctx_list_item_t) {
240-
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
268+
_ctx_cleanup(ctx_item);
269+
}
241270

242-
opal_common_ucx_del_proc_t *del_procs;
243-
del_procs = malloc(sizeof(*del_procs) * nprocs);
271+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
272+
mca_spml_ucx_ctx_list_item_t) {
273+
_ctx_cleanup(ctx_item);
274+
}
244275

245-
for (i = 0; i < nprocs; ++i) {
246-
for (j = 0; j < MCA_MEMHEAP_SEG_COUNT; j++) {
247-
if (ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey != NULL) {
248-
ucp_rkey_destroy(ctx_item->ctx.ucp_peers[i].mkeys[j].key.rkey);
249-
}
250-
}
276+
ret = opal_common_ucx_mca_pmix_fence_nb(&fenced);
277+
if (OPAL_SUCCESS != ret) {
278+
return ret;
279+
}
251280

252-
del_procs[i].ep = ctx_item->ctx.ucp_peers[i].ucp_conn;
253-
del_procs[i].vpid = i;
254-
ctx_item->ctx.ucp_peers[i].ucp_conn = NULL;
281+
while (!fenced) {
282+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
283+
mca_spml_ucx_ctx_list_item_t) {
284+
ucp_worker_progress(ctx_item->ctx.ucp_worker);
255285
}
286+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list),
287+
mca_spml_ucx_ctx_list_item_t) {
288+
ucp_worker_progress(ctx_item->ctx.ucp_worker);
289+
}
290+
ucp_worker_progress(mca_spml_ucx_ctx_default.ucp_worker);
291+
}
256292

257-
opal_common_ucx_del_procs(del_procs, nprocs, oshmem_my_proc_id(),
258-
mca_spml_ucx.num_disconnect,
259-
ctx_item->ctx.ucp_worker);
260-
free(del_procs);
261-
free(ctx_item->ctx.ucp_peers);
262-
293+
/* delete all workers */
294+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.idle_ctx_list),
295+
mca_spml_ucx_ctx_list_item_t) {
296+
opal_list_remove_item(&(mca_spml_ucx.idle_ctx_list), &ctx_item->super);
297+
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
298+
OBJ_RELEASE(ctx_item);
299+
}
300+
OPAL_LIST_FOREACH_SAFE(ctx_item, next, &(mca_spml_ucx.ctx_list),
301+
mca_spml_ucx_ctx_list_item_t) {
302+
opal_list_remove_item(&(mca_spml_ucx.ctx_list), &ctx_item->super);
263303
ucp_worker_destroy(ctx_item->ctx.ucp_worker);
264304
OBJ_RELEASE(ctx_item);
265305
}
@@ -271,6 +311,7 @@ static int mca_spml_ucx_component_fini(void)
271311
mca_spml_ucx.enabled = false; /* not anymore */
272312

273313
OBJ_DESTRUCT(&(mca_spml_ucx.ctx_list));
314+
OBJ_DESTRUCT(&(mca_spml_ucx.idle_ctx_list));
274315
SHMEM_MUTEX_DESTROY(mca_spml_ucx.internal_mutex);
275316

276317
if (mca_spml_ucx.ucp_context) {

0 commit comments

Comments
 (0)