diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c
index d559c3ad6b1..2e6476375e6 100644
--- a/ompi/request/req_wait.c
+++ b/ompi/request/req_wait.c
@@ -15,6 +15,7 @@
  * Copyright (c) 2012      Oak Ridge National Labs.  All rights reserved.
  * Copyright (c) 2016      Los Alamos National Security, LLC. All rights
  *                         reserved.
+ * Copyright (c) 2016      Mellanox Technologies. All rights reserved.
  * $COPYRIGHT$
  *
  * Additional copyrights may follow
@@ -87,6 +88,7 @@ int ompi_request_default_wait_any(size_t count,
     int rc = OMPI_SUCCESS;
     ompi_request_t *request=NULL;
     ompi_wait_sync_t sync;
+    int sync_sets = 0, sync_unsets = 0;
 
     WAIT_SYNC_INIT(&sync, 1);
 
@@ -108,6 +110,8 @@ int ompi_request_default_wait_any(size_t count,
             completed = i;
             *index = i;
             goto after_sync_wait;
+        } else {
+            sync_sets++;
         }
     }
 
@@ -116,7 +120,8 @@
         if (MPI_STATUS_IGNORE != status) {
             *status = ompi_status_empty;
         }
-        WAIT_SYNC_RELEASE(&sync);
+        /* No signal can be in flight in this case */
+        WAIT_SYNC_RELEASE_NOWAIT(&sync);
         return rc;
     }
 
@@ -138,8 +143,17 @@
          */
        if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING) ) {
             *index = i;
+        } else {
+            sync_unsets++;
         }
     }
+
+    if( sync_sets == sync_unsets ){
+        /* set the signalled flag so we won't
+         * block in WAIT_SYNC_RELEASE
+         */
+        WAIT_SYNC_SIGNALLED(&sync);
+    }
 
     request = requests[*index];
     assert( REQUEST_COMPLETE(request) );
@@ -361,6 +375,7 @@ int ompi_request_default_wait_some(size_t count,
     ompi_request_t **rptr = NULL;
     ompi_request_t *request = NULL;
     ompi_wait_sync_t sync;
+    bool will_be_signalled = false;
 
     WAIT_SYNC_INIT(&sync, 1);
 
@@ -383,13 +398,19 @@
         if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, &sync) ) {
             /* If the request is completed go ahead and mark it as such */
             assert( REQUEST_COMPLETE(request) );
+            /* TODO: make sure the MPI spec is not strict about index order */
+            indices[num_requests_done] = i;
             num_requests_done++;
+            REQUEST_UNMARK(request);
+        } else {
+            REQUEST_MARK(request);
         }
     }
 
     if(num_requests_null_inactive == count) {
         *outcount = MPI_UNDEFINED;
-        WAIT_SYNC_RELEASE(&sync);
+        /* nobody will signal us */
+        WAIT_SYNC_RELEASE_NOWAIT(&sync);
         return rc;
     }
 
@@ -402,23 +423,33 @@
 
     /* Clean up the synchronization primitives */
     rptr = requests;
-    num_requests_done = 0;
     for (size_t i = 0; i < count; i++, rptr++) {
         request = *rptr;
 
-        if( request->req_state == OMPI_REQUEST_INACTIVE ) {
+        /* Skip inactive and already accounted requests */
+        if( request->req_state == OMPI_REQUEST_INACTIVE || !REQUEST_MARKED(request) ) {
             continue;
         }
-        /* Atomically mark the request as pending. If this succeed
-         * then the request was not completed, and it is now marked as
-         * pending. Otherwise, the request is complete )either it was
-         * before or it has been meanwhile). The major drawback here
-         * is that we will do all the atomics operations in all cases.
+        /* Atomically mark the request as pending.
+         * If this succeeds, the request was not completed,
+         * and it is now marked as pending.
+         * Otherwise, the request completed in the meantime.
          */
-        if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING) ) {
+        if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING) ) {
             indices[num_requests_done] = i;
             num_requests_done++;
-        }
+            /* at least one of the requests was completed during this call;
+             * the corresponding thread will signal us
+             */
+            will_be_signalled = true;
+        }
+    }
+
+    if( !will_be_signalled ){
+        /* nobody knows about us; set the
+         * signal-in-progress flag to false
+         */
+        WAIT_SYNC_SIGNALLED(&sync);
     }
 
     WAIT_SYNC_RELEASE(&sync);
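
The sync_sets/sync_unsets accounting above is the heart of the req_wait.c change, so here is a minimal standalone sketch of the idea (C11; toy_request_t, PENDING and the like are invented names, not the Open MPI API). Every sync pointer the waiter CASes into a request ("set") is either CASed back out by the waiter ("unset") or consumed by a completing thread. If the set and unset counts match, no completer ever saw the sync, hence no wake-up signal can be in flight and the waiter may mark the sync as already signalled before releasing it:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    #define PENDING   ((void *)0)
    #define COMPLETED ((void *)1)

    typedef struct { _Atomic(void *) completion; } toy_request_t;

    /* Returns true when a completer may still signal the sync. */
    static bool signal_may_be_in_flight(toy_request_t *reqs, size_t n, void *sync)
    {
        int sets = 0, unsets = 0;

        /* Attach: install the sync in every still-pending request. */
        for (size_t i = 0; i < n; i++) {
            void *expected = PENDING;
            if (atomic_compare_exchange_strong(&reqs[i].completion,
                                               &expected, sync))
                sets++;               /* a completer can now see the sync */
        }

        /* ... a completing thread may swap the sync out and signal ... */

        /* Detach: take the sync back out wherever it is still installed. */
        for (size_t i = 0; i < n; i++) {
            void *expected = sync;
            if (atomic_compare_exchange_strong(&reqs[i].completion,
                                               &expected, PENDING))
                unsets++;             /* this one was never consumed */
        }

        return sets != unsets;        /* equal counts: nobody holds the sync */
    }

    int main(void)
    {
        toy_request_t reqs[3] = { { PENDING }, { PENDING }, { COMPLETED } };
        int sync;                     /* any non-NULL token works here */

        printf("signal possibly in flight: %s\n",
               signal_may_be_in_flight(reqs, 3, &sync) ? "yes" : "no");
        return 0;
    }

Single-threaded, the counts always match and the answer is "no"; in ompi_request_default_wait_any the same conclusion is what allows the early-exit path to call WAIT_SYNC_SIGNALLED() and then release without spinning.
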
diff --git a/ompi/request/request.h b/ompi/request/request.h
index 9d53b6156a3..b09573546b9 100644
--- a/ompi/request/request.h
+++ b/ompi/request/request.h
@@ -15,6 +15,7 @@
  * Copyright (c) 2012      Oak Ridge National Labs.  All rights reserved.
  * Copyright (c) 2015-2016 Los Alamos National Security, LLC. All rights
  *                         reserved.
+ * Copyright (c) 2016      Mellanox Technologies. All rights reserved.
  * $COPYRIGHT$
  *
  * Additional copyrights may follow
@@ -112,6 +113,7 @@ struct ompi_request_t {
     ompi_request_complete_fn_t req_complete_cb; /**< Called when the request is MPI completed */
     void *req_complete_cb_data;
     ompi_mpi_object_t req_mpi_object;           /**< Pointer to MPI object that created this request */
+    bool marked;                                /**< Thread-private mark used by the wait_some cleanup pass */
 };
 
 /**
@@ -151,6 +153,12 @@ typedef struct ompi_predefined_request_t ompi_predefined_request_t;
 
 #define REQUEST_COMPLETE(req)    (REQUEST_COMPLETED == (req)->req_complete)
 
+
+#define REQUEST_MARK(req)   ( (req)->marked = true )
+#define REQUEST_UNMARK(req) ( (req)->marked = false )
+#define REQUEST_MARKED(req) ( (req)->marked )
+
+
 /**
  * Finalize a request.  This is a macro to avoid function call
  * overhead, since this is typically invoked in the critical
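
A note on the new marked field: it is thread-private bookkeeping, not synchronization. Only the thread running ompi_request_default_wait_some reads or writes it, so no atomics are needed. It lets the post-wait cleanup pass skip requests that were already counted before the wait, which in turn is what allows the patch to drop the old num_requests_done = 0 re-scan. A rough sketch of that two-pass pattern (hypothetical toy_slot type, not the Open MPI code):

    #include <stdbool.h>
    #include <stdio.h>

    /* 'attached' plays the role of the marked flag; 'complete'
     * stands in for req_complete. */
    struct toy_slot { bool complete; bool attached; };

    int main(void)
    {
        struct toy_slot slots[4] = {
            { true,  false },  /* complete up front: counted in pass 1 */
            { false, false },
            { true,  false },
            { false, false },
        };
        int indices[4], done = 0;

        /* Pass 1: record completed slots; mark the rest as attached. */
        for (int i = 0; i < 4; i++) {
            if (slots[i].complete)
                indices[done++] = i;       /* counted once, here */
            else
                slots[i].attached = true;  /* REQUEST_MARK() */
        }

        /* ... wait; an attached slot may complete meanwhile ... */
        slots[1].complete = true;

        /* Pass 2: visit only marked slots, so slots counted in
         * pass 1 can never be counted a second time. */
        for (int i = 0; i < 4; i++) {
            if (!slots[i].attached)
                continue;                  /* !REQUEST_MARKED() */
            if (slots[i].complete)
                indices[done++] = i;
        }

        printf("completed: %d\n", done);   /* prints 3 */
        return 0;
    }
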
diff --git a/opal/threads/wait_sync.h b/opal/threads/wait_sync.h
index b29f01c4748..afc4bceb84c 100644
--- a/opal/threads/wait_sync.h
+++ b/opal/threads/wait_sync.h
@@ -5,6 +5,7 @@
  *                         reserved.
  * Copyright (c) 2016      Los Alamos National Security, LLC. All rights
  *                         reserved.
+ * Copyright (c) 2016      Mellanox Technologies. All rights reserved.
  * $COPYRIGHT$
  *
  * Additional copyrights may follow
@@ -24,6 +25,7 @@ typedef struct ompi_wait_sync_t {
     pthread_mutex_t lock;
     struct ompi_wait_sync_t *next;
     struct ompi_wait_sync_t *prev;
+    volatile bool signaling;
 } ompi_wait_sync_t;
 
 #define REQUEST_PENDING        (void*)0L
@@ -31,19 +33,42 @@ typedef struct ompi_wait_sync_t {
 
 #define SYNC_WAIT(sync)   (opal_using_threads() ? sync_wait_mt (sync) : sync_wait_st (sync))
 
+/* The loop in release handles a race condition between the signaling
+ * thread and the destruction of the condition variable. The signaling
+ * member will be set to false after the final signaling thread has
+ * finished operating on the sync object. This is done to avoid extra
+ * atomics in the signaling function and to keep it as fast as possible.
+ * Note that the race window is small, so spinning here is more
+ * efficient than sleeping since this macro is called on the critical path.
+ */
 #define WAIT_SYNC_RELEASE(sync)                       \
     if (opal_using_threads()) {                       \
-        pthread_cond_destroy(&(sync)->condition);     \
-        pthread_mutex_destroy(&(sync)->lock);         \
+        while ((sync)->signaling) {                   \
+            continue;                                 \
+        }                                             \
+        pthread_cond_destroy(&(sync)->condition);     \
+        pthread_mutex_destroy(&(sync)->lock);         \
     }
 
+#define WAIT_SYNC_RELEASE_NOWAIT(sync)                \
+    if (opal_using_threads()) {                       \
+        pthread_cond_destroy(&(sync)->condition);     \
+        pthread_mutex_destroy(&(sync)->lock);         \
+    }
+
+
 #define WAIT_SYNC_SIGNAL(sync)                        \
     if (opal_using_threads()) {                       \
         pthread_mutex_lock(&(sync->lock));            \
         pthread_cond_signal(&sync->condition);        \
         pthread_mutex_unlock(&(sync->lock));          \
+        sync->signaling = false;                      \
     }
 
+#define WAIT_SYNC_SIGNALLED(sync){                    \
+    (sync)->signaling = false;                        \
+}
+
 OPAL_DECLSPEC int sync_wait_mt(ompi_wait_sync_t *sync);
 static inline int sync_wait_st (ompi_wait_sync_t *sync)
 {
@@ -61,6 +86,7 @@ static inline int sync_wait_st (ompi_wait_sync_t *sync)
     (sync)->next = NULL;                              \
     (sync)->prev = NULL;                              \
     (sync)->status = 0;                               \
+    (sync)->signaling = true;                         \
     if (opal_using_threads()) {                       \
         pthread_cond_init (&(sync)->condition, NULL); \
         pthread_mutex_init (&(sync)->lock, NULL);     \
@@ -81,8 +107,9 @@ static inline void wait_sync_update(ompi_wait_sync_t *sync, int updates, int sta
         }
     } else {
         /* this is an error path so just use the atomic */
-        opal_atomic_swap_32 (&sync->count, 0);
         sync->status = OPAL_ERROR;
+        opal_atomic_wmb ();
+        opal_atomic_swap_32 (&sync->count, 0);
     }
     WAIT_SYNC_SIGNAL(sync);
 }
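
To see why WAIT_SYNC_RELEASE now spins, consider the window between the waiter waking up and the signaller leaving pthread_cond_signal()/pthread_mutex_unlock(): destroying the condition variable inside that window is a use-after-free on the signaller's side. Below is a condensed, compilable model of the fixed protocol (build with -pthread; the toy_* names are invented, so this is a sketch of the pattern, not the Open MPI code). The waiter may destroy the primitives only once signaling drops back to false, which the signaller guarantees is its last access to the object:

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    typedef struct {
        pthread_mutex_t lock;
        pthread_cond_t  cond;
        atomic_int      count;
        volatile bool   signaling; /* true while a signaller may touch us */
    } toy_sync_t;

    /* Completion side: mirrors WAIT_SYNC_SIGNAL. Clearing 'signaling'
     * is the final access to the object. */
    static void toy_signal(toy_sync_t *s)
    {
        pthread_mutex_lock(&s->lock);
        atomic_store(&s->count, 0);
        pthread_cond_signal(&s->cond);
        pthread_mutex_unlock(&s->lock);
        s->signaling = false;      /* waiter may destroy from here on */
    }

    /* Waiter side: mirrors SYNC_WAIT + WAIT_SYNC_RELEASE. The spin
     * closes the window in which the signaller still holds the
     * mutex/condvar. */
    static void toy_wait_and_release(toy_sync_t *s)
    {
        pthread_mutex_lock(&s->lock);
        while (atomic_load(&s->count) != 0)
            pthread_cond_wait(&s->cond, &s->lock);
        pthread_mutex_unlock(&s->lock);

        while (s->signaling)
            ;                      /* tiny window: spinning beats sleeping */

        pthread_cond_destroy(&s->cond);
        pthread_mutex_destroy(&s->lock);
    }

    static void *completer(void *arg)
    {
        toy_signal((toy_sync_t *)arg);
        return NULL;
    }

    int main(void)
    {
        toy_sync_t s = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
                         1, true };
        pthread_t t;

        pthread_create(&t, NULL, completer, &s);
        toy_wait_and_release(&s);  /* destroy waits out the signaller */
        pthread_join(&t, NULL);
        puts("released safely");
        return 0;
    }

The same ordering concern motivates the wait_sync_update() change at the end of the diff: writing status before the count swap, separated by a write barrier, ensures that a waiter observing count == 0 also observes the error status.
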