Skip to content

Commit e014717

Browse files
committed
Make sure threads waiting on continuation request execute continuations while waiting
No other threads should execute them if the continuation request is limited to polling only. Signed-off-by: Joseph Schuchart <[email protected]>
1 parent aa9677d commit e014717

File tree

6 files changed

+110
-84
lines changed

6 files changed

+110
-84
lines changed

ompi/mpiext/continue/c/continuation.c

Lines changed: 50 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,10 @@ struct ompi_cont_request_t {
8181
ompi_request_t super;
8282
opal_atomic_lock_t cont_lock; /**< Lock used completing/restarting the cont request */
8383
bool cont_enqueue_complete; /**< Whether to enqueue immediately complete requests */
84-
bool cont_in_wait; /**< Whether the continuation request is currently waited on */
8584
opal_atomic_int32_t cont_num_active; /**< The number of active continuations registered with a continuation request */
8685
uint32_t continue_max_poll; /**< max number of local continuations to execute at once */
8786
opal_list_t *cont_complete_list; /**< List of complete continuations to be invoked during test */
87+
ompi_wait_sync_t *sync; /**< Sync object this continuation request is attached to */
8888
};
8989

9090
static void ompi_cont_request_construct(ompi_cont_request_t* cont_req)
@@ -98,10 +98,10 @@ static void ompi_cont_request_construct(ompi_cont_request_t* cont_req)
9898
cont_req->super.req_status = ompi_status_empty; /* always returns MPI_SUCCESS */
9999
opal_atomic_lock_init(&cont_req->cont_lock, false);
100100
cont_req->cont_enqueue_complete = false;
101-
cont_req->cont_in_wait = false;
102101
cont_req->cont_num_active = 0;
103102
cont_req->continue_max_poll = UINT32_MAX;
104103
cont_req->cont_complete_list = NULL;
104+
cont_req->sync = NULL;
105105
}
106106

107107
static void ompi_cont_request_destruct(ompi_cont_request_t* cont_req)
@@ -156,10 +156,13 @@ static opal_mutex_t request_cont_lock;
156156
*/
157157
static bool progress_callback_registered = false;
158158

159-
/**
160-
* Thread-local list of continuation requests that should be progressed.
161-
*/
162-
static opal_thread_local opal_list_t *thread_progress_list = NULL;
159+
struct lazy_list_s {
160+
opal_list_t list;
161+
bool is_initialized;
162+
};
163+
typedef struct lazy_list_s lazy_list_t;
164+
165+
static opal_thread_local lazy_list_t thread_progress_list = { .is_initialized = false };
163166

