Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions ompi/request/req_wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -87,6 +88,7 @@ int ompi_request_default_wait_any(size_t count,
int rc = OMPI_SUCCESS;
ompi_request_t *request=NULL;
ompi_wait_sync_t sync;
int sync_sets = 0, sync_unsets = 0;

WAIT_SYNC_INIT(&sync, 1);

Expand All @@ -108,6 +110,8 @@ int ompi_request_default_wait_any(size_t count,
completed = i;
*index = i;
goto after_sync_wait;
} else {
sync_sets++;
}
}

Expand All @@ -116,7 +120,8 @@ int ompi_request_default_wait_any(size_t count,
if (MPI_STATUS_IGNORE != status) {
*status = ompi_status_empty;
}
WAIT_SYNC_RELEASE(&sync);
/* No signal can be in flight in this case */
WAIT_SYNC_RELEASE_NOWAIT(&sync);
return rc;
}

Expand All @@ -138,8 +143,17 @@ int ompi_request_default_wait_any(size_t count,
*/
if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING) ) {
*index = i;
} else {
sync_unsets++;
}
}

if( sync_sets == sync_unsets ){
/* set the signalled flag so we won't
 * block in WAIT_SYNC_RELEASE
 */
WAIT_SYNC_SIGNALLED(&sync);
}

request = requests[*index];
assert( REQUEST_COMPLETE(request) );
Expand Down Expand Up @@ -361,6 +375,7 @@ int ompi_request_default_wait_some(size_t count,
ompi_request_t **rptr = NULL;
ompi_request_t *request = NULL;
ompi_wait_sync_t sync;
bool will_be_signalled = false;

WAIT_SYNC_INIT(&sync, 1);

Expand All @@ -383,13 +398,19 @@ int ompi_request_default_wait_some(size_t count,
if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, &sync) ) {
/* If the request is completed go ahead and mark it as such */
assert( REQUEST_COMPLETE(request) );
/* TODO: make sure MPI spec is not strict about index order */
indices[num_requests_done] = i;
num_requests_done++;
REQUEST_UNMARK(request);
} else {
REQUEST_MARK(request);
}
}

if(num_requests_null_inactive == count) {
*outcount = MPI_UNDEFINED;
WAIT_SYNC_RELEASE(&sync);
/* nobody will signal us */
WAIT_SYNC_RELEASE_NOWAIT(&sync);
return rc;
}

Expand All @@ -402,23 +423,33 @@ int ompi_request_default_wait_some(size_t count,
/* Clean up the synchronization primitives */

rptr = requests;
num_requests_done = 0;
for (size_t i = 0; i < count; i++, rptr++) {
request = *rptr;

if( request->req_state == OMPI_REQUEST_INACTIVE ) {
/* Skip inactive and already accounted requests */
if( request->req_state == OMPI_REQUEST_INACTIVE || !REQUEST_MARKED(request) ) {
continue;
}
/* Atomically mark the request as pending. If this succeed
* then the request was not completed, and it is now marked as
* pending. Otherwise, the request is complete )either it was
* before or it has been meanwhile). The major drawback here
* is that we will do all the atomics operations in all cases.
/* Atomically mark the request as pending.
* If this succeed - then the request was not completed,
* and it is now marked as pending.
* Otherwise, the request is complete meanwhile.
*/
if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING) ) {
if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING) ) {
indices[num_requests_done] = i;
num_requests_done++;
}
/* at least one of the requests was completed during this call;
 * the corresponding thread will signal us
 */
will_be_signalled = true;
}
}

if( !will_be_signalled ){
/* nobody knows about us,
 * so clear the signal-in-progress flag ourselves
 */
WAIT_SYNC_SIGNALLED(&sync);
}

WAIT_SYNC_RELEASE(&sync);
Expand Down
8 changes: 8 additions & 0 deletions ompi/request/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2015-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -112,6 +113,7 @@ struct ompi_request_t {
ompi_request_complete_fn_t req_complete_cb; /**< Called when the request is MPI completed */
void *req_complete_cb_data;
ompi_mpi_object_t req_mpi_object; /**< Pointer to MPI object that created this request */
bool marked;
};

/**
Expand Down Expand Up @@ -151,6 +153,12 @@ typedef struct ompi_predefined_request_t ompi_predefined_request_t;


#define REQUEST_COMPLETE(req) (REQUEST_COMPLETED == (req)->req_complete)

/* Mark/unmark a request. Used by ompi_request_default_wait_some() to
 * remember which requests the current call attached its sync object to,
 * so the cleanup pass only touches those requests.
 * NOTE(review): the flag lives on the request, not on the waiter --
 * concurrent wait_some calls on the same request would race on it;
 * confirm callers serialize such use. */
