Skip to content

Commit b225366

Browse files
author
Ralph Castain
committed
Bring the ofi/rml component online by completing the wireup protocol for the daemons. Clean up the current confusion over how connection info gets created and
passed to make it all flow through the opal/pmix "put/get" operations. Update the PMIx code to latest master to pick up some required behaviors. Remove the no-longer-required get_contact_info and set_contact_info from the RML layer. Add an MCA param to allow the ofi/rml component to route messages if desired. This is mainly for experimentation at this point as we aren't sure if routing will be beneficial at large scales. Leave it "off" by default. Signed-off-by: Ralph Castain <[email protected]>
1 parent 855b430 commit b225366

35 files changed

+519
-691
lines changed

opal/mca/pmix/pmix2x/pmix2x_client.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ int pmix2x_store_local(const opal_process_name_t *proc, opal_value_t *val)
243243
pmix_status_t rc;
244244
pmix_proc_t p;
245245
char *nsptr;
246+
opal_pmix2x_jobid_trkr_t *job;
246247

247248
OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
248249

@@ -254,7 +255,13 @@ int pmix2x_store_local(const opal_process_name_t *proc, opal_value_t *val)
254255

255256
if (NULL != proc) {
256257
if (NULL == (nsptr = pmix2x_convert_jobid(proc->jobid))) {
257-
return OPAL_ERR_NOT_FOUND;
258+
job = OBJ_NEW(opal_pmix2x_jobid_trkr_t);
259+
(void)opal_snprintf_jobid(job->nspace, PMIX_MAX_NSLEN, proc->jobid);
260+
job->jobid = proc->jobid;
261+
OPAL_PMIX_ACQUIRE_THREAD(&opal_pmix_base.lock);
262+
opal_list_append(&mca_pmix_pmix2x_component.jobids, &job->super);
263+
OPAL_PMIX_RELEASE_THREAD(&opal_pmix_base.lock);
264+
nsptr = job->nspace;
258265
}
259266
(void)strncpy(p.nspace, nsptr, PMIX_MAX_NSLEN);
260267
p.rank = pmix2x_convert_opalrank(proc->vpid);

orte/mca/ess/base/ess_base_std_app.c

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,18 +198,34 @@ int orte_ess_base_app_setup(bool db_restrict_local)
198198
}
199199
}
200200
if (NULL != orte_process_info.my_daemon_uri) {
201+
opal_value_t val;
202+
201203
/* extract the daemon's name so we can update the routing table */
202204
if (ORTE_SUCCESS != (ret = orte_rml_base_parse_uris(orte_process_info.my_daemon_uri,
203205
ORTE_PROC_MY_DAEMON, NULL))) {
204206
ORTE_ERROR_LOG(ret);
205207
error = "orte_rml_parse_daemon";
206208
goto error;
207209
}
208-
/* Set the contact info in the RML - this won't actually establish
209-
* the connection, but just tells the RML how to reach the daemon
210+
/* Set the contact info in the database - this won't actually establish
211+
* the connection, but just tells us how to reach the daemon
210212
* if/when we attempt to send to it
211213
*/
212-
orte_rml.set_contact_info(orte_process_info.my_daemon_uri);
214+
OBJ_CONSTRUCT(&val, opal_value_t);
215+
val.key = OPAL_PMIX_PROC_URI;
216+
val.type = OPAL_STRING;
217+
val.data.string = orte_process_info.my_daemon_uri;
218+
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(ORTE_PROC_MY_DAEMON, &val))) {
219+
ORTE_ERROR_LOG(ret);
220+
val.key = NULL;
221+
val.data.string = NULL;
222+
OBJ_DESTRUCT(&val);
223+
error = "store DAEMON URI";
224+
goto error;
225+
}
226+
val.key = NULL;
227+
val.data.string = NULL;
228+
OBJ_DESTRUCT(&val);
213229
}
214230

215231
/* setup the errmgr */

orte/mca/ess/base/ess_base_std_orted.c

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,8 @@ int orte_ess_base_orted_setup(void)
419419
pmix_server_start();
420420