164167
static inline
165168
void ompi_continue_cont_req_release(ompi_cont_request_t *cont_req,
@@ -179,6 +182,10 @@ void ompi_continue_cont_req_release(ompi_cont_request_t *cont_req,
179182
/* signal that all continuations were found complete */
180183
ompi_request_complete(&cont_req->super, true);
181184
}
185+
if (NULL != cont_req->sync) {
186+
/* release the sync object */
187+
OPAL_THREAD_ADD_FETCH32(&cont_req->sync->num_req_need_progress, -1);
188+
}
182189
if (take_lock && using_threads) {
183190
opal_atomic_unlock(&cont_req->cont_lock);
184191
}
@@ -191,12 +198,7 @@ void ompi_continue_cont_release(ompi_continuation_t *cont)
191198
ompi_cont_request_t *cont_req = cont->cont_req;
192199
assert(OMPI_REQUEST_CONT == cont_req->super.req_type);
193200

194-
/* if a thread is waiting on the request, we got here when
195-
* the thread started executing the continuations, so the continuation
196-
* request is complete already */
197-
if (!cont_req->cont_in_wait) {
198-
ompi_continue_cont_req_release(cont_req, 1, true);
199-
}
201+
ompi_continue_cont_req_release(cont_req, 1, true);
200202
OBJ_RELEASE(cont_req);
201203

202204
#ifdef OPAL_ENABLE_DEBUG
@@ -240,9 +242,13 @@ int ompi_continue_progress_n(const uint32_t max)
240242
in_progress = 1;
241243

242244
const bool using_threads = opal_using_threads();
243-
if (NULL != thread_progress_list) {
245+
246+
/* execute thread-local continuations first
247+
* (e.g., from continuation requests the current thread is waiting on) */
248+
lazy_list_t *tl_list = &thread_progress_list;
249+
if (tl_list->is_initialized) {
244250
ompi_cont_request_t *cont_req;
245-
OPAL_LIST_FOREACH(cont_req, thread_progress_list, ompi_cont_request_t) {
251+
OPAL_LIST_FOREACH(cont_req, &tl_list->list, ompi_cont_request_t) {
246252
ompi_continuation_t *cb;
247253
if (opal_list_is_empty(cont_req->cont_complete_list)) continue;
248254
while (max > completed) {
@@ -289,6 +295,12 @@ static int ompi_continue_progress_callback()
289295
return ompi_continue_progress_n(1);
290296
}
291297

298+
static int ompi_continue_wait_progress_callback()
299+
{
300+
return ompi_continue_progress_n(UINT32_MAX);
301+
}
302+
303+
292304
int ompi_continue_progress_request(ompi_request_t *req)
293305
{
294306
if (in_progress) return 0;
@@ -329,60 +341,53 @@ int ompi_continue_progress_request(ompi_request_t *req)
329341

330342

331343
/**
332-
* Register the provided continuation request to be included in the
333-
* global progress loop (used while a thread is waiting for the contnuation
334-
* request to complete).
335-
* We move all local continuations into the global continuation list
336-
* and mark the continuation request such that future continuations
337-
* are directly put into the global continuations list.
338-
* Once the wait completed (i.e., all continuations registered with the
339-
* continuation request) we unmark it (see ompi_continue_deregister_request_progress).
344+
* Register the continuation request so that it will be progressed even if
345+
* it is poll-only and the thread is waiting on the provided sync object.
340346
*/
341-
int ompi_continue_register_request_progress(ompi_request_t *req)
347+
int ompi_continue_register_request_progress(ompi_request_t *req, ompi_wait_sync_t *sync)
342348
{
343349
ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req;
344350

345351
if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS;
346352

347-
opal_atomic_lock(&cont_req->cont_lock);
353+
lazy_list_t *cont_req_list = &thread_progress_list;
348354

349-
cont_req->cont_in_wait = true;
350-
351-
ompi_continue_cont_req_release(cont_req, opal_list_get_size(cont_req->cont_complete_list), false);
355+
/* check that the thread-local list is initialized */
356+
if (!cont_req_list->is_initialized) {
357+
OBJ_CONSTRUCT(&cont_req_list->list, opal_list_t);
358+
cont_req_list->is_initialized = true;
359+
}
352360

353-
opal_atomic_unlock(&cont_req->cont_lock);
361+
/* add the continuation request to the thread-local list */
362+
opal_list_append(&cont_req_list->list, &cont_req->super.super.super);
354363

355-
if (NULL == thread_progress_list) {
356-
thread_progress_list = OBJ_NEW(opal_list_t);
364+
/* register with the sync object */
365+
if (NULL != sync) {
366+
sync->num_req_need_progress++;
367+
sync->progress_cb = &ompi_continue_wait_progress_callback;
357368
}
358-
359-
/* enqueue the continuation request to allow for progress by this thread */
360-
opal_list_append(thread_progress_list, &req->super.super);
369+
cont_req->sync = sync;
361370

362371
return OMPI_SUCCESS;
363372
}
364373

365374
/**
366-
* Remove the continuation request from being progressed by the global progress
367-
* loop (after a wait completes).
375+
* Remove the poll-only continuation request from the thread's progress list after
376+
* it has completed.
368377
*/
369378
int ompi_continue_deregister_request_progress(ompi_request_t *req)
370379
{
371380
ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req;
372381

373382
if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS;
374383

375-
/* make sure we execute all outstanding continuations */
376-
uint32_t tmp_max_poll = cont_req->continue_max_poll;
377-
cont_req->continue_max_poll = UINT32_MAX;
378-
ompi_continue_progress_request(req);
379-
cont_req->continue_max_poll = tmp_max_poll;
380-
381-
cont_req->cont_in_wait = false;
382-
384+
/* let the sync know we're done, it may suspend the thread now */
385+
if (NULL != cont_req->sync) {
386+
cont_req->sync->num_req_need_progress--;
387+
}
383388

384389
/* remove the continuation request from the thread-local progress list */
385-
opal_list_remove_item(thread_progress_list, &req->super.super);
390+
opal_list_remove_item(&thread_progress_list.list, &req->super.super);
386391

387392
return OMPI_SUCCESS;
388393
}
@@ -439,13 +444,6 @@ ompi_continue_enqueue_runnable(ompi_continuation_t *cont)
439444
if (NULL != cont_req->cont_complete_list) {
440445
opal_atomic_lock(&cont_req->cont_lock);
441446
opal_list_append(cont_req->cont_complete_list, &cont->super.super);
442-
if (cont_req->cont_in_wait) {
443-
/* if a thread is waiting for this request to complete, signal completions
444-
* the continuations will be executed at the end of the wait
445-
* but we need to ensure that the request is marked complete first
446-
*/
447-
ompi_continue_cont_req_release(cont_req, 1, false);
448-
}
449447
opal_atomic_unlock(&cont_req->cont_lock);
450448
} else {
451449
OPAL_THREAD_LOCK(&request_cont_lock);
@@ -601,15 +599,14 @@ int ompi_continue_attach(
601599
requests[i] = MPI_REQUEST_NULL;
602600
}
603601
}
604-
605602
}
606603
}
607604

608605
assert(count >= num_registered);
609606
int num_complete = count - num_registered;
610607
int32_t last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->num_active,
611608
-num_complete);
612-
if (0 == last_num_active && 0 == num_registered) {
609+
if (0 == last_num_active) {
613610
if (cont_req->cont_enqueue_complete) {
614611
/* enqueue for later processing */
615612
ompi_continue_enqueue_runnable(cont);

ompi/mpiext/continue/c/continuation.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include "ompi/mpiext/continue/c/mpiext_continue_c.h"
2323

2424

25+
struct ompi_request_t;
26+
2527
BEGIN_C_DECLS
2628

2729
/**
@@ -38,18 +40,18 @@ int ompi_continuation_fini(void);
3840
* Register a request with local completion list for progressing through
3941
* the progress engine.
4042
*/
41-
int ompi_continue_register_request_progress(ompi_request_t *cont_req);
43+
int ompi_continue_register_request_progress(struct ompi_request_t *cont_req, ompi_wait_sync_t *sync);
4244

4345
/**
4446
* Deregister a request with local completion list from progressing through
4547
* the progress engine.
4648
*/
47-
int ompi_continue_deregister_request_progress(ompi_request_t *cont_req);
49+
int ompi_continue_deregister_request_progress(struct ompi_request_t *cont_req);
4850

4951
/**
5052
* Progress a continuation request that has local completions.
5153
*/
52-
int ompi_continue_progress_request(ompi_request_t *cont_req);
54+
int ompi_continue_progress_request(struct ompi_request_t *cont_req);
5355

5456
/**
5557
* Attach a continuation to a set of operations represented by \c requests.
@@ -60,9 +62,9 @@ int ompi_continue_progress_request(ompi_request_t *cont_req);
6062
* can be used to query for and progress outstanding continuations.
6163
*/
6264
int ompi_continue_attach(
63-
ompi_request_t *cont_req,
65+
struct ompi_request_t *cont_req,
6466
int count,
65-
ompi_request_t *requests[],
67+
struct ompi_request_t *requests[],
6668
MPIX_Continue_cb_function *cont_cb,
6769
void *cont_data,
6870
ompi_status_public_t statuses[]);
@@ -71,7 +73,7 @@ int ompi_continue_attach(
7173
/**
7274
* Allocate a new continuation request.
7375
*/
74-
int ompi_continue_allocate_request(ompi_request_t **cont_req, ompi_info_t *info);
76+
int ompi_continue_allocate_request(struct ompi_request_t **cont_req, ompi_info_t *info);
7577

7678
END_C_DECLS
7779

ompi/request/req_wait.c

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,8 @@ int ompi_request_default_wait(
4141
{
4242
ompi_request_t *req = *req_ptr;
4343

44-
#if OMPI_HAVE_MPI_EXT_CONTINUE
45-
if (OMPI_REQUEST_CONT == req->req_type) {
46-
/* let the continuations be processed as part of the global progress loop
47-
* while we're waiting for their completion */
48-
ompi_continue_register_request_progress(req);
49-
}
50-
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
51-
52-
5344
ompi_request_wait_completion(req);
5445

55-
#if OMPI_HAVE_MPI_EXT_CONTINUE
56-
if (OMPI_REQUEST_CONT == req->req_type) {
57-
ompi_continue_deregister_request_progress(req);
58-
}
59-
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
60-
6146
#if OPAL_ENABLE_FT_MPI
6247
/* Special case for MPI_ANY_SOURCE */
6348
if( MPI_ERR_PROC_FAILED_PENDING == req->req_status.MPI_ERROR ) {
@@ -144,13 +129,6 @@ int ompi_request_default_wait_any(size_t count,
144129

145130
request = requests[i];
146131

147-
#if OMPI_HAVE_MPI_EXT_CONTINUE
148-
if (OMPI_REQUEST_CONT == request->req_type) {
149-
have_cont_req = true;
150-
ompi_continue_register_request_progress(request);
151-
}
152-
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
153-
154132
/* Check for null or completed persistent request. For
155133
* MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE.
156134
*/
@@ -167,6 +145,13 @@ int ompi_request_default_wait_any(size_t count,
167145
}
168146
}
169147

148+
#if OMPI_HAVE_MPI_EXT_CONTINUE
149+
if (OMPI_REQUEST_CONT == request->req_type) {
150+
have_cont_req = true;
151+
ompi_continue_register_request_progress(request, &sync);
152+
}
153+
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
154+
170155
#if OPAL_ENABLE_FT_MPI
171156
if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) {
172157
completed = i;
@@ -319,7 +304,7 @@ int ompi_request_default_wait_all( size_t count,
319304

320305
#if OMPI_HAVE_MPI_EXT_CONTINUE
321306
if (OMPI_REQUEST_CONT == request->req_type) {
322-
ompi_continue_register_request_progress(request);
307+
ompi_continue_register_request_progress(request, &sync);
323308
}
324309
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
325310

@@ -590,7 +575,7 @@ int ompi_request_default_wait_some(size_t count,
590575

591576
#if OMPI_HAVE_MPI_EXT_CONTINUE
592577
if (OMPI_REQUEST_CONT == request->req_type) {
593-
ompi_continue_register_request_progress(request);
578+
ompi_continue_register_request_progress(request, &sync);
594579
}
595580
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
596581

ompi/request/request.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@
4040
#include "ompi/constants.h"
4141
#include "ompi/runtime/params.h"
4242

43+
#if OMPI_HAVE_MPI_EXT_CONTINUE
44+
#include "ompi/mpiext/continue/c/continuation.h"
45+
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
46+
4347
BEGIN_C_DECLS
4448

4549
/**
@@ -465,7 +469,20 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
465469
WAIT_SYNC_INIT(&sync, 1);
466470

467471
if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&req->req_complete, &_tmp_ptr, &sync)) {
472+
#if OMPI_HAVE_MPI_EXT_CONTINUE
473+
if (OMPI_REQUEST_CONT == req->req_type) {
474+
/* let the continuations be processed as part of the global progress loop
475+
* while we're waiting for their completion */
476+
ompi_continue_register_request_progress(req, &sync);
477+
}
478+
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
468479
SYNC_WAIT(&sync);
480+
481+
#if OMPI_HAVE_MPI_EXT_CONTINUE
482+
if (OMPI_REQUEST_CONT == req->req_type) {
483+
ompi_continue_deregister_request_progress(req);
484+
}
485+
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
469486
} else {
470487
/* completed before we had a chance to swap in the sync object */
471488
WAIT_SYNC_SIGNALLED(&sync);
@@ -487,6 +504,13 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
487504
}
488505
opal_atomic_rmb();
489506
} else {
507+
#if OMPI_HAVE_MPI_EXT_CONTINUE
508+
if (OMPI_REQUEST_CONT == req->req_type) {
509+
/* let the continuations be processed as part of the global progress loop
510+
* while we're waiting for their completion */
511+
ompi_continue_register_request_progress(req, NULL);
512+
}
513+
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
490514
while(!REQUEST_COMPLETE(req)) {
491515
opal_progress();
492516
#if OPAL_ENABLE_FT_MPI
@@ -497,6 +521,12 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
497521
}
498522
#endif /* OPAL_ENABLE_FT_MPI */
499523
}
524+
525+
#if OMPI_HAVE_MPI_EXT_CONTINUE
526+
if (OMPI_REQUEST_CONT == req->req_type) {
527+
ompi_continue_deregister_request_progress(req);
528+
}
529+
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
500530
}
501531
}
502532

0 commit comments

Comments
 (0)