@@ -477,13 +477,14 @@ static void _getnbfn(int fd, short flags, void *cbdata)
477477 pmix_cb_t * cb = (pmix_cb_t * )cbdata ;
478478 pmix_cb_t * cbret ;
479479 pmix_buffer_t * msg ;
480- pmix_value_t * val ;
480+ pmix_value_t * val = NULL ;
481481 pmix_info_t * info , * iptr ;
482482 pmix_pointer_array_t results ;
483483 pmix_status_t rc ;
484484 pmix_nspace_t * ns , * nptr ;
485485 size_t n , nvals ;
486486 char * tmp ;
487+ bool my_nspace = false, my_rank = false;
487488
488489 pmix_output_verbose (2 , pmix_globals .debug_output ,
489490 "pmix: getnbfn value for proc %s:%d key %s" ,
@@ -596,65 +597,54 @@ static void _getnbfn(int fd, short flags, void *cbdata)
596597 return ;
597598 }
598599
599- /* should be in the internal hash table. */
600- #if defined(PMIX_ENABLE_DSTORE ) && (PMIX_ENABLE_DSTORE == 1 )
600+ /* check the internal storage first */
601601 rc = pmix_hash_fetch (& nptr -> internal , cb -> rank , cb -> key , & val );
602- if (PMIX_SUCCESS ! = rc ) {
603- rc = pmix_dstore_fetch ( cb -> nspace , cb -> rank , cb -> key , & val ) ;
602+ if (PMIX_SUCCESS = = rc ) {
603+ goto respond ;
604604 }
605- if (PMIX_SUCCESS == rc ) {
606- #else
607- if (PMIX_SUCCESS == (rc = pmix_hash_fetch (& nptr -> internal , cb -> rank , cb -> key , & val ))) {
605+
606+ my_nspace = (0 == strncmp (cb -> nspace , pmix_globals .myid .nspace , PMIX_MAX_NSLEN ));
607+ my_rank = (pmix_globals .myid .rank == cb -> rank );
608+
609+ /* if the key starts from "pmix", then they are looking for data
610+ * that was provided at startup */
611+ if (0 == strncmp (cb -> key , "pmix" , 4 )) {
612+ #if defined(PMIX_ENABLE_DSTORE ) && (PMIX_ENABLE_DSTORE == 1 )
613+ /* if this is a dstore - check there */
614+ rc = pmix_dstore_fetch (cb -> nspace , cb -> rank , cb -> key , & val );
608615#endif
609- /* if this is a compressed string, then uncompress it */
610- if (PMIX_COMPRESSED_STRING == val -> type ) {
611- pmix_util_uncompress_string (& tmp , (uint8_t * )val -> data .bo .bytes , val -> data .bo .size );
612- if (NULL == tmp ) {
613- PMIX_ERROR_LOG (PMIX_ERR_NOMEM );
614- rc = PMIX_ERR_NOMEM ;
615- PMIX_VALUE_RELEASE (val );
616- val = NULL ;
617- } else {
618- PMIX_VALUE_DESTRUCT (val );
619- PMIX_VAL_ASSIGN (val , string , tmp );
620- }
621- }
622- /* found it - we are in an event, so we can
623- * just execute the callback */
624- cb -> value_cbfunc (rc , val , cb -> cbdata );
625- /* cleanup */
626- if (NULL != val ) {
627- PMIX_VALUE_RELEASE (val );
616+ if ( PMIX_SUCCESS != rc && !my_nspace ){
617+ /* we are asking about the job-level info from other
618+ * namespace. It seems tha we don't have it - go and
619+ * ask server
620+ */
621+ goto request ;
628622 }
629- PMIX_RELEASE ( cb );
630- return ;
623+ /* we supposed to already have all local namespace data */
624+ goto respond ;
631625 }
632626
633- /* if the key is in the PMIx namespace, then they are looking for data
634- * that was provided at startup */
635- if (0 == strncmp (cb -> key , "pmix" , 4 )) {
636- /* if we don't have it, go request it */
637- goto request ;
627+ /* if we were asked about this rank */
628+ if ( my_nspace && my_rank ){
629+ /* if we asking the data about this rank - check local hash table.
630+ * All the data passed through PMIx_Put settle down there
631+ * if there is nothing there - it's nothing else we can do
632+ */
633+ rc = pmix_hash_fetch (& nptr -> modex , pmix_globals .myid .rank , cb -> key , & val );
634+ if ( PMIX_SUCCESS != rc ){
635+ rc = PMIX_ERR_NOT_FOUND ;
636+ goto respond ;
637+ }
638638 }
639639
640640 /* otherwise, the data must be something they "put" */
641641#if defined(PMIX_ENABLE_DSTORE ) && (PMIX_ENABLE_DSTORE == 1 )
642642 rc = PMIX_ERR_PROC_ENTRY_NOT_FOUND ;
643- if ((0 == strncmp (pmix_globals .myid .nspace , nptr -> nspace , PMIX_MAX_NSLEN + 1 )) &&
644- ((pmix_globals .myid .rank == cb -> rank ) || (PMIX_RANK_UNDEF == cb -> rank ))){
645- /* if we asking the data about this or undefined process -
646- check local hash table first. All the data passed through
647- PMIx_Put settle down there */
643+ /* if rank is undefined - check local table first */
644+ if ( my_nspace && (PMIX_RANK_UNDEF == cb -> rank )){
645+ /* if we asking about undefined process - check local hash table first
646+ * local rank may have submitted this key. */
648647 rc = pmix_hash_fetch (& nptr -> modex , pmix_globals .myid .rank , cb -> key , & val );
649- assert ( (PMIX_SUCCESS == rc ) || (PMIX_ERR_PROC_ENTRY_NOT_FOUND == rc ) ||
650- (PMIX_ERR_NOT_FOUND == rc ) );
651- if ( PMIX_SUCCESS != rc ){
652- if (pmix_globals .myid .rank == cb -> rank ){
653- rc = PMIX_ERR_NOT_FOUND ;
654- }
655- }
656- /* in else case we supposed to get PMIX_ERR_PROC_ENTRY_NOT_FOUND because
657- we don't push data from the remote processes into the dstore */
658648 }
659649 /* try to take it from dstore */
660650 if ( PMIX_ERR_PROC_ENTRY_NOT_FOUND == rc ){
@@ -670,71 +660,35 @@ static void _getnbfn(int fd, short flags, void *cbdata)
670660#endif /* PMIX_ENABLE_DSTORE */
671661
672662 if ( PMIX_SUCCESS == rc ) {
673- pmix_output_verbose (2 , pmix_globals .debug_output ,
674- "pmix_get[%d]: value retrieved from dstore" , __LINE__ );
675- /* if this is a compressed string, then uncompress it */
676- if (PMIX_COMPRESSED_STRING == val -> type ) {
677- pmix_util_uncompress_string (& tmp , (uint8_t * )val -> data .bo .bytes , val -> data .bo .size );
678- if (NULL == tmp ) {
679- PMIX_ERROR_LOG (PMIX_ERR_NOMEM );
680- rc = PMIX_ERR_NOMEM ;
681- PMIX_VALUE_RELEASE (val );
682- val = NULL ;
683- } else {
684- PMIX_VALUE_DESTRUCT (val );
685- PMIX_VAL_ASSIGN (val , string , tmp );
686- }
687- }
688- /* found it - we are in an event, so we can
689- * just execute the callback */
690- cb -> value_cbfunc (rc , val , cb -> cbdata );
691- /* cleanup */
692- if (NULL != val ) {
693- PMIX_VALUE_RELEASE (val );
694- }
695- PMIX_RELEASE (cb );
696- return ;
697- } else if (PMIX_ERR_NOT_FOUND == rc ) {
663+ goto respond ;
664+ } else if ( PMIX_ERR_PROC_ENTRY_NOT_FOUND == rc ){
665+ goto request ;
666+ }else if (PMIX_ERR_NOT_FOUND == rc ) {
698667 /* we have the modex data from this proc, but didn't find the key
699668 * the user requested. It's possible someone pushed something since
700669 * we got this data, so let's ask the server for an update. However,
701670 * we do have to protect against an infinite loop! */
702671 if (cb -> checked ) {
703- pmix_output_verbose (2 , pmix_globals .debug_output ,
704- "Error requesting key=%s for rank = %d, namespace = %s" ,
705- cb -> key , cb -> rank , cb -> nspace );
706- cb -> value_cbfunc (rc , NULL , cb -> cbdata );
707- /* protect the data */
708- cb -> procs = NULL ;
709- cb -> key = NULL ;
710- cb -> info = NULL ;
711- PMIX_RELEASE (cb );
712- return ;
672+ goto respond ;
713673 }
714674 pmix_output_verbose (2 , pmix_globals .debug_output ,
715675 "Unable to locally satisfy request for key=%s for rank = %d, namespace = %s" ,
716676 cb -> key , cb -> rank , cb -> nspace );
717677 cb -> checked = true; // flag that we are going to check this again
678+ goto request ;
718679 } else if (PMIX_ERR_PROC_ENTRY_NOT_FOUND != rc ) {
719680 /* errors are fatal */
720- cb -> value_cbfunc (rc , NULL , cb -> cbdata );
721- /* protect the data */
722- cb -> procs = NULL ;
723- cb -> key = NULL ;
724- cb -> info = NULL ;
725- PMIX_RELEASE (cb );
726- return ;
681+ goto respond ;
727682 }
728683
729- request :
684+ request :
730685 /* if we got here, then we don't have the data for this proc. If we
731686 * are a server, or we are a client and not connected, then there is
732687 * nothing more we can do */
733688 if (PMIX_PROC_SERVER == pmix_globals .proc_type ||
734689 (PMIX_PROC_SERVER != pmix_globals .proc_type && !pmix_globals .connected )) {
735- cb -> value_cbfunc (PMIX_ERR_NOT_FOUND , NULL , cb -> cbdata );
736- PMIX_RELEASE (cb );
737- return ;
690+ rc = PMIX_ERR_NOT_FOUND ;
691+ goto respond ;
738692 }
739693
740694 /* we also have to check the user's directives to see if they do not want
@@ -746,22 +700,11 @@ static void _getnbfn(int fd, short flags, void *cbdata)
746700 pmix_output_verbose (2 , pmix_globals .debug_output ,
747701 "PMIx_Get key=%s for rank = %d, namespace = %s was not found - request was optional" ,
748702 cb -> key , cb -> rank , cb -> nspace );
749- cb -> value_cbfunc (PMIX_ERR_NOT_FOUND , NULL , cb -> cbdata );
750- PMIX_RELEASE (cb );
751- return ;
703+ rc = PMIX_ERR_NOT_FOUND ;
704+ goto respond ;
752705 }
753706 }
754707
755- /* if we are seeking "pmix" data for our own nspace, then we must fail
756- * as it was provided at startup - any updates would have come via
757- * event notifications */
758- if (NULL != cb -> key && 0 == strncmp (cb -> key , "pmix" , 4 ) &&
759- 0 == strncmp (cb -> nspace , pmix_globals .myid .nspace , PMIX_MAX_NSLEN )) {
760- cb -> value_cbfunc (PMIX_ERR_NOT_FOUND , NULL , cb -> cbdata );
761- PMIX_RELEASE (cb );
762- return ;
763- }
764-
765708 /* see if we already have a request in place with the server for data from
766709 * this nspace:rank. If we do, then no need to ask again as the
767710 * request will return _all_ data from that proc */
@@ -779,9 +722,8 @@ static void _getnbfn(int fd, short flags, void *cbdata)
779722 * about packing the key as we return everything from that proc */
780723 msg = _pack_get (cb -> nspace , cb -> rank , cb -> info , cb -> ninfo , PMIX_GETNB_CMD );
781724 if (NULL == msg ) {
782- cb -> value_cbfunc (PMIX_ERROR , NULL , cb -> cbdata );
783- PMIX_RELEASE (cb );
784- return ;
725+ rc = PMIX_ERROR ;
726+ goto respond ;
785727 }
786728
787729 pmix_output_verbose (2 , pmix_globals .debug_output ,
@@ -794,8 +736,37 @@ static void _getnbfn(int fd, short flags, void *cbdata)
794736 /* send to the server */
795737 if (PMIX_SUCCESS != (rc = pmix_ptl .send_recv (& pmix_client_globals .myserver , msg , _getnb_cbfunc , (void * )cb ))){
796738 pmix_list_remove_item (& pmix_client_globals .pending_requests , & cb -> super );
797- cb -> value_cbfunc (PMIX_ERROR , NULL , cb -> cbdata );
798- PMIX_RELEASE (cb );
799- return ;
739+ rc = PMIX_ERROR ;
740+ goto respond ;
800741 }
742+
743+ return ;
744+
745+ respond :
746+
747+ /* if a callback was provided, execute it */
748+ if (NULL != cb && NULL != cb -> value_cbfunc ) {
749+ if (NULL != val ) {
750+ /* if this is a compressed string, then uncompress it */
751+ if (PMIX_COMPRESSED_STRING == val -> type ) {
752+ pmix_util_uncompress_string (& tmp , (uint8_t * )val -> data .bo .bytes , val -> data .bo .size );
753+ if (NULL == tmp ) {
754+ PMIX_ERROR_LOG (PMIX_ERR_NOMEM );
755+ rc = PMIX_ERR_NOMEM ;
756+ PMIX_VALUE_RELEASE (val );
757+ val = NULL ;
758+ } else {
759+ PMIX_VALUE_DESTRUCT (val );
760+ PMIX_VAL_ASSIGN (val , string , tmp );
761+ }
762+ }
763+ }
764+ cb -> value_cbfunc (rc , val , cb -> cbdata );
765+ }
766+ if (NULL != val ) {
767+ PMIX_VALUE_RELEASE (val );
768+ }
769+ PMIX_RELEASE (cb );
770+ return ;
771+
801772}
0 commit comments