@@ -459,100 +459,82 @@ void orte_state_base_report_progress(int fd, short argc, void *cbdata)
459
459
OBJ_RELEASE (caddy );
460
460
}
461
461
462
- static void _send_notification (int status , orte_process_name_t * proc )
462
+ static void _send_notification (int status ,
463
+ orte_process_name_t * proc ,
464
+ orte_process_name_t * target )
463
465
{
464
- opal_buffer_t buf ;
466
+ opal_buffer_t * buf ;
465
467
orte_grpcomm_signature_t sig ;
466
468
int rc ;
467
469
opal_value_t kv , * kvptr ;
470
+ orte_process_name_t daemon ;
471
+
472
+ buf = OBJ_NEW (opal_buffer_t );
468
473
469
- OBJ_CONSTRUCT (& buf , opal_buffer_t );
474
+ opal_output_verbose (5 , orte_state_base_framework .framework_output ,
475
+ "%s state:base:sending notification %s proc %s target %s" ,
476
+ ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ),
477
+ ORTE_ERROR_NAME (status ),
478
+ ORTE_NAME_PRINT (proc ),
479
+ ORTE_NAME_PRINT (target ));
470
480
471
481
/* pack the status */
472
- if (ORTE_SUCCESS != (rc = opal_dss .pack (& buf , & status , 1 , OPAL_INT ))) {
482
+ if (ORTE_SUCCESS != (rc = opal_dss .pack (buf , & status , 1 , OPAL_INT ))) {
473
483
ORTE_ERROR_LOG (rc );
474
- OBJ_DESTRUCT ( & buf );
484
+ OBJ_RELEASE ( buf );
475
485
return ;
476
486
}
477
487
478
488
/* the source is me */
479
- if (ORTE_SUCCESS != (rc = opal_dss .pack (& buf , ORTE_PROC_MY_NAME , 1 , ORTE_NAME ))) {
489
+ if (ORTE_SUCCESS != (rc = opal_dss .pack (buf , ORTE_PROC_MY_NAME , 1 , ORTE_NAME ))) {
480
490
ORTE_ERROR_LOG (rc );
481
- OBJ_DESTRUCT ( & buf );
491
+ OBJ_RELEASE ( buf );
482
492
return ;
483
493
}
484
494
485
- /* pass along the affected proc (one opal_value_t) */
486
- rc = 1 ;
487
- if (ORTE_SUCCESS != (rc = opal_dss .pack (& buf , & rc , 1 , OPAL_INT ))) {
495
+ /* we are going to pass three opal_value_t's */
496
+ rc = 3 ;
497
+ if (ORTE_SUCCESS != (rc = opal_dss .pack (buf , & rc , 1 , OPAL_INT ))) {
488
498
ORTE_ERROR_LOG (rc );
489
- OBJ_DESTRUCT ( & buf );
499
+ OBJ_RELEASE ( buf );
490
500
return ;
491
501
}
502
+
503
+ /* pass along the affected proc(s) */
492
504
OBJ_CONSTRUCT (& kv , opal_value_t );
493
505
kv .key = strdup (OPAL_PMIX_EVENT_AFFECTED_PROC );
494
506
kv .type = OPAL_NAME ;
495
507
kv .data .name .jobid = proc -> jobid ;
496
508
kv .data .name .vpid = proc -> vpid ;
497
509
kvptr = & kv ;
498
- if (ORTE_SUCCESS != (rc = opal_dss .pack (& buf , & kvptr , 1 , OPAL_VALUE ))) {
510
+ if (ORTE_SUCCESS != (rc = opal_dss .pack (buf , & kvptr , 1 , OPAL_VALUE ))) {
499
511
ORTE_ERROR_LOG (rc );
500
512
OBJ_DESTRUCT (& kv );
501
- OBJ_DESTRUCT (& buf );
502
- return ;
503
- }
504
- OBJ_DESTRUCT (& kv );
505
-
506
-
507
- /* xcast it to everyone */
508
- OBJ_CONSTRUCT (& sig , orte_grpcomm_signature_t );
509
- sig .signature = (orte_process_name_t * )malloc (sizeof (orte_process_name_t ));
510
- sig .signature [0 ].jobid = ORTE_PROC_MY_NAME -> jobid ;
511
- sig .signature [0 ].vpid = ORTE_VPID_WILDCARD ;
512
- sig .sz = 1 ;
513
-
514
- if (ORTE_SUCCESS != (rc = orte_grpcomm .xcast (& sig , ORTE_RML_TAG_NOTIFICATION , & buf ))) {
515
- ORTE_ERROR_LOG (rc );
516
- }
517
- OBJ_DESTRUCT (& sig );
518
- OBJ_DESTRUCT (& buf );
519
- }
520
-
521
- static void _send_direct_notify (int status , orte_process_name_t * proc )
522
- {
523
- opal_buffer_t * buf ;
524
- int rc ;
525
- opal_value_t kv , * kvptr ;
526
- orte_process_name_t daemon ;
527
-
528
- buf = OBJ_NEW (opal_buffer_t );
529
-
530
- /* pack the status */
531
- if (ORTE_SUCCESS != (rc = opal_dss .pack (buf , & status , 1 , OPAL_INT ))) {
532
- ORTE_ERROR_LOG (rc );
533
513
OBJ_RELEASE (buf );
534
514
return ;
535
515
}
516
+ OBJ_DESTRUCT (& kv );
536
517
537
- /* the source is me */
538
- if (ORTE_SUCCESS != (rc = opal_dss .pack (buf , ORTE_PROC_MY_NAME , 1 , ORTE_NAME ))) {
518
+ /* pass along the proc(s) to be notified */
519
+ OBJ_CONSTRUCT (& kv , opal_value_t );
520
+ kv .key = strdup (OPAL_PMIX_EVENT_CUSTOM_RANGE );
521
+ kv .type = OPAL_NAME ;
522
+ kv .data .name .jobid = target -> jobid ;
523
+ kv .data .name .vpid = target -> vpid ;
524
+ kvptr = & kv ;
525
+ if (ORTE_SUCCESS != (rc = opal_dss .pack (buf , & kvptr , 1 , OPAL_VALUE ))) {
539
526
ORTE_ERROR_LOG (rc );
527
+ OBJ_DESTRUCT (& kv );
540
528
OBJ_RELEASE (buf );
541
529
return ;
542
530
}
531
+ OBJ_DESTRUCT (& kv );
543
532
544
- /* pass along the proc to be notified (one opal_value_t) */
545
- rc = 1 ;
546
- if (ORTE_SUCCESS != (rc = opal_dss .pack (buf , & rc , 1 , OPAL_INT ))) {
547
- ORTE_ERROR_LOG (rc );
548
- OBJ_RELEASE (buf );
549
- return ;
550
- }
533
+ /* mark this as intended for non-default event handlers */
551
534
OBJ_CONSTRUCT (& kv , opal_value_t );
552
- kv .key = strdup (OPAL_PMIX_EVENT_CUSTOM_RANGE );
553
- kv .type = OPAL_NAME ;
554
- kv .data .name .jobid = proc -> jobid ;
555
- kv .data .name .vpid = proc -> vpid ;
535
+ kv .key = strdup (OPAL_PMIX_EVENT_NON_DEFAULT );
536
+ kv .type = OPAL_BOOL ;
537
+ kv .data .flag = true;
556
538
kvptr = & kv ;
557
539
if (ORTE_SUCCESS != (rc = opal_dss .pack (buf , & kvptr , 1 , OPAL_VALUE ))) {
558
540
ORTE_ERROR_LOG (rc );
@@ -562,17 +544,37 @@ static void _send_direct_notify(int status, orte_process_name_t *proc)
562
544
}
563
545
OBJ_DESTRUCT (& kv );
564
546
547
+ /* if the targets are a wildcard, then xcast it to everyone */
548
+ if (ORTE_VPID_WILDCARD == target -> vpid ) {
549
+ OBJ_CONSTRUCT (& sig , orte_grpcomm_signature_t );
550
+ sig .signature = (orte_process_name_t * )malloc (sizeof (orte_process_name_t ));
551
+ sig .signature [0 ].jobid = ORTE_PROC_MY_NAME -> jobid ;
552
+ sig .signature [0 ].vpid = ORTE_VPID_WILDCARD ;
553
+ sig .sz = 1 ;
565
554
566
- /* get the daemon hosting the proc to be notified */
567
- daemon .jobid = ORTE_PROC_MY_NAME -> jobid ;
568
- daemon .vpid = orte_get_proc_daemon_vpid (proc );
569
- /* send the notification to that daemon */
570
- if (ORTE_SUCCESS != (rc = orte_rml .send_buffer_nb (orte_mgmt_conduit ,
571
- & daemon , buf ,
572
- ORTE_RML_TAG_NOTIFICATION ,
573
- orte_rml_send_callback , NULL ))) {
574
- ORTE_ERROR_LOG (rc );
555
+ if (ORTE_SUCCESS != (rc = orte_grpcomm .xcast (& sig , ORTE_RML_TAG_NOTIFICATION , buf ))) {
556
+ ORTE_ERROR_LOG (rc );
557
+ }
558
+ OBJ_DESTRUCT (& sig );
575
559
OBJ_RELEASE (buf );
560
+ } else {
561
+ /* get the daemon hosting the proc to be notified */
562
+ daemon .jobid = ORTE_PROC_MY_NAME -> jobid ;
563
+ daemon .vpid = orte_get_proc_daemon_vpid (target );
564
+ /* send the notification to that daemon */
565
+ opal_output_verbose (5 , orte_state_base_framework .framework_output ,
566
+ "%s state:base:sending notification %s to proc %s at daemon %s" ,
567
+ ORTE_NAME_PRINT (ORTE_PROC_MY_NAME ),
568
+ ORTE_ERROR_NAME (status ),
569
+ ORTE_NAME_PRINT (target ),
570
+ ORTE_NAME_PRINT (& daemon ));
571
+ if (ORTE_SUCCESS != (rc = orte_rml .send_buffer_nb (orte_mgmt_conduit ,
572
+ & daemon , buf ,
573
+ ORTE_RML_TAG_NOTIFICATION ,
574
+ orte_rml_send_callback , NULL ))) {
575
+ ORTE_ERROR_LOG (rc );
576
+ OBJ_RELEASE (buf );
577
+ }
576
578
}
577
579
}
578
580
@@ -585,7 +587,7 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
585
587
orte_proc_t * pdata ;
586
588
int i ;
587
589
char * rtmod ;
588
- orte_process_name_t parent , * npptr ;
590
+ orte_process_name_t parent , target , * npptr ;
589
591
590
592
opal_output_verbose (5 , orte_state_base_framework .framework_output ,
591
593
"%s state:base:track_procs called for proc %s state %s" ,
@@ -699,15 +701,21 @@ void orte_state_base_track_procs(int fd, short argc, void *cbdata)
699
701
npptr = & parent ;
700
702
if (!orte_get_attribute (& jdata -> attributes , ORTE_JOB_LAUNCH_PROXY , (void * * )& npptr , OPAL_NAME )) {
701
703
/* notify everyone who asked for it */
702
- _send_direct_notify (OPAL_ERR_JOB_TERMINATED , ORTE_NAME_WILDCARD );
704
+ target .jobid = jdata -> jobid ;
705
+ target .vpid = ORTE_VPID_WILDCARD ;
706
+ _send_notification (OPAL_ERR_JOB_TERMINATED , & target , ORTE_NAME_WILDCARD );
703
707
} else {
704
- _send_direct_notify (OPAL_ERR_JOB_TERMINATED , & parent );
708
+ target .jobid = jdata -> jobid ;
709
+ target .vpid = ORTE_VPID_WILDCARD ;
710
+ _send_notification (OPAL_ERR_JOB_TERMINATED , & target , & parent );
705
711
}
706
712
}
707
713
} else if (ORTE_PROC_STATE_TERMINATED < pdata -> state &&
708
714
!orte_job_term_ordered ) {
709
715
/* if this was an abnormal term, notify the other procs of the termination */
710
- _send_notification (OPAL_ERR_PROC_ABORTED , & pdata -> name );
716
+ parent .jobid = jdata -> jobid ;
717
+ parent .vpid = ORTE_VPID_WILDCARD ;
718
+ _send_notification (OPAL_ERR_PROC_ABORTED , & pdata -> name , & parent );
711
719
}
712
720
}
713
721
0 commit comments