#define REQUEST_MARK(req) ( (req)->marked = true )
#define REQUEST_UNMARK(req) ( (req)->marked = false )
#define REQUEST_MARKED(req) ( (req)->marked )


/**
* Finalize a request. This is a macro to avoid function call
* overhead, since this is typically invoked in the critical
Expand Down
33 changes: 30 additions & 3 deletions opal/threads/wait_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* reserved.
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand All @@ -24,26 +25,50 @@ typedef struct ompi_wait_sync_t {
pthread_mutex_t lock;
struct ompi_wait_sync_t *next;
struct ompi_wait_sync_t *prev;
volatile bool signaling;
} ompi_wait_sync_t;

#define REQUEST_PENDING (void*)0L
#define REQUEST_COMPLETED (void*)1L

#define SYNC_WAIT(sync) (opal_using_threads() ? sync_wait_mt (sync) : sync_wait_st (sync))

/* The loop in release handles a race condition between the signaling
 * thread and the destruction of the condition variable. The signaling
 * member will be set to false after the final signaling thread has
 * finished operating on the sync object. This is done to avoid
 * extra atomics in the signalling function and keep it as fast
 * as possible. Note that the race window is small so spinning here
 * is more optimal than sleeping since this macro is called in
 * the critical path. */
/* Spin until any in-flight signal has finished touching the sync object,
 * then destroy the synchronization primitives.  Wrapped in do/while(0)
 * so the macro behaves as a single statement (safe under an unbraced
 * if/else at the call site). */
#define WAIT_SYNC_RELEASE(sync)                       \
    do {                                              \
        if (opal_using_threads()) {                   \
            while ((sync)->signaling) {               \
                continue;                             \
            }                                         \
            pthread_cond_destroy(&(sync)->condition); \
            pthread_mutex_destroy(&(sync)->lock);     \
        }                                             \
    } while (0)

/* Variant for callers that know no signal can be in flight (no completing
 * thread was ever attached to this sync), so no spin is needed. */
#define WAIT_SYNC_RELEASE_NOWAIT(sync)                \
    do {                                              \
        if (opal_using_threads()) {                   \
            pthread_cond_destroy(&(sync)->condition); \
            pthread_mutex_destroy(&(sync)->lock);     \
        }                                             \
    } while (0)


/* Wake the waiter (if any).  The signaling flag is cleared only after the
 * condition has been signaled and the lock released, so WAIT_SYNC_RELEASE
 * will not destroy the primitives while this thread still uses them.
 * The macro argument is parenthesized at every use and the body is wrapped
 * in do/while(0) so it expands as a single, expression-safe statement. */
#define WAIT_SYNC_SIGNAL(sync)                       \
    do {                                             \
        if (opal_using_threads()) {                  \
            pthread_mutex_lock(&(sync)->lock);       \
            pthread_cond_signal(&(sync)->condition); \
            pthread_mutex_unlock(&(sync)->lock);     \
            (sync)->signaling = false;               \
        }                                            \
    } while (0)

/* Mark the sync as already signalled: no signal is (or will be) in
 * flight, so a later WAIT_SYNC_RELEASE need not spin. */
#define WAIT_SYNC_SIGNALLED(sync)                    \
    do {                                             \
        (sync)->signaling = false;                   \
    } while (0)

OPAL_DECLSPEC int sync_wait_mt(ompi_wait_sync_t *sync);
static inline int sync_wait_st (ompi_wait_sync_t *sync)
{
Expand All @@ -61,6 +86,7 @@ static inline int sync_wait_st (ompi_wait_sync_t *sync)
(sync)->next = NULL; \
(sync)->prev = NULL; \
(sync)->status = 0; \
(sync)->signaling = true; \
if (opal_using_threads()) { \
pthread_cond_init (&(sync)->condition, NULL); \
pthread_mutex_init (&(sync)->lock, NULL); \
Expand All @@ -81,8 +107,9 @@ static inline void wait_sync_update(ompi_wait_sync_t *sync, int updates, int sta
}
} else {
/* this is an error path so just use the atomic */
opal_atomic_swap_32 (&sync->count, 0);
sync->status = OPAL_ERROR;
opal_atomic_wmb ();
opal_atomic_swap_32 (&sync->count, 0);
}
WAIT_SYNC_SIGNAL(sync);
}
Expand Down