@@ -156,6 +156,11 @@ static opal_mutex_t request_cont_lock;
156156 */
157157static bool progress_callback_registered = false;
158158
159+ /**
160+ * Thread-local list of continuation requests registered for progress by the calling thread; NULL until the thread's first registration, when it is allocated.
161+ */
162+ static opal_thread_local opal_list_t * thread_progress_list = NULL ;
163+
159164static inline
160165void ompi_continue_cont_req_release (ompi_cont_request_t * cont_req ,
161166 int32_t num_release ,
@@ -234,16 +239,44 @@ int ompi_continue_progress_n(const uint32_t max)
234239 uint32_t completed = 0 ;
235240 in_progress = 1 ;
236241
242+ const bool using_threads = opal_using_threads ();
243+ if (NULL != thread_progress_list ) {
244+ ompi_cont_request_t * cont_req ;
245+ OPAL_LIST_FOREACH (cont_req , thread_progress_list , ompi_cont_request_t ) {
246+ ompi_continuation_t * cb ;
247+ if (opal_list_is_empty (cont_req -> cont_complete_list )) continue ;
248+ while (max > completed ) {
249+ if (using_threads ) {
250+ opal_atomic_lock (& cont_req -> cont_lock );
251+ cb = (ompi_continuation_t * ) opal_list_remove_first (cont_req -> cont_complete_list );
252+ opal_atomic_unlock (& cont_req -> cont_lock );
253+ } else {
254+ cb = (ompi_continuation_t * ) opal_list_remove_first (cont_req -> cont_complete_list );
255+ }
256+ if (NULL == cb ) break ;
257+
258+ ompi_continue_cont_invoke (cb );
259+ ++ completed ;
260+ }
261+ if (max <= completed ) break ;
262+ }
263+ }
264+
237265 if (!opal_list_is_empty (& continuation_list )) {
238266 /* global progress */
239- do {
267+ while ( max > completed ) {
240268 ompi_continuation_t * cb ;
241- OPAL_THREAD_LOCK (& request_cont_lock );
242- cb = (ompi_continuation_t * )opal_list_remove_first (& continuation_list );
243- OPAL_THREAD_UNLOCK (& request_cont_lock );
269+ if (using_threads ) {
270+ opal_mutex_lock (& request_cont_lock );
271+ cb = (ompi_continuation_t * )opal_list_remove_first (& continuation_list );
272+ opal_mutex_unlock (& request_cont_lock );
273+ } else {
274+ cb = (ompi_continuation_t * )opal_list_remove_first (& continuation_list );
275+ }
244276 if (NULL == cb ) break ;
245277 ompi_continue_cont_invoke (cb );
246- } while (max > ++ completed );
278+ ++ completed ;
279+ }
247280 }
248281
249282 in_progress = 0 ;
@@ -319,6 +352,13 @@ int ompi_continue_register_request_progress(ompi_request_t *req)
319352
320353 opal_atomic_unlock (& cont_req -> cont_lock );
321354
355+ if (NULL == thread_progress_list ) {
356+ thread_progress_list = OBJ_NEW (opal_list_t );
357+ }
358+
359+ /* enqueue the continuation request to allow for progress by this thread */
360+ opal_list_append (thread_progress_list , & req -> super .super );
361+
322362 return OMPI_SUCCESS ;
323363}
324364
@@ -330,13 +370,20 @@ int ompi_continue_deregister_request_progress(ompi_request_t *req)
330370{
331371 ompi_cont_request_t * cont_req = (ompi_cont_request_t * )req ;
332372
373+ if (NULL == cont_req -> cont_complete_list ) return OMPI_SUCCESS ;
374+
333375 /* make sure we execute all outstanding continuations */
334376 uint32_t tmp_max_poll = cont_req -> continue_max_poll ;
335377 cont_req -> continue_max_poll = UINT32_MAX ;
336378 ompi_continue_progress_request (req );
337379 cont_req -> continue_max_poll = tmp_max_poll ;
338380
339381 cont_req -> cont_in_wait = false;
382+
383+
384+ /* remove the continuation request from the thread-local progress list */
385+ opal_list_remove_item (thread_progress_list , & req -> super .super );
386+
340387 return OMPI_SUCCESS ;
341388}
342389
0 commit comments