@@ -260,7 +260,7 @@ static void xcast_recv(int status, orte_process_name_t* sender,
260260 opal_list_item_t * item ;
261261 orte_namelist_t * nm ;
262262 int ret , cnt ;
263- opal_buffer_t * relay , * rly ;
263+ opal_buffer_t * relay = NULL , * rly ;
264264 orte_daemon_cmd_flag_t command = ORTE_DAEMON_NULL_CMD ;
265265 opal_buffer_t wireup , datbuf , * data ;
266266 opal_byte_object_t * bo ;
@@ -284,12 +284,17 @@ static void xcast_recv(int status, orte_process_name_t* sender,
284284 rly = OBJ_NEW (opal_buffer_t );
285285 opal_dss .copy_payload (rly , buffer );
286286 OBJ_CONSTRUCT (& datbuf , opal_buffer_t );
287+ /* setup the relay list */
288+ OBJ_CONSTRUCT (& coll , opal_list_t );
287289
288290 /* unpack the flag to see if this payload is compressed */
289291 cnt = 1 ;
290292 if (ORTE_SUCCESS != (ret = opal_dss .unpack (buffer , & flag , & cnt , OPAL_INT8 ))) {
291293 ORTE_ERROR_LOG (ret );
292294 ORTE_FORCED_TERMINATE (ret );
295+ OBJ_DESTRUCT (& datbuf );
296+ OBJ_DESTRUCT (& coll );
297+ OBJ_RELEASE (rly );
293298 return ;
294299 }
295300 if (flag ) {
@@ -298,13 +303,19 @@ static void xcast_recv(int status, orte_process_name_t* sender,
298303 if (ORTE_SUCCESS != (ret = opal_dss .unpack (buffer , & inlen , & cnt , OPAL_SIZE ))) {
299304 ORTE_ERROR_LOG (ret );
300305 ORTE_FORCED_TERMINATE (ret );
306+ OBJ_DESTRUCT (& datbuf );
307+ OBJ_DESTRUCT (& coll );
308+ OBJ_RELEASE (rly );
301309 return ;
302310 }
303311 /* unpack the unpacked data size */
304312 cnt = 1 ;
305313 if (ORTE_SUCCESS != (ret = opal_dss .unpack (buffer , & cmplen , & cnt , OPAL_SIZE ))) {
306314 ORTE_ERROR_LOG (ret );
307315 ORTE_FORCED_TERMINATE (ret );
316+ OBJ_DESTRUCT (& datbuf );
317+ OBJ_DESTRUCT (& coll );
318+ OBJ_RELEASE (rly );
308319 return ;
309320 }
310321 /* allocate the space */
@@ -315,6 +326,9 @@ static void xcast_recv(int status, orte_process_name_t* sender,
315326 ORTE_ERROR_LOG (ret );
316327 free (packed_data );
317328 ORTE_FORCED_TERMINATE (ret );
329+ OBJ_DESTRUCT (& datbuf );
330+ OBJ_DESTRUCT (& coll );
331+ OBJ_RELEASE (rly );
318332 return ;
319333 }
320334 /* decompress the data */
@@ -336,6 +350,8 @@ static void xcast_recv(int status, orte_process_name_t* sender,
336350 if (ORTE_SUCCESS != (ret = opal_dss .unpack (data , & sig , & cnt , ORTE_SIGNATURE ))) {
337351 ORTE_ERROR_LOG (ret );
338352 OBJ_DESTRUCT (& datbuf );
353+ OBJ_DESTRUCT (& coll );
354+ OBJ_RELEASE (rly );
339355 ORTE_FORCED_TERMINATE (ret );
340356 return ;
341357 }
@@ -346,17 +362,12 @@ static void xcast_recv(int status, orte_process_name_t* sender,
346362 if (ORTE_SUCCESS != (ret = opal_dss .unpack (data , & tag , & cnt , ORTE_RML_TAG ))) {
347363 ORTE_ERROR_LOG (ret );
348364 OBJ_DESTRUCT (& datbuf );
365+ OBJ_DESTRUCT (& coll );
366+ OBJ_RELEASE (rly );
349367 ORTE_FORCED_TERMINATE (ret );
350368 return ;
351369 }
352370
353- /* setup a buffer we can pass to ourselves - this just contains
354- * the initial message, minus the headers inserted by xcast itself */
355- relay = OBJ_NEW (opal_buffer_t );
356- opal_dss .copy_payload (relay , data );
357- /* setup the relay list */
358- OBJ_CONSTRUCT (& coll , opal_list_t );
359-
360371 /* get our conduit's routed module name */
361372 rtmod = orte_rml .get_routed (orte_coll_conduit );
362373
@@ -372,140 +383,163 @@ static void xcast_recv(int status, orte_process_name_t* sender,
372383 if (ORTE_DAEMON_EXIT_CMD == command ||
373384 ORTE_DAEMON_HALT_VM_CMD == command ) {
374385 orte_orteds_term_ordered = true;
386+ /* copy the msg for relay to ourselves */
387+ relay = OBJ_NEW (opal_buffer_t );
388+ /* repack the command */
389+ if (OPAL_SUCCESS != (ret = opal_dss .pack (relay , & command , 1 , ORTE_DAEMON_CMD ))) {
390+ ORTE_ERROR_LOG (ret );
391+ goto relay ;
392+ }
393+ opal_dss .copy_payload (relay , data );
375394 } else if (ORTE_DAEMON_ADD_LOCAL_PROCS == command ||
376395 ORTE_DAEMON_DVM_NIDMAP_CMD == command ) {
377- /* update our local nidmap, if required - the decode function
378- * knows what to do
379- */
380- OPAL_OUTPUT_VERBOSE ((5 , orte_grpcomm_base_framework .framework_output ,
381- "%s grpcomm:direct:xcast updating daemon nidmap" ,
382- ORTE_NAME_PRINT (ORTE_PROC_MY_NAME )));
383-
384- if (ORTE_SUCCESS != (ret = orte_util_decode_daemon_nodemap (data ))) {
396+ /* setup our internal relay buffer */
397+ relay = OBJ_NEW (opal_buffer_t );
398+ /* repack the command */
399+ if (OPAL_SUCCESS != (ret = opal_dss .pack (relay , & command , 1 , ORTE_DAEMON_CMD ))) {
385400 ORTE_ERROR_LOG (ret );
386401 goto relay ;
387402 }
388-
389- if (!ORTE_PROC_IS_HNP ) {
390- /* update the routing plan - the HNP already did
391- * it when it computed the VM, so don't waste time
392- * re-doing it here */
393- orte_routed .update_routing_plan (rtmod );
394- }
395- /* routing is now possible */
396- orte_routed_base .routing_enabled = true;
397-
398- /* see if we have wiring info as well */
399- cnt = 1 ;
400- if (ORTE_SUCCESS != (ret = opal_dss .unpack (data , & flag , & cnt , OPAL_INT8 ))) {
403+ /* see if any daemons were launched */
404+ cnt = 1 ;
405+ if (OPAL_SUCCESS != (ret = opal_dss .unpack (data , & flag , & cnt , OPAL_INT8 ))) {
401406 ORTE_ERROR_LOG (ret );
402407 goto relay ;
403408 }
404-
405- if (ORTE_DAEMON_ADD_LOCAL_PROCS == command ) {
406- OBJ_RELEASE (relay );
407- relay = OBJ_NEW (opal_buffer_t );
408- /* repack the command */
409- if (OPAL_SUCCESS != (ret = opal_dss .pack (relay , & command , 1 , ORTE_DAEMON_CMD ))) {
409+ /* add it to our relay buffer as we will need it later */
410+ opal_dss .pack (relay , & flag , 1 , OPAL_INT8 );
411+ if (0 != flag ) {
412+ /* update our local nidmap, if required - the decode function
413+ * knows what to do
414+ */
415+ OPAL_OUTPUT_VERBOSE ((5 , orte_grpcomm_base_framework .framework_output ,
416+ "%s grpcomm:direct:xcast updating daemon nidmap" ,
417+ ORTE_NAME_PRINT (ORTE_PROC_MY_NAME )));
418+
419+ if (ORTE_SUCCESS != (ret = orte_util_decode_daemon_nodemap (data ))) {
410420 ORTE_ERROR_LOG (ret );
411421 goto relay ;
412422 }
413- if (0 == flag ) {
414- /* copy the remainder of the payload */
415- opal_dss .copy_payload (relay , data );
416- /* no - just return */
417- goto relay ;
423+
424+ if (!ORTE_PROC_IS_HNP ) {
425+ /* update the routing plan - the HNP already did
426+ * it when it computed the VM, so don't waste time
427+ * re-doing it here */
428+ orte_routed .update_routing_plan (rtmod );
418429 }
419- }
430+ /* routing is now possible */
431+ orte_routed_base .routing_enabled = true;
420432
421- /* unpack the byte object */
422- cnt = 1 ;
423- if (ORTE_SUCCESS != (ret = opal_dss .unpack (data , & bo , & cnt , OPAL_BYTE_OBJECT ))) {
424- ORTE_ERROR_LOG (ret );
425- goto relay ;
426- }
427- if (0 < bo -> size ) {
428- /* load it into a buffer */
429- OBJ_CONSTRUCT (& wireup , opal_buffer_t );
430- opal_dss .load (& wireup , bo -> bytes , bo -> size );
431- /* pass it for processing */
432- if (ORTE_SUCCESS != (ret = orte_rml_base_update_contact_info (& wireup ))) {
433+ /* see if we have wiring info as well */
434+ cnt = 1 ;
435+ if (ORTE_SUCCESS != (ret = opal_dss .unpack (data , & flag , & cnt , OPAL_INT8 ))) {
433436 ORTE_ERROR_LOG (ret );
434- OBJ_DESTRUCT (& wireup );
435437 goto relay ;
436438 }
437- /* done with the wireup buffer - dump it */
438- OBJ_DESTRUCT (& wireup );
439+
440+ if (0 != flag ) {
441+ /* unpack the byte object */
442+ cnt = 1 ;
443+ if (ORTE_SUCCESS != (ret = opal_dss .unpack (data , & bo , & cnt , OPAL_BYTE_OBJECT ))) {
444+ ORTE_ERROR_LOG (ret );
445+ goto relay ;
446+ }
447+ if (0 < bo -> size ) {
448+ /* load it into a buffer */
449+ OBJ_CONSTRUCT (& wireup , opal_buffer_t );
450+ opal_dss .load (& wireup , bo -> bytes , bo -> size );
451+ /* pass it for processing */
452+ if (ORTE_SUCCESS != (ret = orte_rml_base_update_contact_info (& wireup ))) {
453+ ORTE_ERROR_LOG (ret );
454+ OBJ_DESTRUCT (& wireup );
455+ goto relay ;
456+ }
457+ /* done with the wireup buffer - dump it */
458+ OBJ_DESTRUCT (& wireup );
459+ }
460+ free (bo );
461+ }
439462 }
440- free (bo );
441- if (ORTE_DAEMON_ADD_LOCAL_PROCS == command ) {
442- /* copy the remainder of the payload */
443- opal_dss .copy_payload (relay , data );
463+ /* copy the remainder of the payload - we don't pass wiring info
464+ * to the odls */
465+ opal_dss .copy_payload (relay , data );
466+ } else {
467+ relay = OBJ_NEW (opal_buffer_t );
468+ /* repack the command */
469+ if (OPAL_SUCCESS != (ret = opal_dss .pack (relay , & command , 1 , ORTE_DAEMON_CMD ))) {
470+ ORTE_ERROR_LOG (ret );
471+ goto relay ;
444472 }
473+ /* copy the msg for relay to ourselves */
474+ opal_dss .copy_payload (relay , data );
445475 }
446476 } else {
447477 ORTE_ERROR_LOG (ret );
448478 goto CLEANUP ;
449479 }
480+ } else {
481+ /* copy the msg for relay to ourselves */
482+ relay = OBJ_NEW (opal_buffer_t );
483+ opal_dss .copy_payload (relay , data );
450484 }
451485
452486 relay :
453-
454- /* get the list of next recipients from the routed module */
455- orte_routed .get_routing_list (rtmod , & coll );
456-
457- /* if list is empty, no relay is required */
458- if (opal_list_is_empty (& coll )) {
459- OPAL_OUTPUT_VERBOSE ((5 , orte_grpcomm_base_framework .framework_output ,
460- "%s grpcomm:direct:send_relay - recipient list is empty!" ,
461- ORTE_NAME_PRINT (ORTE_PROC_MY_NAME )));
462- OBJ_RELEASE (rly );
463- goto CLEANUP ;
464- }
465-
466- /* send the message to each recipient on list, deconstructing it as we go */
467- while (NULL != (item = opal_list_remove_first (& coll ))) {
468- nm = (orte_namelist_t * )item ;
469-
470- OPAL_OUTPUT_VERBOSE ((5 , orte_grpcomm_base_framework .framework_output ,
471- "%s grpcomm:direct:send_relay sending relay msg of %d bytes to %s" ,
472- ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), (int )rly -> bytes_used ,
473- ORTE_NAME_PRINT (& nm -> name )));
474- OBJ_RETAIN (rly );
475- /* check the state of the recipient - no point
476- * sending to someone not alive
477- */
478- jdata = orte_get_job_data_object (nm -> name .jobid );
479- if (NULL == (rec = (orte_proc_t * )opal_pointer_array_get_item (jdata -> procs , nm -> name .vpid ))) {
480- opal_output (0 , "%s grpcomm:direct:send_relay proc %s not found - cannot relay" ,
481- ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), ORTE_NAME_PRINT (& nm -> name ));
482- OBJ_RELEASE (rly );
483- OBJ_RELEASE (item );
484- continue ;
485- }
486- if (ORTE_PROC_STATE_RUNNING < rec -> state ||
487- !ORTE_FLAG_TEST (rec , ORTE_PROC_FLAG_ALIVE )) {
488- opal_output (0 , "%s grpcomm:direct:send_relay proc %s not running - cannot relay" ,
489- ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), ORTE_NAME_PRINT (& nm -> name ));
490- OBJ_RELEASE (rly );
491- OBJ_RELEASE (item );
492- continue ;
487+ if (!orte_do_not_launch ) {
488+ /* get the list of next recipients from the routed module */
489+ orte_routed .get_routing_list (rtmod , & coll );
490+
491+ /* if list is empty, no relay is required */
492+ if (opal_list_is_empty (& coll )) {
493+ OPAL_OUTPUT_VERBOSE ((5 , orte_grpcomm_base_framework .framework_output ,
494+ "%s grpcomm:direct:send_relay - recipient list is empty!" ,
495+ ORTE_NAME_PRINT (ORTE_PROC_MY_NAME )));
496+ goto CLEANUP ;
493497 }
494- if (ORTE_SUCCESS != (ret = orte_rml .send_buffer_nb (orte_coll_conduit ,
495- & nm -> name , rly , ORTE_RML_TAG_XCAST ,
496- orte_rml_send_callback , NULL ))) {
497- ORTE_ERROR_LOG (ret );
498- OBJ_RELEASE (rly );
498+
499+ /* send the message to each recipient on list, deconstructing it as we go */
500+ while (NULL != (item = opal_list_remove_first (& coll ))) {
501+ nm = (orte_namelist_t * )item ;
502+
503+ OPAL_OUTPUT_VERBOSE ((5 , orte_grpcomm_base_framework .framework_output ,
504+ "%s grpcomm:direct:send_relay sending relay msg of %d bytes to %s" ,
505+ ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), (int )rly -> bytes_used ,
506+ ORTE_NAME_PRINT (& nm -> name )));
507+ OBJ_RETAIN (rly );
508+ /* check the state of the recipient - no point
509+ * sending to someone not alive
510+ */
511+ jdata = orte_get_job_data_object (nm -> name .jobid );
512+ if (NULL == (rec = (orte_proc_t * )opal_pointer_array_get_item (jdata -> procs , nm -> name .vpid ))) {
513+ opal_output (0 , "%s grpcomm:direct:send_relay proc %s not found - cannot relay" ,
514+ ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), ORTE_NAME_PRINT (& nm -> name ));
515+ OBJ_RELEASE (rly );
516+ OBJ_RELEASE (item );
517+ continue ;
518+ }
519+ if (ORTE_PROC_STATE_RUNNING < rec -> state ||
520+ !ORTE_FLAG_TEST (rec , ORTE_PROC_FLAG_ALIVE )) {
521+ opal_output (0 , "%s grpcomm:direct:send_relay proc %s not running - cannot relay" ,
522+ ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ), ORTE_NAME_PRINT (& nm -> name ));
523+ OBJ_RELEASE (rly );
524+ OBJ_RELEASE (item );
525+ continue ;
526+ }
527+ if (ORTE_SUCCESS != (ret = orte_rml .send_buffer_nb (orte_coll_conduit ,
528+ & nm -> name , rly , ORTE_RML_TAG_XCAST ,
529+ orte_rml_send_callback , NULL ))) {
530+ ORTE_ERROR_LOG (ret );
531+ OBJ_RELEASE (rly );
532+ OBJ_RELEASE (item );
533+ continue ;
534+ }
499535 OBJ_RELEASE (item );
500- continue ;
501536 }
502- OBJ_RELEASE (item );
503537 }
504- OBJ_RELEASE (rly ); // retain accounting
505538
506539 CLEANUP :
507540 /* cleanup */
508- OBJ_DESTRUCT (& coll );
541+ OPAL_LIST_DESTRUCT (& coll );
542+ OBJ_RELEASE (rly ); // retain accounting
509543
510544 /* now pass the relay buffer to myself for processing - don't
511545 * inject it into the RML system via send as that will compete
@@ -517,7 +551,9 @@ static void xcast_recv(int status, orte_process_name_t* sender,
517551 relay -> base_ptr = NULL ;
518552 relay -> bytes_used = 0 ;
519553 }
520- OBJ_RELEASE (relay );
554+ if (NULL != relay ) {
555+ OBJ_RELEASE (relay );
556+ }
521557 OBJ_DESTRUCT (& datbuf );
522558}
523559
0 commit comments