Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 76 additions & 38 deletions src/mca/grpcomm/direct/grpcomm_direct_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* Copyright (c) 2014-2020 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2021-2025 Nanook Consulting All rights reserved.
* Copyright (c) 2021-2026 Nanook Consulting All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -801,7 +801,7 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
prte_grpcomm_direct_group_signature_t *sig = NULL;
prte_pmix_grp_caddy_t cd2, *cd;
int32_t cnt;
pmix_status_t rc = PMIX_SUCCESS, st;
pmix_status_t rc = PMIX_SUCCESS, st = PMIX_SUCCESS;
pmix_proc_t *finalmembership = NULL;
size_t nfinal = 0;
size_t nendpts = 0;
Expand All @@ -811,7 +811,7 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
pmix_info_t *grpinfo = NULL;
pmix_info_t *endpts = NULL;
prte_pmix_server_pset_t *pset;
void *ilist;
void *ilist, *nlist;
PRTE_HIDE_UNUSED_PARAMS(status, sender, tag, cbdata);

PMIX_ACQUIRE_OBJECT(cd);
Expand All @@ -837,7 +837,6 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
goto notify;
}

/* if this was a destruct operation, then there is nothing
Expand All @@ -854,22 +853,37 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
}
if (NULL != coll && NULL != coll->cbfunc) {
/* return to the local procs in the collective */
coll->cbfunc(rc, NULL, 0, coll->cbdata, NULL, NULL);
coll->cbfunc(st, NULL, 0, coll->cbdata, NULL, NULL);
}
// remove the tracker, if found
find_delete_tracker(sig);
PMIX_RELEASE(sig);
return;
}

// must be a construct operation - continue unpacking
// setup to cache info
ilist = PMIx_Info_list_start();
nlist = PMIx_Info_list_start();

// must be a construct operation - continue unpacking
if (PMIX_SUCCESS != st) {
PMIX_INFO_LIST_RELEASE(ilist);
goto notify;
}

if (sig->ctxid_assigned) {
PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_CONTEXT_ID, &sig->ctxid, PMIX_SIZE);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
goto notify;
}
PMIX_INFO_LIST_ADD(rc, nlist, PMIX_GROUP_CONTEXT_ID, &sig->ctxid, PMIX_SIZE);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
goto notify;
}
}
Expand All @@ -880,6 +894,7 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
goto notify;
}
if (0 < nfinal) {
Expand All @@ -889,6 +904,20 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
goto notify;
}
// pass back the final group membership
darray.type = PMIX_PROC;
darray.array = finalmembership;
darray.size = nfinal;
// load the array - note: this copies the array!
PMIX_INFO_LIST_ADD(rc, nlist, PMIX_GROUP_MEMBERSHIP, &darray, PMIX_DATA_ARRAY);
PMIX_PROC_FREE(finalmembership, nfinal);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
goto notify;
}
}
Expand All @@ -899,6 +928,7 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
goto notify;
}
if (0 < ngrpinfo) {
Expand All @@ -908,13 +938,27 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
PMIX_INFO_FREE(grpinfo, ngrpinfo);
goto notify;
}
// transfer them to our list
// transfer them to both lists
for (n=0; n < ngrpinfo; n++) {
rc = PMIx_Info_list_add_value(ilist, PMIX_GROUP_INFO, &grpinfo[n].value);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
PMIX_INFO_FREE(grpinfo, ngrpinfo);
goto notify;
}
rc = PMIx_Info_list_add_value(nlist, PMIX_GROUP_INFO, &grpinfo[n].value);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
PMIX_INFO_FREE(grpinfo, ngrpinfo);
goto notify;
}
}
PMIX_INFO_FREE(grpinfo, ngrpinfo);
Expand All @@ -927,6 +971,7 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
goto notify;
}
if (0 < nendpts) {
Expand All @@ -936,13 +981,27 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
PMIX_INFO_FREE(endpts, nendpts);
goto notify;
}
// transfer them to our list
// transfer them to both lists
for (n=0; n < nendpts; n++) {
rc = PMIx_Info_list_add_value(ilist, PMIX_GROUP_ENDPT_DATA, &endpts[n].value);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
PMIX_INFO_FREE(endpts, nendpts);
goto notify;
}
rc = PMIx_Info_list_add_value(nlist, PMIX_GROUP_ENDPT_DATA, &endpts[n].value);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
st = rc;
PMIX_INFO_LIST_RELEASE(ilist);
PMIX_INFO_FREE(endpts, nendpts);
goto notify;
}
}
PMIX_INFO_FREE(endpts, nendpts);
Expand Down Expand Up @@ -987,50 +1046,29 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
if (NULL != coll && NULL != coll->cbfunc) {
// service the procs that are part of the collective

PMIX_INFO_LIST_START(ilist);
if (NULL != finalmembership) {
// pass back the final group membership
darray.type = PMIX_PROC;
darray.array = finalmembership;
darray.size = nfinal;
// load the array - note: this copies the array!
PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_MEMBERSHIP, &darray, PMIX_DATA_ARRAY);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}
}

if (sig->ctxid_assigned) {
PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_CONTEXT_ID, &sig->ctxid, PMIX_SIZE);
if (PMIX_SUCCESS != rc) {
// convert for returning to PMIx server library
cd = PMIX_NEW(prte_pmix_grp_caddy_t);
if (PMIX_SUCCESS == st) {
PMIX_INFO_LIST_CONVERT(rc, nlist, &darray);
if (PMIX_SUCCESS != rc && PMIX_ERR_EMPTY != rc) {
PMIX_ERROR_LOG(rc);
}
cd->info = (pmix_info_t*)darray.array;
cd->ninfo = darray.size;
}

// convert for returning to PMIx server library
PMIX_INFO_LIST_CONVERT(rc, ilist, &darray);
if (PMIX_SUCCESS != rc && PMIX_ERR_EMPTY != rc) {
PMIX_ERROR_LOG(rc);
}
cd = PMIX_NEW(prte_pmix_grp_caddy_t);
cd->info = (pmix_info_t*)darray.array;
cd->ninfo = darray.size;
PMIX_INFO_LIST_RELEASE(ilist);

/* return to the PMIx server library for relay to
* local procs in the operation */
coll->cbfunc(rc, cd->info, cd->ninfo, coll->cbdata, relcb, (void*)cd);
coll->cbfunc(st, cd->info, cd->ninfo, coll->cbdata, relcb, (void*)cd);
}

if (NULL != finalmembership) {
PMIX_PROC_FREE(finalmembership, nfinal);
}
if (0 < nendpts) {
PMIX_INFO_FREE(endpts, nendpts);
}
if (0 < ngrpinfo) {
PMIX_INFO_FREE(grpinfo, ngrpinfo);
}
PMIX_INFO_LIST_RELEASE(nlist);
// remove this collective from our tracker
find_delete_tracker(sig);
PMIX_RELEASE(sig);
Expand Down
Loading