@@ -80,8 +80,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_cont_request_t);
8080struct ompi_cont_request_t {
8181 ompi_request_t super ;
8282 opal_atomic_lock_t cont_lock ; /**< Lock used completing/restarting the cont request */
83- bool cont_global_progress ;
8483 bool cont_enqueue_complete ; /**< Whether to enqueue immediately complete requests */
84+ bool cont_in_wait ; /**< Whether the continuation request is currently waited on */
8585 opal_atomic_int32_t cont_num_active ; /**< The number of active continuations registered with a continuation request */
8686 uint32_t continue_max_poll ; /**< max number of local continuations to execute at once */
8787 opal_list_t * cont_complete_list ; /**< List of complete continuations to be invoked during test */
@@ -98,7 +98,7 @@ static void ompi_cont_request_construct(ompi_cont_request_t* cont_req)
9898 cont_req -> super .req_status = ompi_status_empty ; /* always returns MPI_SUCCESS */
9999 opal_atomic_lock_init (& cont_req -> cont_lock , false);
100100 cont_req -> cont_enqueue_complete = false;
101- cont_req -> cont_global_progress = false;
101+ cont_req -> cont_in_wait = false;
102102 cont_req -> cont_num_active = 0 ;
103103 cont_req -> continue_max_poll = UINT32_MAX ;
104104 cont_req -> cont_complete_list = NULL ;
@@ -142,11 +142,13 @@ OBJ_CLASS_INSTANCE(
142142 NULL , NULL );
143143
144144/**
145- * List of completed requests that need the user-defined completion callback
146- * invoked.
145+ * List of continuations eligible for execution
147146 */
148147static opal_list_t continuation_list ;
149148
149+ /**
150+ * Mutex to protect the continuation_list
151+ */
150152static opal_mutex_t request_cont_lock ;
151153
152154/**
@@ -155,16 +157,15 @@ static opal_mutex_t request_cont_lock;
155157static bool progress_callback_registered = false;
156158
157159static inline
158- void ompi_continue_cont_release (ompi_continuation_t * cont )
160+ void ompi_continue_cont_req_release (ompi_cont_request_t * cont_req ,
161+ int32_t num_release ,
162+ bool take_lock )
159163{
160- ompi_cont_request_t * cont_req = cont -> cont_req ;
161- assert (OMPI_REQUEST_CONT == cont_req -> super .req_type );
162-
163- int num_active = opal_atomic_add_fetch_32 (& cont_req -> cont_num_active , -1 );
164+ int num_active = opal_atomic_add_fetch_32 (& cont_req -> cont_num_active , - num_release );
164165 assert (num_active >= 0 );
165166 if (0 == num_active ) {
166167 const bool using_threads = opal_using_threads ();
167- if (using_threads ) {
168+ if (take_lock && using_threads ) {
168169 opal_atomic_lock (& cont_req -> cont_lock );
169170 }
170171 /* double check that no other thread has completed or restarted the request already */
@@ -173,10 +174,24 @@ void ompi_continue_cont_release(ompi_continuation_t *cont)
173174 /* signal that all continuations were found complete */
174175 ompi_request_complete (& cont_req -> super , true);
175176 }
176- if (using_threads ) {
177+ if (take_lock && using_threads ) {
177178 opal_atomic_unlock (& cont_req -> cont_lock );
178179 }
179180 }
181+ }
182+
183+ static inline
184+ void ompi_continue_cont_release (ompi_continuation_t * cont )
185+ {
186+ ompi_cont_request_t * cont_req = cont -> cont_req ;
187+ assert (OMPI_REQUEST_CONT == cont_req -> super .req_type );
188+
189+ /* if a thread is waiting on the request, we got here when
190+ * the thread started executing the continuations, so the continuation
191+ * request is complete already */
192+ if (!cont_req -> cont_in_wait ) {
193+ ompi_continue_cont_req_release (cont_req , 1 , true);
194+ }
180195 OBJ_RELEASE (cont_req );
181196
182197#ifdef OPAL_ENABLE_DEBUG
@@ -214,19 +229,22 @@ static
214229int ompi_continue_progress_n (const uint32_t max )
215230{
216231
217- if (in_progress || opal_list_is_empty ( & continuation_list ) ) return 0 ;
232+ if (in_progress ) return 0 ;
218233
219234 uint32_t completed = 0 ;
220235 in_progress = 1 ;
221236
222- do {
223- ompi_continuation_t * cb ;
224- OPAL_THREAD_LOCK (& request_cont_lock );
225- cb = (ompi_continuation_t * )opal_list_remove_first (& continuation_list );
226- OPAL_THREAD_UNLOCK (& request_cont_lock );
227- if (NULL == cb ) break ;
228- ompi_continue_cont_invoke (cb );
229- } while (max > ++ completed );
237+ if (!opal_list_is_empty (& continuation_list )) {
238+ /* global progress */
239+ do {
240+ ompi_continuation_t * cb ;
241+ OPAL_THREAD_LOCK (& request_cont_lock );
242+ cb = (ompi_continuation_t * )opal_list_remove_first (& continuation_list );
243+ OPAL_THREAD_UNLOCK (& request_cont_lock );
244+ if (NULL == cb ) break ;
245+ ompi_continue_cont_invoke (cb );
246+ } while (max > ++ completed );
247+ }
230248
231249 in_progress = 0 ;
232250
@@ -293,24 +311,13 @@ int ompi_continue_register_request_progress(ompi_request_t *req)
293311
294312 if (NULL == cont_req -> cont_complete_list ) return OMPI_SUCCESS ;
295313
296- const bool using_threads = opal_using_threads ();
297- if (using_threads ) {
298- OPAL_THREAD_LOCK (& request_cont_lock );
299- /* lock needed to sync with ompi_request_cont_enqueue_complete */
300- opal_atomic_lock (& cont_req -> cont_lock );
301- }
314+ opal_atomic_lock (& cont_req -> cont_lock );
302315
303- /* signal that from now on all continuations should go into the global queue */
304- cont_req -> cont_global_progress = true;
316+ cont_req -> cont_in_wait = true;
305317
306- /* move all complete local continuations into the global queue */
307- opal_list_join (& continuation_list , opal_list_get_begin (& continuation_list ),
308- cont_req -> cont_complete_list );
318+ ompi_continue_cont_req_release (cont_req , opal_list_get_size (cont_req -> cont_complete_list ), false);
309319
310- if (using_threads ) {
311- opal_atomic_unlock (& cont_req -> cont_lock );
312- OPAL_THREAD_UNLOCK (& request_cont_lock );
313- }
320+ opal_atomic_unlock (& cont_req -> cont_lock );
314321
315322 return OMPI_SUCCESS ;
316323}
@@ -322,15 +329,14 @@ int ompi_continue_register_request_progress(ompi_request_t *req)
322329int ompi_continue_deregister_request_progress (ompi_request_t * req )
323330{
324331 ompi_cont_request_t * cont_req = (ompi_cont_request_t * )req ;
325- if (opal_using_threads ()) {
326- /* lock needed to sync with ompi_request_cont_enqueue_complete */
327- opal_atomic_lock (& cont_req -> cont_lock );
328- cont_req -> cont_global_progress = false;
329- opal_atomic_unlock (& cont_req -> cont_lock );
330- } else {
331- cont_req -> cont_global_progress = false;
332- }
333332
333+ /* make sure we execute all outstanding continuations */
334+ uint32_t tmp_max_poll = cont_req -> continue_max_poll ;
335+ cont_req -> continue_max_poll = UINT32_MAX ;
336+ ompi_continue_progress_request (req );
337+ cont_req -> continue_max_poll = tmp_max_poll ;
338+
339+ cont_req -> cont_in_wait = false;
334340 return OMPI_SUCCESS ;
335341}
336342
@@ -383,35 +389,31 @@ static void
383389ompi_continue_enqueue_runnable (ompi_continuation_t * cont )
384390{
385391 ompi_cont_request_t * cont_req = cont -> cont_req ;
386- int retry ;
387- do {
388- retry = 0 ;
389- if (NULL != cont_req -> cont_complete_list
390- && !cont_req -> cont_global_progress ) {
391- opal_atomic_lock (& cont_req -> cont_lock );
392- if (OPAL_UNLIKELY (cont_req -> cont_global_progress )) {
393- opal_atomic_unlock (& cont_req -> cont_lock );
394- /* try again, this time target the global list */
395- retry = 1 ;
396- continue ;
397- }
398- opal_list_append (cont_req -> cont_complete_list , & cont -> super .super );
399- opal_atomic_unlock (& cont_req -> cont_lock );
400- } else {
401- OPAL_THREAD_LOCK (& request_cont_lock );
402- opal_list_append (& continuation_list , & cont -> super .super );
403- if (OPAL_UNLIKELY (!progress_callback_registered )) {
404- /* TODO: Ideally, we want to ensure that the callback is called *after*
405- * all the other progress callbacks are done so that any
406- * completions have happened before we attempt to execute
407- * callbacks. There doesn't seem to exist the infrastructure though.
408- */
409- opal_progress_register (& ompi_continue_progress_callback );
410- progress_callback_registered = true;
411- }
412- OPAL_THREAD_UNLOCK (& request_cont_lock );
392+ if (NULL != cont_req -> cont_complete_list ) {
393+ opal_atomic_lock (& cont_req -> cont_lock );
394+ opal_list_append (cont_req -> cont_complete_list , & cont -> super .super );
395+ if (cont_req -> cont_in_wait ) {
396+ /* if a thread is waiting for this request to complete, signal completions
397+ * the continuations will be executed at the end of the wait
398+ * but we need to ensure that the request is marked complete first
399+ */
400+ ompi_continue_cont_req_release (cont_req , 1 , false);
413401 }
414- } while (retry );
402+ opal_atomic_unlock (& cont_req -> cont_lock );
403+ } else {
404+ OPAL_THREAD_LOCK (& request_cont_lock );
405+ opal_list_append (& continuation_list , & cont -> super .super );
406+ if (OPAL_UNLIKELY (!progress_callback_registered )) {
407+ /* TODO: Ideally, we want to ensure that the callback is called *after*
408+ * all the other progress callbacks are done so that any
409+ * completions have happened before we attempt to execute
410+ * callbacks. There doesn't seem to exist the infrastructure though.
411+ */
412+ opal_progress_register (& ompi_continue_progress_callback );
413+ progress_callback_registered = true;
414+ }
415+ OPAL_THREAD_UNLOCK (& request_cont_lock );
416+ }
415417}
416418
417419/**
0 commit comments