
Commit 50ca9fb

Author: Ralph Castain
Merge pull request #2893 from rhc54/topic/sim
Cleanup the ras simulator capability, and the relay route thru grpcomm
2 parents 362ac8b + 230d15f commit 50ca9fb

6 files changed: 276 additions, 216 deletions
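Editorial note: the core of the grpcomm change below is that a daemon no longer copies the whole incoming xcast message into its local relay buffer up front. Instead it re-packs the daemon command it just peeked at into a fresh buffer and then appends only the remaining payload, so the local command processor always sees a well-formed message even when parts of the payload (such as wireup data) are consumed along the way. The following is a minimal, self-contained sketch of that "repack the header, then append the rest" idea; the tiny byte-buffer type is hypothetical, whereas the real code uses opal_buffer_t with opal_dss.pack() and opal_dss.copy_payload().

/* sketch only: a stand-in buffer type, not OPAL's DSS (error checks omitted) */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
    uint8_t *bytes;
    size_t   used;
    size_t   cursor;   /* unpack position */
} buf_t;

static void buf_append(buf_t *b, const void *src, size_t len)
{
    b->bytes = realloc(b->bytes, b->used + len);
    memcpy(b->bytes + b->used, src, len);
    b->used += len;
}

int main(void)
{
    /* incoming xcast message: [command byte][payload...] */
    buf_t incoming = { 0 };
    uint8_t command = 42;
    const char payload[] = "launch data";
    buf_append(&incoming, &command, 1);
    buf_append(&incoming, payload, sizeof(payload));

    /* peek at the command, which advances the unpack cursor */
    uint8_t cmd = incoming.bytes[incoming.cursor++];

    /* build the relay for local processing: repack the command we just
     * consumed, then append whatever is left of the payload */
    buf_t relay = { 0 };
    buf_append(&relay, &cmd, 1);
    buf_append(&relay, incoming.bytes + incoming.cursor,
               incoming.used - incoming.cursor);

    printf("relay: cmd=%u, payload=\"%s\"\n", (unsigned)relay.bytes[0],
           (const char *)(relay.bytes + 1));

    free(incoming.bytes);
    free(relay.bytes);
    return 0;
}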

orte/mca/ess/hnp/ess_hnp_module.c

Lines changed: 3 additions & 0 deletions
@@ -874,6 +874,9 @@ static int rte_finalize(void)
     if (NULL != orte_process_info.super.proc_hostname) {
         free(orte_process_info.super.proc_hostname);
     }
+    if (orte_do_not_launch) {
+        exit(0);
+    }
     return ORTE_SUCCESS;
 }
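Editorial note: the hunk above is the ess/hnp half of the change. In a "do not launch" dry run (the mode the RAS simulator work in this PR exercises; the exact command-line spelling of the flag may vary by release) nothing was actually started, so once rte_finalize() has released its own resources the HNP simply exits instead of falling through to a teardown of daemons and jobs that never existed. The same orte_do_not_launch flag gates the relay loop in the grpcomm change below. A standalone illustration of that gate, with names that are illustrative rather than ORTE's:

/* sketch only: a global dry-run flag short-circuits finalize */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

static bool do_not_launch = true;   /* stands in for orte_do_not_launch */

static int finalize(void)
{
    /* ...free local resources here, as the real rte_finalize() does... */
    if (do_not_launch) {
        puts("dry run: nothing was launched, exiting now");
        exit(0);                    /* skip teardown of things that never existed */
    }
    return 0;                       /* normal path: caller continues shutdown */
}

int main(void)
{
    return finalize();
}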

orte/mca/grpcomm/direct/grpcomm_direct.c

Lines changed: 146 additions & 110 deletions
@@ -260,7 +260,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
     opal_list_item_t *item;
     orte_namelist_t *nm;
     int ret, cnt;
-    opal_buffer_t *relay, *rly;
+    opal_buffer_t *relay=NULL, *rly;
     orte_daemon_cmd_flag_t command = ORTE_DAEMON_NULL_CMD;
     opal_buffer_t wireup, datbuf, *data;
     opal_byte_object_t *bo;
@@ -284,12 +284,17 @@ static void xcast_recv(int status, orte_process_name_t* sender,
     rly = OBJ_NEW(opal_buffer_t);
     opal_dss.copy_payload(rly, buffer);
     OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
+    /* setup the relay list */
+    OBJ_CONSTRUCT(&coll, opal_list_t);

     /* unpack the flag to see if this payload is compressed */
     cnt=1;
     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &flag, &cnt, OPAL_INT8))) {
         ORTE_ERROR_LOG(ret);
         ORTE_FORCED_TERMINATE(ret);
+        OBJ_DESTRUCT(&datbuf);
+        OBJ_DESTRUCT(&coll);
+        OBJ_RELEASE(rly);
         return;
     }
     if (flag) {
@@ -298,13 +303,19 @@ static void xcast_recv(int status, orte_process_name_t* sender,
         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &inlen, &cnt, OPAL_SIZE))) {
             ORTE_ERROR_LOG(ret);
             ORTE_FORCED_TERMINATE(ret);
+            OBJ_DESTRUCT(&datbuf);
+            OBJ_DESTRUCT(&coll);
+            OBJ_RELEASE(rly);
             return;
         }
         /* unpack the unpacked data size */
         cnt=1;
         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &cmplen, &cnt, OPAL_SIZE))) {
             ORTE_ERROR_LOG(ret);
             ORTE_FORCED_TERMINATE(ret);
+            OBJ_DESTRUCT(&datbuf);
+            OBJ_DESTRUCT(&coll);
+            OBJ_RELEASE(rly);
             return;
         }
         /* allocate the space */
@@ -315,6 +326,9 @@ static void xcast_recv(int status, orte_process_name_t* sender,
             ORTE_ERROR_LOG(ret);
             free(packed_data);
             ORTE_FORCED_TERMINATE(ret);
+            OBJ_DESTRUCT(&datbuf);
+            OBJ_DESTRUCT(&coll);
+            OBJ_RELEASE(rly);
             return;
         }
         /* decompress the data */
@@ -336,6 +350,8 @@ static void xcast_recv(int status, orte_process_name_t* sender,
     if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &sig, &cnt, ORTE_SIGNATURE))) {
         ORTE_ERROR_LOG(ret);
         OBJ_DESTRUCT(&datbuf);
+        OBJ_DESTRUCT(&coll);
+        OBJ_RELEASE(rly);
         ORTE_FORCED_TERMINATE(ret);
         return;
     }
@@ -346,17 +362,12 @@ static void xcast_recv(int status, orte_process_name_t* sender,
     if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &tag, &cnt, ORTE_RML_TAG))) {
         ORTE_ERROR_LOG(ret);
         OBJ_DESTRUCT(&datbuf);
+        OBJ_DESTRUCT(&coll);
+        OBJ_RELEASE(rly);
         ORTE_FORCED_TERMINATE(ret);
         return;
     }

-    /* setup a buffer we can pass to ourselves - this just contains
-     * the initial message, minus the headers inserted by xcast itself */
-    relay = OBJ_NEW(opal_buffer_t);
-    opal_dss.copy_payload(relay, data);
-    /* setup the relay list */
-    OBJ_CONSTRUCT(&coll, opal_list_t);
-
     /* get our conduit's routed module name */
     rtmod = orte_rml.get_routed(orte_coll_conduit);

@@ -372,140 +383,163 @@ static void xcast_recv(int status, orte_process_name_t* sender,
             if (ORTE_DAEMON_EXIT_CMD == command ||
                 ORTE_DAEMON_HALT_VM_CMD == command) {
                 orte_orteds_term_ordered = true;
+                /* copy the msg for relay to ourselves */
+                relay = OBJ_NEW(opal_buffer_t);
+                /* repack the command */
+                if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
+                    ORTE_ERROR_LOG(ret);
+                    goto relay;
+                }
+                opal_dss.copy_payload(relay, data);
             } else if (ORTE_DAEMON_ADD_LOCAL_PROCS == command ||
                        ORTE_DAEMON_DVM_NIDMAP_CMD == command) {
-                /* update our local nidmap, if required - the decode function
-                 * knows what to do
-                 */
-                OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
-                                     "%s grpcomm:direct:xcast updating daemon nidmap",
-                                     ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
-
-                if (ORTE_SUCCESS != (ret = orte_util_decode_daemon_nodemap(data))) {
+                /* setup our internal relay buffer */
+                relay = OBJ_NEW(opal_buffer_t);
+                /* repack the command */
+                if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
                     ORTE_ERROR_LOG(ret);
                     goto relay;
                 }
-
-                if (!ORTE_PROC_IS_HNP) {
-                    /* update the routing plan - the HNP already did
-                     * it when it computed the VM, so don't waste time
-                     * re-doing it here */
-                    orte_routed.update_routing_plan(rtmod);
-                }
-                /* routing is now possible */
-                orte_routed_base.routing_enabled = true;
-
-                /* see if we have wiring info as well */
-                cnt=1;
-                if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
+                /* see if any daemons were launched */
+                cnt = 1;
+                if (OPAL_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
                     ORTE_ERROR_LOG(ret);
                     goto relay;
                 }
-
-                if (ORTE_DAEMON_ADD_LOCAL_PROCS == command) {
-                    OBJ_RELEASE(relay);
-                    relay = OBJ_NEW(opal_buffer_t);
-                    /* repack the command */
-                    if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
+                /* add it to our relay buffer as we will need it later */
+                opal_dss.pack(relay, &flag, 1, OPAL_INT8);
+                if (0 != flag) {
+                    /* update our local nidmap, if required - the decode function
+                     * knows what to do
+                     */
+                    OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
+                                         "%s grpcomm:direct:xcast updating daemon nidmap",
+                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+                    if (ORTE_SUCCESS != (ret = orte_util_decode_daemon_nodemap(data))) {
                         ORTE_ERROR_LOG(ret);
                         goto relay;
                     }
-                    if (0 == flag) {
-                        /* copy the remainder of the payload */
-                        opal_dss.copy_payload(relay, data);
-                        /* no - just return */
-                        goto relay;
+
+                    if (!ORTE_PROC_IS_HNP) {
+                        /* update the routing plan - the HNP already did
+                         * it when it computed the VM, so don't waste time
+                         * re-doing it here */
+                        orte_routed.update_routing_plan(rtmod);
                     }
-                }
+                    /* routing is now possible */
+                    orte_routed_base.routing_enabled = true;

-                /* unpack the byte object */
-                cnt=1;
-                if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
-                    ORTE_ERROR_LOG(ret);
-                    goto relay;
-                }
-                if (0 < bo->size) {
-                    /* load it into a buffer */
-                    OBJ_CONSTRUCT(&wireup, opal_buffer_t);
-                    opal_dss.load(&wireup, bo->bytes, bo->size);
-                    /* pass it for processing */
-                    if (ORTE_SUCCESS != (ret = orte_rml_base_update_contact_info(&wireup))) {
+                    /* see if we have wiring info as well */
+                    cnt=1;
+                    if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &flag, &cnt, OPAL_INT8))) {
                         ORTE_ERROR_LOG(ret);
-                        OBJ_DESTRUCT(&wireup);
                         goto relay;
                     }
-                    /* done with the wireup buffer - dump it */
-                    OBJ_DESTRUCT(&wireup);
+
+                    if (0 != flag) {
+                        /* unpack the byte object */
+                        cnt=1;
+                        if (ORTE_SUCCESS != (ret = opal_dss.unpack(data, &bo, &cnt, OPAL_BYTE_OBJECT))) {
+                            ORTE_ERROR_LOG(ret);
+                            goto relay;
+                        }
+                        if (0 < bo->size) {
+                            /* load it into a buffer */
+                            OBJ_CONSTRUCT(&wireup, opal_buffer_t);
+                            opal_dss.load(&wireup, bo->bytes, bo->size);
+                            /* pass it for processing */
+                            if (ORTE_SUCCESS != (ret = orte_rml_base_update_contact_info(&wireup))) {
+                                ORTE_ERROR_LOG(ret);
+                                OBJ_DESTRUCT(&wireup);
+                                goto relay;
+                            }
+                            /* done with the wireup buffer - dump it */
+                            OBJ_DESTRUCT(&wireup);
+                        }
+                        free(bo);
+                    }
                 }
-                free(bo);
-                if (ORTE_DAEMON_ADD_LOCAL_PROCS == command) {
-                    /* copy the remainder of the payload */
-                    opal_dss.copy_payload(relay, data);
+                /* copy the remainder of the payload - we don't pass wiring info
+                 * to the odls */
+                opal_dss.copy_payload(relay, data);
+            } else {
+                relay = OBJ_NEW(opal_buffer_t);
+                /* repack the command */
+                if (OPAL_SUCCESS != (ret = opal_dss.pack(relay, &command, 1, ORTE_DAEMON_CMD))) {
+                    ORTE_ERROR_LOG(ret);
+                    goto relay;
                 }
+                /* copy the msg for relay to ourselves */
+                opal_dss.copy_payload(relay, data);
             }
         } else {
             ORTE_ERROR_LOG(ret);
             goto CLEANUP;
         }
+    } else {
+        /* copy the msg for relay to ourselves */
+        relay = OBJ_NEW(opal_buffer_t);
+        opal_dss.copy_payload(relay, data);
     }

  relay:
-
-    /* get the list of next recipients from the routed module */
-    orte_routed.get_routing_list(rtmod, &coll);
-
-    /* if list is empty, no relay is required */
-    if (opal_list_is_empty(&coll)) {
-        OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
-                             "%s grpcomm:direct:send_relay - recipient list is empty!",
-                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
-        OBJ_RELEASE(rly);
-        goto CLEANUP;
-    }
-
-    /* send the message to each recipient on list, deconstructing it as we go */
-    while (NULL != (item = opal_list_remove_first(&coll))) {
-        nm = (orte_namelist_t*)item;
-
-        OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
-                             "%s grpcomm:direct:send_relay sending relay msg of %d bytes to %s",
-                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)rly->bytes_used,
-                             ORTE_NAME_PRINT(&nm->name)));
-        OBJ_RETAIN(rly);
-        /* check the state of the recipient - no point
-         * sending to someone not alive
-         */
-        jdata = orte_get_job_data_object(nm->name.jobid);
-        if (NULL == (rec = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, nm->name.vpid))) {
-            opal_output(0, "%s grpcomm:direct:send_relay proc %s not found - cannot relay",
-                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
-            OBJ_RELEASE(rly);
-            OBJ_RELEASE(item);
-            continue;
-        }
-        if (ORTE_PROC_STATE_RUNNING < rec->state ||
-            !ORTE_FLAG_TEST(rec, ORTE_PROC_FLAG_ALIVE)) {
-            opal_output(0, "%s grpcomm:direct:send_relay proc %s not running - cannot relay",
-                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
-            OBJ_RELEASE(rly);
-            OBJ_RELEASE(item);
-            continue;
+    if (!orte_do_not_launch) {
+        /* get the list of next recipients from the routed module */
+        orte_routed.get_routing_list(rtmod, &coll);
+
+        /* if list is empty, no relay is required */
+        if (opal_list_is_empty(&coll)) {
+            OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
+                                 "%s grpcomm:direct:send_relay - recipient list is empty!",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+            goto CLEANUP;
         }
-        if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(orte_coll_conduit,
-                                                           &nm->name, rly, ORTE_RML_TAG_XCAST,
-                                                           orte_rml_send_callback, NULL))) {
-            ORTE_ERROR_LOG(ret);
-            OBJ_RELEASE(rly);
+
+        /* send the message to each recipient on list, deconstructing it as we go */
+        while (NULL != (item = opal_list_remove_first(&coll))) {
+            nm = (orte_namelist_t*)item;
+
+            OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
+                                 "%s grpcomm:direct:send_relay sending relay msg of %d bytes to %s",
+                                 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)rly->bytes_used,
+                                 ORTE_NAME_PRINT(&nm->name)));
+            OBJ_RETAIN(rly);
+            /* check the state of the recipient - no point
+             * sending to someone not alive
+             */
+            jdata = orte_get_job_data_object(nm->name.jobid);
+            if (NULL == (rec = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, nm->name.vpid))) {
+                opal_output(0, "%s grpcomm:direct:send_relay proc %s not found - cannot relay",
+                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
+                OBJ_RELEASE(rly);
+                OBJ_RELEASE(item);
+                continue;
+            }
+            if (ORTE_PROC_STATE_RUNNING < rec->state ||
+                !ORTE_FLAG_TEST(rec, ORTE_PROC_FLAG_ALIVE)) {
+                opal_output(0, "%s grpcomm:direct:send_relay proc %s not running - cannot relay",
+                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&nm->name));
+                OBJ_RELEASE(rly);
+                OBJ_RELEASE(item);
+                continue;
+            }
+            if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(orte_coll_conduit,
+                                                               &nm->name, rly, ORTE_RML_TAG_XCAST,
+                                                               orte_rml_send_callback, NULL))) {
+                ORTE_ERROR_LOG(ret);
+                OBJ_RELEASE(rly);
+                OBJ_RELEASE(item);
+                continue;
+            }
             OBJ_RELEASE(item);
-            continue;
         }
-        OBJ_RELEASE(item);
     }
-    OBJ_RELEASE(rly);  // retain accounting

  CLEANUP:
     /* cleanup */
-    OBJ_DESTRUCT(&coll);
+    OPAL_LIST_DESTRUCT(&coll);
+    OBJ_RELEASE(rly);  // retain accounting

     /* now pass the relay buffer to myself for processing - don't
      * inject it into the RML system via send as that will compete
@@ -517,7 +551,9 @@ static void xcast_recv(int status, orte_process_name_t* sender,
         relay->base_ptr = NULL;
         relay->bytes_used = 0;
     }
-    OBJ_RELEASE(relay);
+    if (NULL != relay) {
+        OBJ_RELEASE(relay);
+    }
     OBJ_DESTRUCT(&datbuf);
 }
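Editorial note: beyond gating the relay on orte_do_not_launch, the xcast_recv() changes above also tighten resource handling. The relay pointer is initialized to NULL, every early-return error path now destructs datbuf and coll and releases rly, and the final release of relay is wrapped in a NULL check because OBJ_RELEASE, unlike free(), must not be handed a NULL pointer. Below is a standalone sketch of that "single cleanup path, NULL-guarded releases" discipline in plain C; it mirrors the pattern only and does not use OPAL's object system.

/* sketch only: pointers that only some paths allocate start out NULL,
 * error paths funnel to one exit point, and releases are guarded so the
 * exit point is valid no matter how far the function progressed */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static int handle(const char *payload)
{
    char *rly   = NULL;   /* always-created relay copy, like rly           */
    char *relay = NULL;   /* only some branches create this, like relay    */
    int rc = -1;

    rly = strdup(payload);
    if (NULL == rly) {
        goto cleanup;                 /* relay is still NULL here */
    }

    if ('\0' == payload[0]) {         /* an early validation failure */
        goto cleanup;
    }

    relay = strdup(rly);              /* the "repacked" local copy */
    if (NULL == relay) {
        goto cleanup;
    }
    rc = 0;                           /* success path */

cleanup:
    /* free(NULL) is a no-op; OBJ_RELEASE() is not, which is why the real
     * code wraps it in "if (NULL != relay)" before releasing */
    free(relay);
    free(rly);
    return rc;
}

int main(void)
{
    printf("ok=%d empty=%d\n", handle("payload"), handle(""));
    return 0;
}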