421421
if (NULL != orte_process_info.my_hnp_uri) {
422+
opal_value_t val;
423+
422424
/* extract the HNP's name so we can update the routing table */
423425
if (ORTE_SUCCESS != (ret = orte_rml_base_parse_uris(orte_process_info.my_hnp_uri,
424426
ORTE_PROC_MY_HNP, NULL))) {
@@ -430,7 +432,21 @@ int orte_ess_base_orted_setup(void)
430432
* the connection, but just tells the RML how to reach the HNP
431433
* if/when we attempt to send to it
432434
*/
433-
orte_rml.set_contact_info(orte_process_info.my_hnp_uri);
435+
OBJ_CONSTRUCT(&val, opal_value_t);
436+
val.key = OPAL_PMIX_PROC_URI;
437+
val.type = OPAL_STRING;
438+
val.data.string = orte_process_info.my_hnp_uri;
439+
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(ORTE_PROC_MY_HNP, &val))) {
440+
ORTE_ERROR_LOG(ret);
441+
val.key = NULL;
442+
val.data.string = NULL;
443+
OBJ_DESTRUCT(&val);
444+
error = "store HNP URI";
445+
goto error;
446+
}
447+
val.key = NULL;
448+
val.data.string = NULL;
449+
OBJ_DESTRUCT(&val);
434450
}
435451

436452
/* select the errmgr */
@@ -461,9 +477,6 @@ int orte_ess_base_orted_setup(void)
461477
}
462478
OPAL_LIST_DESTRUCT(&transports);
463479

464-
/* add our contact info to our proc object */
465-
proc->rml_uri = orte_rml.get_contact_info();
466-
467480
/*
468481
* Group communications
469482
*/
@@ -539,7 +552,7 @@ int orte_ess_base_orted_setup(void)
539552
}
540553
}
541554

542-
if (orte_static_ports) {
555+
if (orte_static_ports || orte_fwd_mpirun_port) {
543556
if (NULL == orte_node_regex) {
544557
/* we didn't get the node info */
545558
error = "cannot construct daemon map for static ports - no node map info";

orte/mca/ess/hnp/ess_hnp_module.c

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,10 @@ static int rte_init(void)
471471
proc->name.jobid = ORTE_PROC_MY_NAME->jobid;
472472
proc->name.vpid = ORTE_PROC_MY_NAME->vpid;
473473
proc->pid = orte_process_info.pid;
474-
proc->rml_uri = orte_rml.get_contact_info();
474+
orte_oob_base_get_addr(&proc->rml_uri);
475+
orte_process_info.my_hnp_uri = strdup(proc->rml_uri);
476+
/* we are also officially a daemon, so better update that field too */
477+
orte_process_info.my_daemon_uri = strdup(proc->rml_uri);
475478
proc->state = ORTE_PROC_STATE_RUNNING;
476479
OBJ_RETAIN(node); /* keep accounting straight */
477480
proc->node = node;
@@ -615,15 +618,6 @@ static int rte_init(void)
615618
goto error;
616619
}
617620

618-
/* we are an hnp, so update the contact info field for later use */
619-
orte_process_info.my_hnp_uri = orte_rml.get_contact_info();
620-
if (NULL != proc->rml_uri) {
621-
free(proc->rml_uri);
622-
}
623-
proc->rml_uri = strdup(orte_process_info.my_hnp_uri);
624-
625-
/* we are also officially a daemon, so better update that field too */
626-
orte_process_info.my_daemon_uri = strdup(orte_process_info.my_hnp_uri);
627621
/* setup the orte_show_help system to recv remote output */
628622
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SHOW_HELP,
629623
ORTE_RML_PERSISTENT, orte_show_help_recv, NULL);

orte/mca/grpcomm/direct/grpcomm_direct.c

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include "opal/dss/dss.h"
2525
#include "opal/class/opal_list.h"
26+
#include "opal/mca/pmix/pmix.h"
2627

2728
#include "orte/mca/errmgr/errmgr.h"
2829
#include "orte/mca/rml/base/base.h"
@@ -273,6 +274,9 @@ static void xcast_recv(int status, orte_process_name_t* sender,
273274
char *rtmod, *nidmap;
274275
size_t inlen, cmplen;
275276
uint8_t *packed_data, *cmpdata;
277+
int32_t nvals, i;
278+
opal_value_t *kv;
279+
orte_process_name_t dmn;
276280

277281
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
278282
"%s grpcomm:direct:xcast:recv: with %d bytes",
@@ -446,35 +450,49 @@ static void xcast_recv(int status, orte_process_name_t* sender,
446450
/* routing is now possible */
447451
orte_routed_base.routing_enabled = true;
448452

449-
/* see if we have wiring info as well */
453+
/* unpack the byte object */
450454
cnt=1;
451-
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
455+
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
452456
ORTE_ERROR_LOG(ret);
453457
goto relay;
454458
}
455-
456-
if (0 != flag) {
457-
/* unpack the byte object */
459+
if (0 < bo->size) {
460+
/* load it into a buffer */
461+
OBJ_CONSTRUCT(&wireup, opal_buffer_t);
462+
opal_dss.load(&wireup, bo->bytes, bo->size);
463+
/* decode it, pushing the info into our database */
458464
cnt=1;
459-
if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
460-
ORTE_ERROR_LOG(ret);
461-
goto relay;
462-
}
463-
if (0 < bo->size) {
464-
/* load it into a buffer */
465-
OBJ_CONSTRUCT(&wireup, opal_buffer_t);
466-
opal_dss.load(&wireup, bo->bytes, bo->size);
467-
/* pass it for processing */
468-
if (ORTE_SUCCESS != (ret = orte_rml_base_update_contact_info(&wireup))) {
465+
while (OPAL_SUCCESS == (ret = opal_dss.unpack(&wireup, &dmn, &cnt, ORTE_NAME))) {
466+
cnt = 1;
467+
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &nvals, &cnt, OPAL_INT32))) {
469468
ORTE_ERROR_LOG(ret);
470-
OBJ_DESTRUCT(&wireup);
471-
goto relay;
469+
break;
472470
}
473-
/* done with the wireup buffer - dump it */
474-
OBJ_DESTRUCT(&wireup);
471+
for (i=0; i < nvals; i++) {
472+
cnt = 1;
473+
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&wireup, &kv, &cnt, OPAL_VALUE))) {
474+
ORTE_ERROR_LOG(ret);
475+
break;
476+
}
477+
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
478+
"%s STORING MODEX DATA FOR PROC %s KEY %s",
479+
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
480+
ORTE_NAME_PRINT(&dmn), kv->key));
481+
if (OPAL_SUCCESS != (ret = opal_pmix.store_local(&dmn, kv))) {
482+
ORTE_ERROR_LOG(ret);
483+
OBJ_RELEASE(kv);
484+
break;
485+
}
486+
OBJ_RELEASE(kv);
487+
}
488+
}
489+
if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != ret) {
490+
ORTE_ERROR_LOG(ret);
475491
}
476-
free(bo);
492+
/* done with the wireup buffer - dump it */
493+
OBJ_DESTRUCT(&wireup);
477494
}
495+
free(bo);
478496
}
479497
/* copy the remainder of the payload - we don't pass wiring info
480498
* to the odls */
@@ -499,7 +517,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
499517
opal_dss.copy_payload(relay, data);
500518
}
501519

