Skip to content

Commit fbbf743

Browse files
committed
comm/cid: fix threaded CID allocation
This commit should restore the pre-non-blocking behavior of the CID allocator when threads are used. There are two primary changes: 1) do not hold the cid allocator lock past the end of a request callback, and 2) if a lower id communicator is detected during CID allocation back off and let the lower id communicator finish before continuing. Signed-off-by: Nathan Hjelm <[email protected]>
1 parent ce01246 commit fbbf743

File tree

1 file changed

+35
-15
lines changed

1 file changed

+35
-15
lines changed

ompi/communicator/comm_cid.c

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t
181181
context->newcomm = newcomm;
182182
context->comm = comm;
183183
context->bridgecomm = bridgecomm;
184+
context->pml_tag = 0;
184185

185186
/* Determine which implementation of allreduce we have to use
186187
* for the current mode. */
@@ -245,8 +246,8 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request);
245246
static int ompi_comm_checkcid (ompi_comm_request_t *request);
246247
/* verify that the cid was available globally */
247248
static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request);
248-
/* lock the cid generator */
249-
static int ompi_comm_cid_lock (ompi_comm_request_t *request);
249+
250+
static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX;
250251

251252
int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
252253
ompi_communicator_t *bridgecomm, const void *arg0, const void *arg1,
@@ -271,7 +272,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com
271272

272273
request->context = &context->super;
273274

274-
ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0);
275+
ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
275276
ompi_comm_request_start (request);
276277

277278
*req = &request->super;
@@ -299,30 +300,33 @@ int ompi_comm_nextcid (ompi_communicator_t *newcomm, ompi_communicator_t *comm,
299300
return rc;
300301
}
301302

302-
static int ompi_comm_cid_lock (ompi_comm_request_t *request)
303-
{
304-
if (!OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
305-
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
306-
}
307-
308-
return ompi_comm_request_schedule_append (request, ompi_comm_cid_lock, NULL, 0);
309-
}
310-
311303
static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
312304
{
313305
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
306+
int64_t my_id = ((int64_t) ompi_comm_get_cid (context->comm) << 32 | context->pml_tag);
314307
ompi_request_t *subreq;
315308
bool flag;
316309
int ret;
317310

311+
if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
312+
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
313+
}
314+
315+
if (ompi_comm_cid_lowest_id < my_id) {
316+
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
317+
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid, NULL, 0);
318+
}
319+
320+
ompi_comm_cid_lowest_id = my_id;
321+
318322
/**
319323
* This is the real algorithm described in the doc
320324
*/
321325
flag = false;
322326
context->nextlocal_cid = mca_pml.pml_max_contextid;
323327
for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
324-
flag = opal_pointer_array_test_and_set_item(&ompi_mpi_communicators,
325-
i, context->comm);
328+
flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
329+
context->comm);
326330
if (true == flag) {
327331
context->nextlocal_cid = i;
328332
break;
@@ -332,6 +336,7 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
332336
ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
333337
context, &subreq);
334338
if (OMPI_SUCCESS != ret) {
339+
ompi_comm_cid_lowest_id = INT64_MAX;
335340
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
336341
return ret;
337342
}
@@ -341,10 +346,12 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
341346
if (flag) {
342347
opal_pointer_array_test_and_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
343348
}
344-
349+
350+
ompi_comm_cid_lowest_id = INT64_MAX;
345351
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
346352
return OMPI_ERR_OUT_OF_RESOURCE;
347353
}
354+
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
348355

349356
/* next we want to verify that the resulting commid is ok */
350357
return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, &subreq, 1);
@@ -356,6 +363,10 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request)
356363
ompi_request_t *subreq;
357364
int ret;
358365

366+
if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
367+
return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0);
368+
}
369+
359370
context->flag = (context->nextcid == context->nextlocal_cid);
360371

361372
if (!context->flag) {
@@ -372,19 +383,26 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request)
372383
ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, &subreq, 1);
373384
}
374385

386+
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
387+
375388
return ret;
376389
}
377390

378391
static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
379392
{
380393
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
381394

395+
if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
396+
return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_check_flag, NULL, 0);
397+
}
398+
382399
if (1 == context->rflag) {
383400
/* set the according values to the newcomm */
384401
context->newcomm->c_contextid = context->nextcid;
385402
opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm);
386403

387404
/* unlock the cid generator */
405+
ompi_comm_cid_lowest_id = INT64_MAX;
388406
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
389407

390408
/* done! */
@@ -399,6 +417,8 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
399417

400418
++context->iter;
401419

420+
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
421+
402422
/* try again */
403423
return ompi_comm_allreduce_getnextcid (request);
404424
}

0 commit comments

Comments
 (0)