@@ -88,6 +88,7 @@ static const char pmix_version_string[] = PMIX_VERSION;
8888static void _notify_complete (pmix_status_t status , void * cbdata )
8989{
9090 pmix_event_chain_t * chain = (pmix_event_chain_t * )cbdata ;
91+ PMIX_ACQUIRE_OBJECT (chain );
9192 PMIX_RELEASE (chain );
9293}
9394
@@ -178,7 +179,7 @@ static void wait_cbfunc(struct pmix_peer_t *pr,
178179
179180 pmix_output_verbose (2 , pmix_globals .debug_output ,
180181 "pmix:client wait_cbfunc received" );
181-
182+ PMIX_POST_OBJECT ( active );
182183 * active = false;
183184}
184185
@@ -197,6 +198,7 @@ static void job_data(struct pmix_peer_t *pr,
197198 if (PMIX_SUCCESS != (rc = pmix_bfrop .unpack (buf , & nspace , & cnt , PMIX_STRING ))) {
198199 PMIX_ERROR_LOG (rc );
199200 cb -> status = PMIX_ERROR ;
201+ PMIX_POST_OBJECT (cb );
200202 cb -> active = false;
201203 return ;
202204 }
@@ -208,6 +210,7 @@ static void job_data(struct pmix_peer_t *pr,
208210 pmix_job_data_htable_store (pmix_globals .myid .nspace , buf );
209211#endif
210212 cb -> status = PMIX_SUCCESS ;
213+ PMIX_POST_OBJECT (cb );
211214 cb -> active = false;
212215}
213216
@@ -235,6 +238,7 @@ static void evhandler_reg_callbk(pmix_status_t status,
235238 void * cbdata )
236239{
237240 volatile int * active = (volatile int * )cbdata ;
241+ PMIX_POST_OBJECT (active );
238242 * active = status ;
239243}
240244
@@ -680,6 +684,9 @@ static void _putfn(int sd, short args, void *cbdata)
680684 uint8_t * tmp ;
681685 size_t len ;
682686
687+ /* need to acquire the cb object from its originating thread */
688+ PMIX_ACQUIRE_OBJECT (cb );
689+
683690 /* no need to push info that starts with "pmix" as that is
684691 * info we would have been provided at startup */
685692 if (0 == strncmp (cb -> key , "pmix" , 4 )) {
@@ -757,6 +764,8 @@ static void _putfn(int sd, short args, void *cbdata)
757764 PMIX_RELEASE (kv ); // maintain accounting
758765 }
759766 cb -> pstatus = rc ;
767+ /* post the data so the receiving thread can acquire it */
768+ PMIX_POST_OBJECT (cb );
760769 cb -> active = false;
761770}
762771
@@ -802,6 +811,9 @@ static void _commitfn(int sd, short args, void *cbdata)
802811 pmix_buffer_t * msgout ;
803812 pmix_cmd_t cmd = PMIX_COMMIT_CMD ;
804813
814+ /* need to acquire the cb object from its originating thread */
815+ PMIX_ACQUIRE_OBJECT (cb );
816+
805817 msgout = PMIX_NEW (pmix_buffer_t );
806818 /* pack the cmd */
807819 if (PMIX_SUCCESS != (rc = pmix_bfrop .pack (msgout , & cmd , 1 , PMIX_CMD ))) {
@@ -850,6 +862,8 @@ static void _commitfn(int sd, short args, void *cbdata)
850862
851863 done :
852864 cb -> pstatus = rc ;
865+ /* post the data so the receiving thread can acquire it */
866+ PMIX_POST_OBJECT (cb );
853867 cb -> active = false;
854868 }
855869
@@ -901,6 +915,9 @@ static void _peersfn(int sd, short args, void *cbdata)
901915#endif
902916 size_t i ;
903917
918+ /* need to acquire the cb object from its originating thread */
919+ PMIX_ACQUIRE_OBJECT (cb );
920+
904921 /* cycle across our known nspaces */
905922 tmp = NULL ;
906923#if defined(PMIX_ENABLE_DSTORE ) && (PMIX_ENABLE_DSTORE == 1 )
@@ -955,6 +972,8 @@ static void _peersfn(int sd, short args, void *cbdata)
955972
956973 done :
957974 cb -> pstatus = rc ;
975+ /* post the data so the receiving thread can acquire it */
976+ PMIX_POST_OBJECT (cb );
958977 cb -> active = false;
959978}
960979
@@ -1004,6 +1023,9 @@ static void _nodesfn(int sd, short args, void *cbdata)
10041023 pmix_nspace_t * nsptr ;
10051024 pmix_nrec_t * nptr ;
10061025
1026+ /* need to acquire the cb object from its originating thread */
1027+ PMIX_ACQUIRE_OBJECT (cb );
1028+
10071029 /* cycle across our known nspaces */
10081030 tmp = NULL ;
10091031 PMIX_LIST_FOREACH (nsptr , & pmix_globals .nspaces , pmix_nspace_t ) {
@@ -1023,6 +1045,8 @@ static void _nodesfn(int sd, short args, void *cbdata)
10231045 }
10241046
10251047 cb -> pstatus = rc ;
1048+ /* post the data so the receiving thread can acquire it */
1049+ PMIX_POST_OBJECT (cb );
10261050 cb -> active = false;
10271051}
10281052
0 commit comments