502-
relay:
520+
relay:
503521
if (!orte_do_not_launch) {
504522
/* get the list of next recipients from the routed module */
505523
orte_routed.get_routing_list(rtmod, &coll);

orte/mca/odls/base/odls_base_default_fns.c

Lines changed: 95 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@
106106
int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
107107
orte_jobid_t job)
108108
{
109-
int rc;
109+
int rc, v;
110110
orte_job_t *jdata=NULL, *jptr;
111111
orte_job_map_t *map=NULL;
112112
opal_buffer_t *wireup, jobdata;
@@ -116,6 +116,9 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
116116
void *nptr;
117117
uint32_t key;
118118
char *nidmap;
119+
orte_proc_t *dmn;
120+
opal_value_t *val = NULL, *kv;
121+
opal_list_t *modex;
119122

120123
/* get the job data pointer */
121124
if (NULL == (jdata = orte_get_job_data_object(job))) {
@@ -156,36 +159,106 @@ int orte_odls_base_default_get_add_procs_data(opal_buffer_t *buffer,
156159
ORTE_ERROR_LOG(rc);
157160
return rc;
158161
}
159-
if (!orte_static_ports && !orte_fwd_mpirun_port) {
160-
/* pack a flag indicating wiring info is provided */
161-
flag = 1;
162-
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
163-
/* get wireup info for daemons per the selected routing module */
164-
wireup = OBJ_NEW(opal_buffer_t);
165-
if (ORTE_SUCCESS != (rc = orte_rml_base_get_contact_info(ORTE_PROC_MY_NAME->jobid, wireup))) {
162+
/* get wireup info for daemons */
163+
if (NULL == (jptr = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
164+
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
165+
return ORTE_ERR_BAD_PARAM;
166+
}
167+
wireup = OBJ_NEW(opal_buffer_t);
168+
/* always include data for mpirun as the daemons can't have it yet */
169+
val = NULL;
170+
if (OPAL_SUCCESS != (rc = opal_pmix.get(ORTE_PROC_MY_NAME, NULL, NULL, &val)) || NULL == val) {
171+
ORTE_ERROR_LOG(rc);
172+
OBJ_RELEASE(wireup);
173+
return rc;
174+
} else {
175+
/* the data is returned as a list of key-value pairs in the opal_value_t */
176+
if (OPAL_PTR != val->type) {
177+
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
178+
OBJ_RELEASE(wireup);
179+
return ORTE_ERR_NOT_FOUND;
180+
}
181+
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
166182
ORTE_ERROR_LOG(rc);
167183
OBJ_RELEASE(wireup);
168184
return rc;
169185
}
170-
/* put it in a byte object for xmission */
171-
opal_dss.unload(wireup, (void**)&bo.bytes, &numbytes);
172-
/* pack the byte object - zero-byte objects are fine */
173-
bo.size = numbytes;
174-
boptr = &bo;
175-
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
186+
modex = (opal_list_t*)val->data.ptr;
187+
numbytes = (int32_t)opal_list_get_size(modex);
188+
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &numbytes, 1, OPAL_INT32))) {
176189
ORTE_ERROR_LOG(rc);
177190
OBJ_RELEASE(wireup);
178191
return rc;
179192
}
180-
/* release the data since it has now been copied into our buffer */
181-
if (NULL != bo.bytes) {
182-
free(bo.bytes);
193+
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
194+
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &kv, 1, OPAL_VALUE))) {
195+
ORTE_ERROR_LOG(rc);
196+
OBJ_RELEASE(wireup);
197+
return rc;
198+
}
183199
}
184-
OBJ_RELEASE(wireup);
185-
} else {
186-
/* pack a flag indicating no wireup data is provided */
187-
flag = 0;
188-
opal_dss.pack(buffer, &flag, 1, OPAL_INT8);
200+
OPAL_LIST_RELEASE(modex);
201+
OBJ_RELEASE(val);
202+
}
203+
/* if we didn't rollup the connection info, then we have
204+
* to provide a complete map of connection info */
205+
if (!orte_static_ports && !orte_fwd_mpirun_port) {
206+
for (v=1; v < jptr->procs->size; v++) {
207+
if (NULL == (dmn = (orte_proc_t*)opal_pointer_array_get_item(jptr->procs, v))) {
208+
continue;
209+
}
210+
val = NULL;
211+
if (OPAL_SUCCESS != (rc = opal_pmix.get(&dmn->name, NULL, NULL, &val)) || NULL == val) {
212+
ORTE_ERROR_LOG(rc);
213+
OBJ_RELEASE(buffer);
214+
return rc;
215+
} else {
216+
/* the data is returned as a list of key-value pairs in the opal_value_t */
217+
if (OPAL_PTR != val->type) {
218+
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
219+
OBJ_RELEASE(buffer);
220+
return ORTE_ERR_NOT_FOUND;
221+
}
222+
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &dmn->name, 1, ORTE_NAME))) {
223+
ORTE_ERROR_LOG(rc);
224+
OBJ_RELEASE(buffer);
225+
OBJ_RELEASE(wireup);
226+
return rc;
227+
}
228+
modex = (opal_list_t*)val->data.ptr;
229+
numbytes = (int32_t)opal_list_get_size(modex);
230+
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &numbytes, 1, OPAL_INT32))) {
231+
ORTE_ERROR_LOG(rc);
232+
OBJ_RELEASE(buffer);
233+
OBJ_RELEASE(wireup);
234+
return rc;
235+
}
236+
OPAL_LIST_FOREACH(kv, modex, opal_value_t) {
237+
if (ORTE_SUCCESS != (rc = opal_dss.pack(wireup, &kv, 1, OPAL_VALUE))) {
238+
ORTE_ERROR_LOG(rc);
239+
OBJ_RELEASE(buffer);
240+
OBJ_RELEASE(wireup);
241+
return rc;
242+
}
243+
}
244+
OPAL_LIST_RELEASE(modex);
245+
OBJ_RELEASE(val);
246+
}
247+
}
248+
}
249+
/* put it in a byte object for xmission */
250+
opal_dss.unload(wireup, (void**)&bo.bytes, &numbytes);
251+
OBJ_RELEASE(wireup);
252+
/* pack the byte object - zero-byte objects are fine */
253+
bo.size = numbytes;
254+
boptr = &bo;
255+
if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &boptr, 1, OPAL_BYTE_OBJECT))) {
256+
ORTE_ERROR_LOG(rc);
257+
return rc;
258+
}
259+
/* release the data since it has now been copied into our buffer */
260+
if (NULL != bo.bytes) {
261+
free(bo.bytes);
189262
}
190263

191264
/* we need to ensure that any new daemons get a complete

0 commit comments

Comments
 (0)