7272#endif /* PMIX_ENABLE_DSTORE */
7373
7474#include "pmix_client_ops.h"
75+ #include "src/include/pmix_jobdata.h"
7576
7677#define PMIX_MAX_RETRIES 10
7778
@@ -191,8 +192,11 @@ static void job_data(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
191192 }
192193 assert (NULL != nspace );
193194 free (nspace );
195+
194196 /* decode it */
195- pmix_client_process_nspace_blob (pmix_globals .myid .nspace , buf );
197+ #if !(defined(PMIX_ENABLE_DSTORE ) && (PMIX_ENABLE_DSTORE == 1 ))
198+ pmix_job_data_htable_store (pmix_globals .myid .nspace , buf );
199+ #endif
196200 cb -> status = PMIX_SUCCESS ;
197201 cb -> active = false;
198202}
@@ -715,12 +719,27 @@ static void _peersfn(int sd, short args, void *cbdata)
715719 pmix_cb_t * cb = (pmix_cb_t * )cbdata ;
716720 pmix_status_t rc ;
717721 char * * nsprocs = NULL , * * nsps = NULL , * * tmp ;
722+ #if !(defined(PMIX_ENABLE_DSTORE ) && (PMIX_ENABLE_DSTORE == 1 ))
718723 pmix_nspace_t * nsptr ;
719724 pmix_nrec_t * nptr ;
725+ #endif
720726 size_t i ;
721727
722728 /* cycle across our known nspaces */
723729 tmp = NULL ;
730+ #if defined(PMIX_ENABLE_DSTORE ) && (PMIX_ENABLE_DSTORE == 1 )
731+ if (PMIX_SUCCESS == (rc = pmix_dstore_fetch (cb -> nspace , PMIX_RANK_WILDCARD ,
732+ cb -> key , & cb -> value ))) {
733+
734+ tmp = pmix_argv_split (cb -> value -> data .string , ',' );
735+ for (i = 0 ; NULL != tmp [i ]; i ++ ) {
736+ pmix_argv_append_nosize (& nsps , cb -> nspace );
737+ pmix_argv_append_nosize (& nsprocs , tmp [i ]);
738+ }
739+ pmix_argv_free (tmp );
740+ tmp = NULL ;
741+ }
742+ #else
724743 PMIX_LIST_FOREACH (nsptr , & pmix_globals .nspaces , pmix_nspace_t ) {
725744 if (0 == strncmp (nsptr -> nspace , cb -> nspace , PMIX_MAX_NSLEN )) {
726745 /* cycle across the nodes in this nspace */
@@ -738,6 +757,7 @@ static void _peersfn(int sd, short args, void *cbdata)
738757 }
739758 }
740759 }
760+ #endif
741761 if (0 == (i = pmix_argv_count (nsps ))) {
742762 /* we don't know this nspace */
743763 rc = PMIX_ERR_NOT_FOUND ;
@@ -1010,160 +1030,8 @@ static pmix_status_t send_connect_ack(int sd)
10101030 return PMIX_SUCCESS ;
10111031}
10121032
1013- void pmix_client_process_nspace_blob ( const char * nspace , pmix_buffer_t * bptr )
1033+ static pmix_status_t usock_connect ( struct sockaddr * addr , int * fd )
10141034{
1015- pmix_status_t rc ;
1016- int32_t cnt ;
1017- int rank ;
1018- pmix_kval_t * kptr , * kp2 , kv ;
1019- pmix_buffer_t buf2 ;
1020- pmix_byte_object_t * bo ;
1021- size_t nnodes , i , j ;
1022- pmix_nspace_t * nsptr , * nsptr2 ;
1023- pmix_nrec_t * nrec , * nr2 ;
1024- char * * procs ;
1025-
1026- pmix_output_verbose (2 , pmix_globals .debug_output ,
1027- "pmix: PROCESSING BLOB FOR NSPACE %s" , nspace );
1028-
1029- /* cycle across our known nspaces */
1030- nsptr = NULL ;
1031- PMIX_LIST_FOREACH (nsptr2 , & pmix_globals .nspaces , pmix_nspace_t ) {
1032- if (0 == strcmp (nsptr2 -> nspace , nspace )) {
1033- nsptr = nsptr2 ;
1034- break ;
1035- }
1036- }
1037- if (NULL == nsptr ) {
1038- /* we don't know this nspace - add it */
1039- nsptr = PMIX_NEW (pmix_nspace_t );
1040- (void )strncpy (nsptr -> nspace , nspace , PMIX_MAX_NSLEN );
1041- pmix_list_append (& pmix_globals .nspaces , & nsptr -> super );
1042- }
1043-
1044- /* unpack any info structs provided */
1045- cnt = 1 ;
1046- kptr = PMIX_NEW (pmix_kval_t );
1047- while (PMIX_SUCCESS == (rc = pmix_bfrop .unpack (bptr , kptr , & cnt , PMIX_KVAL ))) {
1048- if (0 == strcmp (kptr -> key , PMIX_PROC_BLOB )) {
1049- /* transfer the byte object for unpacking */
1050- bo = & (kptr -> value -> data .bo );
1051- PMIX_CONSTRUCT (& buf2 , pmix_buffer_t );
1052- PMIX_LOAD_BUFFER (& buf2 , bo -> bytes , bo -> size );
1053- /* start by unpacking the rank */
1054- cnt = 1 ;
1055- if (PMIX_SUCCESS != (rc = pmix_bfrop .unpack (& buf2 , & rank , & cnt , PMIX_PROC_RANK ))) {
1056- PMIX_ERROR_LOG (rc );
1057- PMIX_DESTRUCT (& buf2 );
1058- return ;
1059- }
1060- kp2 = PMIX_NEW (pmix_kval_t );
1061- kp2 -> key = strdup (PMIX_RANK );
1062- PMIX_VALUE_CREATE (kp2 -> value , 1 );
1063- kp2 -> value -> type = PMIX_PROC_RANK ;
1064- kp2 -> value -> data .rank = rank ;
1065- if (PMIX_SUCCESS != (rc = pmix_hash_store (& nsptr -> internal , rank , kp2 ))) {
1066- PMIX_ERROR_LOG (rc );
1067- }
1068- PMIX_RELEASE (kp2 ); // maintain accounting
1069- cnt = 1 ;
1070- kp2 = PMIX_NEW (pmix_kval_t );
1071- while (PMIX_SUCCESS == (rc = pmix_bfrop .unpack (& buf2 , kp2 , & cnt , PMIX_KVAL ))) {
1072- /* this is data provided by a job-level exchange, so store it
1073- * in the job-level data hash_table */
1074- if (PMIX_SUCCESS != (rc = pmix_hash_store (& nsptr -> internal , rank , kp2 ))) {
1075- PMIX_ERROR_LOG (rc );
1076- }
1077- PMIX_RELEASE (kp2 ); // maintain accounting
1078- kp2 = PMIX_NEW (pmix_kval_t );
1079- }
1080- /* cleanup */
1081- PMIX_DESTRUCT (& buf2 ); // releases the original kptr data
1082- PMIX_RELEASE (kp2 );
1083- } else if (0 == strcmp (kptr -> key , PMIX_MAP_BLOB )) {
1084- /* transfer the byte object for unpacking */
1085- bo = & (kptr -> value -> data .bo );
1086- PMIX_CONSTRUCT (& buf2 , pmix_buffer_t );
1087- PMIX_LOAD_BUFFER (& buf2 , bo -> bytes , bo -> size );
1088- /* start by unpacking the number of nodes */
1089- cnt = 1 ;
1090- if (PMIX_SUCCESS != (rc = pmix_bfrop .unpack (& buf2 , & nnodes , & cnt , PMIX_SIZE ))) {
1091- PMIX_ERROR_LOG (rc );
1092- PMIX_DESTRUCT (& buf2 );
1093- return ;
1094- }
1095- /* unpack the list of procs on each node */
1096- for (i = 0 ; i < nnodes ; i ++ ) {
1097- cnt = 1 ;
1098- PMIX_CONSTRUCT (& kv , pmix_kval_t );
1099- if (PMIX_SUCCESS != (rc = pmix_bfrop .unpack (& buf2 , & kv , & cnt , PMIX_KVAL ))) {
1100- PMIX_ERROR_LOG (rc );
1101- PMIX_DESTRUCT (& buf2 );
1102- PMIX_DESTRUCT (& kv );
1103- return ;
1104- }
1105- /* the name of the node is in the key, and the value is
1106- * a comma-delimited list of procs on that node. See if we already
1107- * have this node */
1108- nrec = NULL ;
1109- PMIX_LIST_FOREACH (nr2 , & nsptr -> nodes , pmix_nrec_t ) {
1110- if (0 == strcmp (nr2 -> name , kv .key )) {
1111- nrec = nr2 ;
1112- break ;
1113- }
1114- }
1115- if (NULL == nrec ) {
1116- /* Create a node record and store that list */
1117- nrec = PMIX_NEW (pmix_nrec_t );
1118- nrec -> name = strdup (kv .key );
1119- pmix_list_append (& nsptr -> nodes , & nrec -> super );
1120- } else {
1121- /* refresh the list */
1122- if (NULL != nrec -> procs ) {
1123- free (nrec -> procs );
1124- }
1125- }
1126- nrec -> procs = strdup (kv .value -> data .string );
1127- /* split the list of procs so we can store their
1128- * individual location data */
1129- procs = pmix_argv_split (nrec -> procs , ',' );
1130- for (j = 0 ; NULL != procs [j ]; j ++ ) {
1131- /* store the hostname for each proc - again, this is
1132- * data obtained via a job-level exchange, so store it
1133- * in the job-level data hash_table */
1134- kp2 = PMIX_NEW (pmix_kval_t );
1135- kp2 -> key = strdup (PMIX_HOSTNAME );
1136- kp2 -> value = (pmix_value_t * )malloc (sizeof (pmix_value_t ));
1137- kp2 -> value -> type = PMIX_STRING ;
1138- kp2 -> value -> data .string = strdup (nrec -> name );
1139- rank = strtol (procs [j ], NULL , 10 );
1140- if (PMIX_SUCCESS != (rc = pmix_hash_store (& nsptr -> internal , rank , kp2 ))) {
1141- PMIX_ERROR_LOG (rc );
1142- }
1143- PMIX_RELEASE (kp2 ); // maintain accounting
1144- }
1145- pmix_argv_free (procs );
1146- PMIX_DESTRUCT (& kv );
1147- }
1148- /* cleanup */
1149- PMIX_DESTRUCT (& buf2 ); // releases the original kptr data
1150- } else {
1151- /* this is job-level data, so just add it to that hash_table
1152- * with the wildcard rank */
1153- if (PMIX_SUCCESS != (rc = pmix_hash_store (& nsptr -> internal , PMIX_RANK_WILDCARD , kptr ))) {
1154- PMIX_ERROR_LOG (rc );
1155- }
1156- }
1157- PMIX_RELEASE (kptr );
1158- kptr = PMIX_NEW (pmix_kval_t );
1159- cnt = 1 ;
1160- }
1161- /* need to release the leftover kptr */
1162- PMIX_RELEASE (kptr );
1163- }
1164-
1165- static pmix_status_t usock_connect (struct sockaddr * addr , int * fd )
1166- {
11671035 int sd = -1 ;
11681036 pmix_status_t rc ;
11691037 pmix_socklen_t addrlen = 0 ;
0 commit comments