Skip to content
This repository was archived by the owner on Sep 30, 2022. It is now read-only.

Commit 440f73f

Browse files
authored
Merge pull request #1246 from hjelmn/v2.x_request_performance
v2.x request race fixes
2 parents 9b00cae + f390926 commit 440f73f

File tree

2 files changed

+55
-6
lines changed

2 files changed

+55
-6
lines changed

ompi/request/req_wait.c

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
1616
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
1717
* reserved.
18+
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
1819
* $COPYRIGHT$
1920
*
2021
* Additional copyrights may follow
@@ -116,7 +117,8 @@ int ompi_request_default_wait_any(size_t count,
116117
if (MPI_STATUS_IGNORE != status) {
117118
*status = ompi_status_empty;
118119
}
119-
WAIT_SYNC_RELEASE(&sync);
120+
/* No signal-in-flight can be in this case */
121+
WAIT_SYNC_RELEASE_NOWAIT(&sync);
120122
return rc;
121123
}
122124

@@ -140,6 +142,15 @@ int ompi_request_default_wait_any(size_t count,
140142
*index = i;
141143
}
142144
}
145+
146+
if( *index == completed ){
147+
/* Only one request has triggered. There was no
148+
* in-flight completions.
149+
* Drop the signalled flag so we won't block
150+
* in WAIT_SYNC_RELEASE
151+
*/
152+
WAIT_SYNC_SIGNALLED(&sync);
153+
}
143154

144155
request = requests[*index];
145156
assert( REQUEST_COMPLETE(request) );
@@ -348,7 +359,8 @@ int ompi_request_default_wait_some(size_t count,
348359
ompi_request_t **rptr = NULL;
349360
ompi_request_t *request = NULL;
350361
ompi_wait_sync_t sync;
351-
362+
size_t sync_sets = 0, sync_unsets = 0;
363+
352364
WAIT_SYNC_INIT(&sync, 1);
353365

354366
*outcount = 0;
@@ -373,10 +385,12 @@ int ompi_request_default_wait_some(size_t count,
373385
num_requests_done++;
374386
}
375387
}
388+
sync_sets = count - num_requests_null_inactive - num_requests_done;
376389

377390
if(num_requests_null_inactive == count) {
378391
*outcount = MPI_UNDEFINED;
379-
WAIT_SYNC_RELEASE(&sync);
392+
/* nobody will signall us */
393+
WAIT_SYNC_RELEASE_NOWAIT(&sync);
380394
return rc;
381395
}
382396

@@ -407,6 +421,14 @@ int ompi_request_default_wait_some(size_t count,
407421
num_requests_done++;
408422
}
409423
}
424+
sync_unsets = count - num_requests_null_inactive - num_requests_done;
425+
426+
if( sync_sets == sync_unsets ){
427+
/* nobody knows about us,
428+
* set signa-in-progress flag to false
429+
*/
430+
WAIT_SYNC_SIGNALLED(&sync);
431+
}
410432

411433
WAIT_SYNC_RELEASE(&sync);
412434

opal/threads/wait_sync.h

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* reserved.
66
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
77
* reserved.
8+
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
89
* $COPYRIGHT$
910
*
1011
* Additional copyrights may follow
@@ -26,26 +27,50 @@ typedef struct ompi_wait_sync_t {
2627
pthread_mutex_t lock;
2728
struct ompi_wait_sync_t *next;
2829
struct ompi_wait_sync_t *prev;
30+
volatile bool signaling;
2931
} ompi_wait_sync_t;
3032

3133
#define REQUEST_PENDING (void*)0L
3234
#define REQUEST_COMPLETED (void*)1L
3335

3436
#define SYNC_WAIT(sync) (opal_using_threads() ? sync_wait_mt (sync) : sync_wait_st (sync))
3537

38+
/* The loop in release handles a race condition between the signaling
39+
* thread and the destruction of the condition variable. The signaling
40+
* member will be set to false after the final signaling thread has
41+
* finished opertating on the sync object. This is done to avoid
42+
* extra atomics in the singalling function and keep it as fast
43+
* as possible. Note that the race window is small so spinning here
44+
* is more optimal than sleeping since this macro is called in
45+
* the critical path. */
3646
#define WAIT_SYNC_RELEASE(sync) \
3747
if (opal_using_threads()) { \
38-
pthread_cond_destroy(&(sync)->condition); \
39-
pthread_mutex_destroy(&(sync)->lock); \
48+
while ((sync)->signaling) { \
49+
continue; \
50+
} \
51+
pthread_cond_destroy(&(sync)->condition); \
52+
pthread_mutex_destroy(&(sync)->lock); \
4053
}
4154

55+
#define WAIT_SYNC_RELEASE_NOWAIT(sync) \
56+
if (opal_using_threads()) { \
57+
pthread_cond_destroy(&(sync)->condition); \
58+
pthread_mutex_destroy(&(sync)->lock); \
59+
}
60+
61+
4262
#define WAIT_SYNC_SIGNAL(sync) \
4363
if (opal_using_threads()) { \
4464
pthread_mutex_lock(&(sync->lock)); \
4565
pthread_cond_signal(&sync->condition); \
4666
pthread_mutex_unlock(&(sync->lock)); \
67+
sync->signaling = false; \
4768
}
4869

70+
#define WAIT_SYNC_SIGNALLED(sync){ \
71+
(sync)->signaling = false; \
72+
}
73+
4974
OPAL_DECLSPEC int sync_wait_mt(ompi_wait_sync_t *sync);
5075
static inline int sync_wait_st (ompi_wait_sync_t *sync)
5176
{
@@ -63,6 +88,7 @@ static inline int sync_wait_st (ompi_wait_sync_t *sync)
6388
(sync)->next = NULL; \
6489
(sync)->prev = NULL; \
6590
(sync)->status = 0; \
91+
(sync)->signaling = true; \
6692
if (opal_using_threads()) { \
6793
pthread_cond_init (&(sync)->condition, NULL); \
6894
pthread_mutex_init (&(sync)->lock, NULL); \
@@ -83,8 +109,9 @@ static inline void wait_sync_update(ompi_wait_sync_t *sync, int updates, int sta
83109
}
84110
} else {
85111
/* this is an error path so just use the atomic */
86-
opal_atomic_swap_32 (&sync->count, 0);
87112
sync->status = OPAL_ERROR;
113+
opal_atomic_wmb ();
114+
opal_atomic_swap_32 (&sync->count, 0);
88115
}
89116
WAIT_SYNC_SIGNAL(sync);
90117
}

0 commit comments

Comments
 (0)