@@ -136,8 +136,8 @@ typedef struct ompi_request_t ompi_request_t;
  */
 #define PREDEFINED_REQUEST_PAD 256
 
-#define REQUEST_PENDING   (void *)0L
-#define REQUEST_COMPLETED (void *)1L
+#define REQUEST_PENDING   ((void *)0L)
+#define REQUEST_COMPLETED ((void *)1L)
 
 struct ompi_predefined_request_t {
     struct ompi_request_t request;
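Note on the parenthesization: wrapping the cast makes REQUEST_PENDING and REQUEST_COMPLETED single primary expressions, so they behave the same next to any operator. A standalone sketch of the failure mode the extra parentheses avoid (the PENDING_* names below are illustrative, not from the patch):

    #include <stdio.h>

    #define PENDING_BARE     (void *)0L      /* old style */
    #define PENDING_WRAPPED  ((void *)0L)    /* new style */

    int main(void)
    {
        /* "sizeof PENDING_BARE" would not compile: it parses as sizeof(void *)
         * followed by a stray 0L token.  The wrapped form is one expression. */
        printf("%zu\n", sizeof PENDING_WRAPPED);
        return 0;
    }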
@@ -162,8 +162,8 @@ typedef struct ompi_predefined_request_t ompi_predefined_request_t;
         (request)->req_complete_cb_data = NULL;    \
     } while (0);
 
-
 #define REQUEST_COMPLETE(req)   (REQUEST_COMPLETED == (req)->req_complete)
+
 /**
  * Finalize a request. This is a macro to avoid function call
  * overhead, since this is typically invoked in the critical
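REQUEST_COMPLETE(req) is the cheap polling predicate used throughout the request code; a typical busy-wait (illustrative, not part of this patch) looks like:

    /* drive progress until the request flips to REQUEST_COMPLETED */
    while (!REQUEST_COMPLETE(req)) {
        opal_progress();
    }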
@@ -455,7 +455,7 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
 #if OMPI_HAVE_MPI_EXT_CONTINUE
     if (OMPI_REQUEST_CONT == req->req_type) {
         /* let the continuations be processed as part of the global progress loop
-         * while we're waiting for their completion */
+         * while we're waiting for their completion */
        ompi_continue_register_request_progress(req, &sync);
     }
 #endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
@@ -529,24 +529,59 @@ static inline void ompi_request_wait_completion(ompi_request_t *req)
 static inline int ompi_request_complete(ompi_request_t *request, bool with_signal)
 {
     int rc = 0;
+    void *complete_flag = REQUEST_COMPLETED;
+    ompi_request_complete_fn_t cb = request->req_complete_cb;
 
-    if (NULL != request->req_complete_cb) {
-        /* Set the request cb to NULL to allow resetting in the callback */
-        ompi_request_complete_fn_t fct = request->req_complete_cb;
+    if (NULL != cb) {
         request->req_complete_cb = NULL;
-        rc = fct( request );
+        rc = cb(request);
     }
 
     if (0 == rc) {
         if (OPAL_LIKELY(with_signal)) {
 
             ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete,
                                                                                    REQUEST_COMPLETED);
-            if( REQUEST_PENDING != tmp_sync ) {
+            if (opal_using_threads()) {
+                /* another thread may have set the callback, so check again */
+                opal_atomic_mb();  // prevent reordering of reads and writes around this point
+                if (OPAL_UNLIKELY(NULL != request->req_complete_cb)) {
+                    /* slow path: a callback was added after we completed above, try to acquire it */
+                    cb = (ompi_request_complete_fn_t) opal_atomic_swap_ptr((opal_atomic_intptr_t *) &request->req_complete_cb,
+                                                                           (intptr_t) NULL);
+                    if (OPAL_UNLIKELY(NULL != cb)) {
+                        /* acquired the callback, invoke it */
+                        rc = cb(request);
+                        if (0 != rc) {
+                            /* the callback may have restarted or freed the request, so prevent
+                             * the code below from updating the sync object and accessing the request;
+                             * requests that are restarted or freed should not have a sync object */
+                            tmp_sync = REQUEST_PENDING;
+                        }
+                    }
+                }
+            }
+
+            if (REQUEST_PENDING != tmp_sync) {
                 wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR);
             }
         } else {
             request->req_complete = REQUEST_COMPLETED;
+            if (opal_using_threads()) {
+                /* another thread may have set a callback, so check again */
+                opal_atomic_mb();  // prevent reordering of reads and writes across this point
+                if (OPAL_UNLIKELY(NULL != request->req_complete_cb)) {
+                    /* slow path: a callback was added after we completed above, try to acquire it */
+                    cb = (ompi_request_complete_fn_t) opal_atomic_swap_ptr((opal_atomic_intptr_t *) &request->req_complete_cb,
+                                                                           (intptr_t) NULL);
+                    if (OPAL_UNLIKELY(NULL != cb)) {
+                        /* acquired the callback, invoke it;
+                         * the callback may or may not free or restart the request, so we cannot touch
+                         * the request after this point */
+                        cb(request);
+                    }
+                }
+            }
         }
     }
 
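Taken together, the completion path above and the registration path below form a symmetric hand-off: each side first publishes its own state (the REQUEST_COMPLETED flag, resp. the callback pointer), issues a full barrier, re-reads the other side's state, and then uses an atomic swap on req_complete_cb so that exactly one thread wins the right to invoke the callback. A reduced sketch of that pattern in plain C11 atomics (toy_req_t and friends are made-up names, not OMPI types):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    typedef void (*cb_fn_t)(void *ctx);

    typedef struct {
        atomic_bool      complete;  /* stands in for req_complete         */
        _Atomic(cb_fn_t) cb;        /* stands in for req_complete_cb      */
        void            *ctx;       /* stands in for req_complete_cb_data */
    } toy_req_t;

    /* completion side, mirroring the slow path in ompi_request_complete() */
    static void toy_complete(toy_req_t *r)
    {
        atomic_store(&r->complete, true);           /* publish completion */
        atomic_thread_fence(memory_order_seq_cst);  /* like opal_atomic_mb() */
        if (NULL != atomic_load(&r->cb)) {
            /* a callback was registered concurrently: try to claim it */
            cb_fn_t cb = atomic_exchange(&r->cb, (cb_fn_t) NULL);
            if (NULL != cb) {
                cb(r->ctx);                         /* we won: invoke exactly once */
            }
        }
    }

    /* registration side, mirroring ompi_request_set_callback() */
    static void toy_set_callback(toy_req_t *r, cb_fn_t cb)
    {
        if (atomic_load(&r->complete)) {
            cb(r->ctx);                             /* already complete: call directly */
            return;
        }
        atomic_exchange(&r->cb, cb);                /* publish the callback */
        atomic_thread_fence(memory_order_seq_cst);
        if (atomic_load(&r->complete)) {
            /* completion raced in between: claim the callback back if still ours */
            cb_fn_t mine = atomic_exchange(&r->cb, (cb_fn_t) NULL);
            if (NULL != mine) {
                mine(r->ctx);
            }
        }
    }

Whichever side loses the swap sees NULL and does nothing, so the callback runs exactly once even when registration and completion race.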
@@ -558,12 +593,27 @@ static inline int ompi_request_set_callback(ompi_request_t* request,
                                             void* cb_data)
 {
     request->req_complete_cb_data = cb_data;
-    request->req_complete_cb = cb;
-    /* If request is completed and the callback is not called, need to call callback */
-    if ((NULL != request->req_complete_cb) && (request->req_complete == REQUEST_COMPLETED)) {
-        ompi_request_complete_fn_t fct = request->req_complete_cb;
-        request->req_complete_cb = NULL;
-        return fct( request );
+
+    if (REQUEST_COMPLETE(request)) {
+        /* invoke the callback; this is safe here because the
+         * completing thread never sees the callback */
+        cb(request);  // return value ignored here
+    } else {
+        if (opal_using_threads()) {
+            opal_atomic_swap_ptr((opal_atomic_intptr_t *) &request->req_complete_cb, (intptr_t) cb);
+            opal_atomic_mb();
+            if (REQUEST_COMPLETE(request)) {
+                /* the request has completed after we swapped the callback, try to acquire it */
+                ompi_request_complete_fn_t new_cb;
+                new_cb = (ompi_request_complete_fn_t) opal_atomic_swap_ptr((opal_atomic_intptr_t *) &request->req_complete_cb,
                                                                            (intptr_t) NULL);
+                if (NULL != new_cb) {
+                    new_cb(request);  // return value ignored here
+                }
+            }
+        } else {
+            request->req_complete_cb = cb;
+        }
     }
     return OMPI_SUCCESS;
 }
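For reference, the contract a user of ompi_request_set_callback() relies on, which this patch preserves: the callback receives the request, may read its context through req_complete_cb_data, and returns 0 when ompi_request_complete() should carry on and update any waiting sync object (a non-zero return signals that the callback restarted or freed the request). A hedged usage sketch; my_state_t, my_completion_cb, and my_attach are made-up names, not OMPI symbols:

    #include <stdbool.h>
    #include "ompi/request/request.h"   /* ompi_request_t, ompi_request_set_callback() */

    typedef struct {
        volatile bool done;
    } my_state_t;

    /* runs from ompi_request_complete(), or immediately from the registering
     * thread if the request had already completed when the callback was set */
    static int my_completion_cb(ompi_request_t *req)
    {
        my_state_t *st = (my_state_t *) req->req_complete_cb_data;
        st->done = true;
        return 0;   /* let ompi_request_complete() go on and signal the sync object */
    }

    static void my_attach(ompi_request_t *req, my_state_t *st)
    {
        ompi_request_set_callback(req, my_completion_cb, st);
    }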