Commit b5d4649

Fix cross-mpirun connect/accept operations
Ensure we publish all the info required to be returned to the other mpirun when executing this operation. We need to know the daemon (and its URI) that is hosting each of the other procs so we can do a direct modex operation and retrieve their connection info.

Signed-off-by: Ralph Castain <[email protected]>
(cherry picked from commit 60961ce)
1 parent c0ee7ad commit b5d4649
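The published payload has three parts, always in this order: the packed orte_job_t itself (without its procs or their locations), a byte object holding the RML contact URI of every daemon involved, and a byte object mapping each proc name to the name of its hosting daemon. The receiving mpirun's _cnlk() callback unpacks them in the same order. Below is a minimal sketch of the packing side, assuming the opal_dss API exactly as it appears in the diffs that follow; the helper name and include paths are illustrative, not part of the commit.

    #include "opal/dss/dss.h"
    #include "orte/runtime/orte_globals.h"   /* orte_job_t (assumed header location) */

    /* Illustrative helper (not in the commit): assembles the three-part
     * payload that the other mpirun's _cnlk() unpacks in the same order. */
    static int pack_connect_payload(opal_buffer_t *buf, orte_job_t *jdata,
                                    opal_buffer_t *uris,  /* packed OPAL_STRING daemon URIs */
                                    opal_buffer_t *map)   /* packed ORTE_NAME proc/daemon pairs */
    {
        opal_byte_object_t bo, *boptr = &bo;
        int rc;

        /* 1. the job object - procs and their locations are NOT included */
        if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &jdata, 1, ORTE_JOB))) {
            return rc;
        }
        /* 2. the daemon URIs, flattened into a byte object */
        opal_dss.unload(uris, (void**)&bo.bytes, &bo.size);
        if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &boptr, 1, OPAL_BYTE_OBJECT))) {
            return rc;
        }
        /* 3. the proc-to-daemon map, likewise flattened */
        opal_dss.unload(map, (void**)&bo.bytes, &bo.size);
        return opal_dss.pack(buf, &boptr, 1, OPAL_BYTE_OBJECT);
    }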

File tree: 4 files changed, +151 -10 lines changed

    orte/mca/state/base/state_base_fns.c
    orte/orted/pmix/pmix_server_dyn.c
    orte/orted/pmix/pmix_server_fence.c
    orte/orted/pmix/pmix_server_register_fns.c

orte/mca/state/base/state_base_fns.c

Lines changed: 3 additions & 2 deletions
@@ -953,8 +953,9 @@ void orte_state_base_check_all_complete(int fd, short args, void *cbdata)
         one_still_alive = false;
         j = opal_hash_table_get_first_key_uint32(orte_job_data, &u32, (void **)&job, &nptr);
         while (OPAL_SUCCESS == j) {
-            /* skip the daemon job */
-            if (job->jobid == ORTE_PROC_MY_NAME->jobid) {
+            /* skip the daemon job and all jobs from other families */
+            if (job->jobid == ORTE_PROC_MY_NAME->jobid ||
+                ORTE_JOB_FAMILY(job->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
                 goto next;
             }
             /* if this is the job we are checking AND it normally terminated,
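For reference, ORTE jobids encode the launching mpirun in their upper 16 bits, so two jobs share a family exactly when the same mpirun started them. A standalone sketch of the family test used above, assuming that standard encoding (the real macro lives in orte/util/name_fns.h; this one is a reproduction for illustration):

    #include <stdint.h>
    #include <stdio.h>

    /* assumed encoding: upper 16 bits = job family (the launching mpirun),
     * lower 16 bits = local jobid within that family */
    #define JOB_FAMILY(j) (((j) >> 16) & 0x0000ffffU)

    int main(void)
    {
        uint32_t my_jobid    = (42u << 16);      /* mpirun #42's daemon job */
        uint32_t their_jobid = (43u << 16) | 1;  /* app job from mpirun #43 */

        /* the completion check above now skips jobs from other families */
        if (JOB_FAMILY(their_jobid) != JOB_FAMILY(my_jobid)) {
            printf("jobid 0x%08x belongs to another mpirun - skipped\n", their_jobid);
        }
        return 0;
    }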

orte/orted/pmix/pmix_server_dyn.c

Lines changed: 106 additions & 2 deletions
@@ -42,6 +42,7 @@
 
 #include "orte/mca/errmgr/errmgr.h"
 #include "orte/mca/rmaps/base/base.h"
+#include "orte/mca/rml/base/rml_contact.h"
 #include "orte/mca/state/state.h"
 #include "orte/util/name_fns.h"
 #include "orte/util/show_help.h"
@@ -537,7 +538,14 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
     int rc, cnt;
     opal_pmix_pdata_t *pdat;
     orte_job_t *jdata;
-    opal_buffer_t buf;
+    orte_node_t *node;
+    orte_proc_t *proc;
+    opal_buffer_t buf, bucket;
+    opal_byte_object_t *bo;
+    orte_process_name_t dmn, pname;
+    char *uri;
+    opal_value_t val;
+    opal_list_t nodes;
 
     ORTE_ACQUIRE_OBJECT(cd);
 
@@ -554,6 +562,7 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
     pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
     if (OPAL_BYTE_OBJECT != pdat->value.type) {
         rc = ORTE_ERR_BAD_PARAM;
+        ORTE_ERROR_LOG(rc);
         goto release;
     }
     /* the data will consist of a packed buffer with the job data in it */
@@ -563,15 +572,107 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
     pdat->value.data.bo.size = 0;
     cnt = 1;
     if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) {
+        ORTE_ERROR_LOG(rc);
+        OBJ_DESTRUCT(&buf);
+        goto release;
+    }
+
+    /* unpack the byte object containing the daemon uri's */
+    cnt=1;
+    if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
+        ORTE_ERROR_LOG(rc);
         OBJ_DESTRUCT(&buf);
         goto release;
     }
+    /* load it into a buffer */
+    OBJ_CONSTRUCT(&bucket, opal_buffer_t);
+    opal_dss.load(&bucket, bo->bytes, bo->size);
+    bo->bytes = NULL;
+    free(bo);
+    /* prep a list to save the nodes */
+    OBJ_CONSTRUCT(&nodes, opal_list_t);
+    /* unpack and store the URI's */
+    cnt = 1;
+    while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &uri, &cnt, OPAL_STRING))) {
+        rc = orte_rml_base_parse_uris(uri, &dmn, NULL);
+        if (ORTE_SUCCESS != rc) {
+            OBJ_DESTRUCT(&buf);
+            OBJ_DESTRUCT(&bucket);
+            goto release;
+        }
+        /* save a node object for this daemon */
+        node = OBJ_NEW(orte_node_t);
+        node->daemon = OBJ_NEW(orte_proc_t);
+        memcpy(&node->daemon->name, &dmn, sizeof(orte_process_name_t));
+        opal_list_append(&nodes, &node->super);
+        /* register the URI */
+        OBJ_CONSTRUCT(&val, opal_value_t);
+        val.key = OPAL_PMIX_PROC_URI;
+        val.type = OPAL_STRING;
+        val.data.string = uri;
+        if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&dmn, &val))) {
+            ORTE_ERROR_LOG(rc);
+            val.key = NULL;
+            val.data.string = NULL;
+            OBJ_DESTRUCT(&val);
+            OBJ_DESTRUCT(&buf);
+            OBJ_DESTRUCT(&bucket);
+            goto release;
+        }
+        val.key = NULL;
+        val.data.string = NULL;
+        OBJ_DESTRUCT(&val);
+        cnt = 1;
+    }
+    OBJ_DESTRUCT(&bucket);
+
+    /* unpack the proc-to-daemon map */
+    cnt=1;
+    if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
+        ORTE_ERROR_LOG(rc);
+        OBJ_DESTRUCT(&buf);
+        goto release;
+    }
+    /* load it into a buffer */
+    OBJ_CONSTRUCT(&bucket, opal_buffer_t);
+    opal_dss.load(&bucket, bo->bytes, bo->size);
+    bo->bytes = NULL;
+    free(bo);
+    /* unpack and store the map */
+    cnt = 1;
+    while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &pname, &cnt, ORTE_NAME))) {
+        /* get the name of the daemon hosting it */
+        if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &dmn, &cnt, ORTE_NAME))) {
+            OBJ_DESTRUCT(&buf);
+            OBJ_DESTRUCT(&bucket);
+            goto release;
+        }
+        /* create the proc object */
+        proc = OBJ_NEW(orte_proc_t);
+        memcpy(&proc->name, &pname, sizeof(orte_process_name_t));
+        opal_pointer_array_set_item(jdata->procs, pname.vpid, proc);
+        /* find the daemon */
+        OPAL_LIST_FOREACH(node, &nodes, orte_node_t) {
+            if (node->daemon->name.vpid == dmn.vpid) {
+                OBJ_RETAIN(node);
+                proc->node = node;
+                break;
+            }
+        }
+    }
+    OBJ_DESTRUCT(&bucket);
+    OPAL_LIST_DESTRUCT(&nodes);
     OBJ_DESTRUCT(&buf);
+
+    /* register the nspace */
     if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
+        ORTE_ERROR_LOG(rc);
         OBJ_RELEASE(jdata);
         goto release;
     }
-    OBJ_RELEASE(jdata);  // no reason to keep this around
+
+    /* save the job object so we don't endlessly cycle */
+    opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);
 
     /* restart the cnct processor */
     ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
@@ -617,6 +718,7 @@ static void _cnct(int sd, short args, void *cbdata)
      * out about it, and all we can do is return an error */
     if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid &&
         orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) {
+        ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
         rc = ORTE_ERR_NOT_SUPPORTED;
         goto release;
     }
@@ -632,6 +734,7 @@
     kv->data.uint32 = geteuid();
     opal_list_append(cd->info, &kv->super);
     if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) {
+        ORTE_ERROR_LOG(rc);
         opal_argv_free(keys);
         goto release;
     }
@@ -645,6 +748,7 @@
     if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) {
         /* it hasn't been registered yet, so register it now */
         if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
+            ORTE_ERROR_LOG(rc);
             goto release;
         }
     }
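One detail in _cnlk() worth calling out: opal_dss.load hands the byte object's bytes over to the buffer, which is why the code NULLs bo->bytes before freeing the wrapper. A sketch of that round trip under the same opal_dss API as used above (illustrative helper, not code from the commit; compiles only inside the Open MPI tree):

    #include <stdlib.h>
    #include "opal/dss/dss.h"

    /* Illustrative round trip (not from the commit): flatten a packed buffer
     * into a byte object, then re-inflate it on the receiving side. */
    static int byte_object_roundtrip(void)
    {
        opal_buffer_t bucket, inflated;
        opal_byte_object_t bo;
        char *uri = "daemon-uri", *result = NULL;
        int rc, cnt = 1;

        /* sender: pack a string, then unload the raw bytes */
        OBJ_CONSTRUCT(&bucket, opal_buffer_t);
        opal_dss.pack(&bucket, &uri, 1, OPAL_STRING);
        opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
        OBJ_DESTRUCT(&bucket);

        /* receiver: load transfers ownership of bo.bytes to the buffer,
         * so the byte object must not free them again afterwards */
        OBJ_CONSTRUCT(&inflated, opal_buffer_t);
        opal_dss.load(&inflated, bo.bytes, bo.size);
        bo.bytes = NULL;   /* the buffer owns the bytes now */

        rc = opal_dss.unpack(&inflated, &result, &cnt, OPAL_STRING);
        OBJ_DESTRUCT(&inflated);
        if (OPAL_SUCCESS == rc) {
            free(result);  /* unpack returns a malloc'ed copy */
        }
        return rc;
    }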

orte/orted/pmix/pmix_server_fence.c

Lines changed: 3 additions & 1 deletion
@@ -227,6 +227,7 @@ static void dmodex_req(int sd, short args, void *cbdata)
         rc = ORTE_ERR_NOT_FOUND;
         goto callback;
     }
+
     /* point the request to the daemon that is hosting the
      * target process */
     req->proxy.vpid = dmn->name.vpid;
@@ -240,7 +241,8 @@
 
     /* if we are the host daemon, then this is a local request, so
      * just wait for the data to come in */
-    if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
+    if (ORTE_PROC_MY_NAME->jobid == dmn->name.jobid &&
+        ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
         return;
     }
 
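The fence fix matters because daemon vpids are only unique within a job family: before this change, a daemon belonging to another mpirun could match on vpid alone, and the request would wrongly be treated as local. A self-contained sketch of the corrected test (simplified stand-in types, not ORTE's actual structs):

    #include <stdint.h>
    #include <stdbool.h>

    /* simplified stand-in for orte_process_name_t */
    typedef struct { uint32_t jobid; uint32_t vpid; } name_t;

    /* local only if BOTH fields match; comparing vpid alone (the pre-fix
     * behavior) confuses us with a remote daemon that shares our vpid */
    static bool is_local_daemon(const name_t *me, const name_t *dmn)
    {
        return me->jobid == dmn->jobid && me->vpid == dmn->vpid;
    }

    int main(void)
    {
        name_t me  = { .jobid = 0x002a0000, .vpid = 3 };
        name_t dmn = { .jobid = 0x002b0000, .vpid = 3 };  /* same vpid, other mpirun */
        return is_local_daemon(&me, &dmn) ? 1 : 0;        /* 0: correctly non-local */
    }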

orte/orted/pmix/pmix_server_register_fns.c

Lines changed: 39 additions & 5 deletions
@@ -13,7 +13,7 @@
  * All rights reserved.
  * Copyright (c) 2009-2018 Cisco Systems, Inc. All rights reserved
  * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
- * Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
+ * Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
  * Copyright (c) 2014 Mellanox Technologies, Inc.
  *                    All rights reserved.
  * Copyright (c) 2014-2016 Research Organization for Information Science
@@ -71,6 +71,9 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
     gid_t gid;
     opal_list_t *cache;
     hwloc_obj_t machine;
+    opal_buffer_t buf, bucket;
+    opal_byte_object_t bo, *boptr;
+    orte_proc_t *proc;
 
     opal_output_verbose(2, orte_pmix_server_globals.output,
                         "%s register nspace for %s",
@@ -472,21 +475,52 @@
                                           jdata->num_local_procs,
                                           info, NULL, NULL);
     OPAL_LIST_RELEASE(info);
+    if (OPAL_SUCCESS != rc) {
+        return rc;
+    }
 
-    /* if the user has connected us to an external server, then we must
-     * assume there is going to be some cross-mpirun exchange, and so
+    /* if I am the HNP and this job is a member of my family, then we must
+     * assume there could be some cross-mpirun exchange, and so
      * we protect against that situation by publishing the job info
      * for this job - this allows any subsequent "connect" to retrieve
      * the job info */
-    if (NULL != orte_data_server_uri) {
-        opal_buffer_t buf;
 
+    if (ORTE_PROC_IS_HNP && ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) == ORTE_JOB_FAMILY(jdata->jobid)) {
+        /* pack the job - note that this doesn't include the procs
+         * or their locations */
         OBJ_CONSTRUCT(&buf, opal_buffer_t);
         if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &jdata, 1, ORTE_JOB))) {
             ORTE_ERROR_LOG(rc);
             OBJ_DESTRUCT(&buf);
             return rc;
         }
+
+        /* pack the hostname, daemon vpid and contact URI for each involved node */
+        map = jdata->map;
+        OBJ_CONSTRUCT(&bucket, opal_buffer_t);
+        for (i=0; i < map->nodes->size; i++) {
+            if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
+                continue;
+            }
+            opal_dss.pack(&bucket, &node->daemon->rml_uri, 1, OPAL_STRING);
+        }
+        opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
+        boptr = &bo;
+        opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);
+
+        /* pack the proc name and daemon vpid for each proc */
+        OBJ_CONSTRUCT(&bucket, opal_buffer_t);
+        for (i=0; i < jdata->procs->size; i++) {
+            if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
+                continue;
+            }
+            opal_dss.pack(&bucket, &proc->name, 1, ORTE_NAME);
+            opal_dss.pack(&bucket, &proc->node->daemon->name, 1, ORTE_NAME);
+        }
+        opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
+        boptr = &bo;
+        opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);
+
         info = OBJ_NEW(opal_list_t);
         /* create a key-value with the key being the string jobid
          * and the value being the byte object */
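The hunk is truncated just as the key-value for publication is built. Purely as an illustration of the general shape (this is not the commit's continuation; the key string and list handling are assumptions), the flattened buffer would be attached to an opal_value_t keyed by the stringified jobid:

    /* Hypothetical continuation sketch - NOT the commit's actual code. */
    opal_value_t *kv = OBJ_NEW(opal_value_t);
    kv->key = strdup("1234196353");   /* assumed: the jobid rendered as a string */
    kv->type = OPAL_BYTE_OBJECT;
    /* hand the flattened job payload to the value */
    opal_dss.unload(&buf, (void**)&kv->data.bo.bytes, &kv->data.bo.size);
    opal_list_append(info, &kv->super);
    /* the list is then published so a later "connect" can look it up */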
