@@ -181,6 +181,7 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t
181181 context -> newcomm = newcomm ;
182182 context -> comm = comm ;
183183 context -> bridgecomm = bridgecomm ;
184+ context -> pml_tag = 0 ;
184185
185186 /* Determine which implementation of allreduce we have to use
186187 * for the current mode. */
@@ -245,8 +246,8 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request);
245246static int ompi_comm_checkcid (ompi_comm_request_t * request );
246247/* verify that the cid was available globally */
247248static int ompi_comm_nextcid_check_flag (ompi_comm_request_t * request );
248- /* lock the cid generator */
249- static int ompi_comm_cid_lock ( ompi_comm_request_t * request ) ;
249+
250+ static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX ;
250251
251252int ompi_comm_nextcid_nb (ompi_communicator_t * newcomm , ompi_communicator_t * comm ,
252253 ompi_communicator_t * bridgecomm , const void * arg0 , const void * arg1 ,
@@ -271,7 +272,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com
271272
272273 request -> context = & context -> super ;
273274
274- ompi_comm_request_schedule_append (request , ompi_comm_cid_lock , NULL , 0 );
275+ ompi_comm_request_schedule_append (request , ompi_comm_allreduce_getnextcid , NULL , 0 );
275276 ompi_comm_request_start (request );
276277
277278 * req = & request -> super ;
@@ -299,30 +300,33 @@ int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
299300 return rc ;
300301}
301302
302- static int ompi_comm_cid_lock (ompi_comm_request_t * request )
303- {
304- if (!OPAL_THREAD_TRYLOCK (& ompi_cid_lock )) {
305- return ompi_comm_request_schedule_append (request , ompi_comm_allreduce_getnextcid , NULL , 0 );
306- }
307-
308- return ompi_comm_request_schedule_append (request , ompi_comm_cid_lock , NULL , 0 );
309- }
310-
311303static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t * request )
312304{
313305 ompi_comm_cid_context_t * context = (ompi_comm_cid_context_t * ) request -> context ;
306+ int64_t my_id = ((int64_t ) ompi_comm_get_cid (context -> comm ) << 32 | context -> pml_tag );
314307 ompi_request_t * subreq ;
315308 bool flag ;
316309 int ret ;
317310
311+ if (OPAL_THREAD_TRYLOCK (& ompi_cid_lock )) {
312+ return ompi_comm_request_schedule_append (request , ompi_comm_allreduce_getnextcid , NULL , 0 );
313+ }
314+
315+ if (ompi_comm_cid_lowest_id < my_id ) {
316+ OPAL_THREAD_UNLOCK (& ompi_cid_lock );
317+ return ompi_comm_request_schedule_append (request , ompi_comm_allreduce_getnextcid , NULL , 0 );
318+ }
319+
320+ ompi_comm_cid_lowest_id = my_id ;
321+
318322 /**
319323 * This is the real algorithm described in the doc
320324 */
321325 flag = false;
322326 context -> nextlocal_cid = mca_pml .pml_max_contextid ;
323327 for (unsigned int i = context -> start ; i < mca_pml .pml_max_contextid ; ++ i ) {
324- flag = opal_pointer_array_test_and_set_item (& ompi_mpi_communicators ,
325- i , context -> comm );
328+ flag = opal_pointer_array_test_and_set_item (& ompi_mpi_communicators , i ,
329+ context -> comm );
326330 if (true == flag ) {
327331 context -> nextlocal_cid = i ;
328332 break ;
@@ -332,6 +336,7 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
332336 ret = context -> allreduce_fn (& context -> nextlocal_cid , & context -> nextcid , 1 , MPI_MAX ,
333337 context , & subreq );
334338 if (OMPI_SUCCESS != ret ) {
339+ ompi_comm_cid_lowest_id = INT64_MAX ;
335340 OPAL_THREAD_UNLOCK (& ompi_cid_lock );
336341 return ret ;
337342 }
@@ -341,10 +346,12 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
341346 if (flag ) {
342347 opal_pointer_array_test_and_set_item (& ompi_mpi_communicators , context -> nextlocal_cid , NULL );
343348 }
344-
349+
350+ ompi_comm_cid_lowest_id = INT64_MAX ;
345351 OPAL_THREAD_UNLOCK (& ompi_cid_lock );
346352 return OMPI_ERR_OUT_OF_RESOURCE ;
347353 }
354+ OPAL_THREAD_UNLOCK (& ompi_cid_lock );
348355
349356 /* next we want to verify that the resulting commid is ok */
350357 return ompi_comm_request_schedule_append (request , ompi_comm_checkcid , & subreq , 1 );
@@ -356,6 +363,10 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request)
356363 ompi_request_t * subreq ;
357364 int ret ;
358365
366+ if (OPAL_THREAD_TRYLOCK (& ompi_cid_lock )) {
367+ return ompi_comm_request_schedule_append (request , ompi_comm_checkcid , NULL , 0 );
368+ }
369+
359370 context -> flag = (context -> nextcid == context -> nextlocal_cid );
360371
361372 if (!context -> flag ) {
@@ -372,19 +383,26 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request)
372383 ompi_comm_request_schedule_append (request , ompi_comm_nextcid_check_flag , & subreq , 1 );
373384 }
374385
386+ OPAL_THREAD_UNLOCK (& ompi_cid_lock );
387+
375388 return ret ;
376389}
377390
378391static int ompi_comm_nextcid_check_flag (ompi_comm_request_t * request )
379392{
380393 ompi_comm_cid_context_t * context = (ompi_comm_cid_context_t * ) request -> context ;
381394
395+ if (OPAL_THREAD_TRYLOCK (& ompi_cid_lock )) {
396+ return ompi_comm_request_schedule_append (request , ompi_comm_nextcid_check_flag , NULL , 0 );
397+ }
398+
382399 if (1 == context -> rflag ) {
385402 /* set the corresponding values on the newcomm */
384401 context -> newcomm -> c_contextid = context -> nextcid ;
385402 opal_pointer_array_set_item (& ompi_mpi_communicators , context -> nextcid , context -> newcomm );
386403
387404 /* unlock the cid generator */
405+ ompi_comm_cid_lowest_id = INT64_MAX ;
388406 OPAL_THREAD_UNLOCK (& ompi_cid_lock );
389407
390408 /* done! */
@@ -399,6 +417,8 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
399417
400418 ++ context -> iter ;
401419
420+ OPAL_THREAD_UNLOCK (& ompi_cid_lock );
421+
402422 /* try again */
403423 return ompi_comm_allreduce_getnextcid (request );
404424}
0 commit comments