
Commit ab9a53e

bosilca authored and hjelmn committed
Refactor the request test and wait functions.
(cherry picked from commit open-mpi/ompi@bfcf145)
Signed-off-by: Nathan Hjelm <[email protected]>
1 parent 7cea31d commit ab9a53e
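At the heart of this refactor is one pattern: a request's req_complete field is no longer treated as a plain boolean, it is manipulated with a pointer compare-and-swap (OPAL_ATOMIC_CMPSET_PTR) so that it holds either the REQUEST_PENDING sentinel, the completed marker checked by REQUEST_COMPLETE(), or a pointer to the waiter's ompi_wait_sync_t. The sketch below is not the Open MPI implementation; it is a minimal stand-alone illustration of that handshake using C11 atomics, and every name in it (toy_request_t, toy_sync_t, attach_or_completed, complete_request, REQ_PENDING, REQ_COMPLETED) is invented for the example.

/* Minimal sketch of the attach/complete handshake, assuming invented toy
 * types; the real code uses OPAL_ATOMIC_CMPSET_PTR, ompi_wait_sync_t,
 * SYNC_WAIT and WAIT_SYNC_RELEASE instead of the names below. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define REQ_PENDING   ((void *) 0)   /* request not yet complete */
#define REQ_COMPLETED ((void *) 1)   /* request complete, no waiter attached */

typedef struct { atomic_int missing; } toy_sync_t;            /* stand-in for a wait-sync object */
typedef struct { _Atomic(void *) req_complete; } toy_request_t;

/* Waiter side: try to attach a sync object to a pending request.
 * Returns true if the request had already completed (the CAS lost). */
static bool attach_or_completed(toy_request_t *req, toy_sync_t *sync)
{
    void *expected = REQ_PENDING;
    return !atomic_compare_exchange_strong(&req->req_complete, &expected, sync);
}

/* Completion side: mark the request complete and, if a waiter attached a
 * sync object in the meantime, credit it so the waiter can wake up. */
static void complete_request(toy_request_t *req)
{
    void *prev = atomic_exchange(&req->req_complete, REQ_COMPLETED);
    if (prev != REQ_PENDING) {                 /* a waiter is parked on a sync object */
        atomic_fetch_sub(&((toy_sync_t *) prev)->missing, 1);
    }
}

int main(void)
{
    toy_request_t req = { .req_complete = REQ_PENDING };
    toy_sync_t sync = { .missing = 1 };

    if (!attach_or_completed(&req, &sync)) {  /* we attached; completion normally happens elsewhere */
        complete_request(&req);               /* same thread here, for demonstration only */
    }
    printf("completions still missing: %d\n", atomic_load(&sync.missing));
    return 0;
}

A waiter that loses the race knows the request already completed and can harvest its status immediately; the test and wait paths in the diffs below assert exactly that with REQUEST_COMPLETE().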

File tree

  ompi/request/req_test.c
  ompi/request/req_wait.c

2 files changed: +102 -92 lines changed

ompi/request/req_test.c

Lines changed: 8 additions & 6 deletions
@@ -1,3 +1,4 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
 /*
  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
  *                         University Research and Technology
@@ -27,11 +28,12 @@
 #include "ompi/request/request_default.h"
 #include "ompi/request/grequest.h"
 
-int ompi_request_default_test( ompi_request_t ** rptr,
-                               int *completed,
-                               ompi_status_public_t * status )
+int ompi_request_default_test(ompi_request_t ** rptr,
+                              int *completed,
+                              ompi_status_public_t * status )
 {
     ompi_request_t *request = *rptr;
+
 #if OPAL_ENABLE_PROGRESS_THREADS == 0
     int do_it_once = 0;
 
@@ -46,7 +48,7 @@ int ompi_request_default_test( ompi_request_t ** rptr,
         return OMPI_SUCCESS;
     }
 
-    if (request->req_complete) {
+    if( REQUEST_COMPLETE(request) ) {
 
         *completed = true;
         /* For a generalized request, we *have* to call the query_fn
@@ -116,7 +118,7 @@ int ompi_request_default_test_any(
             continue;
         }
 
-        if( request->req_complete ) {
+        if( REQUEST_COMPLETE(request) ) {
 
             *index = i;
             *completed = true;
@@ -191,7 +193,7 @@ int ompi_request_default_test_all(
         request = *rptr;
 
         if( request->req_state == OMPI_REQUEST_INACTIVE ||
-            request->req_complete) {
+            REQUEST_COMPLETE(request) ) {
             num_completed++;
         }
     }

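The req_test.c changes above replace direct reads of request->req_complete with the REQUEST_COMPLETE() predicate, which is what allows req_complete to carry a sentinel or a sync pointer rather than a boolean. A hypothetical equivalent of that predicate, reusing the toy types from the sketch above (an assumption for illustration, not the Open MPI definition):

#include <stdatomic.h>
#include <stdbool.h>

#define TOY_REQ_COMPLETED ((void *) 1)          /* same invented sentinel as above */

typedef struct { _Atomic(void *) req_complete; } toy_request_t;

/* "Complete" now means the field holds the COMPLETED sentinel, as opposed
 * to the PENDING sentinel or a pointer to a waiter's sync object. */
static inline bool toy_request_complete(toy_request_t *req)
{
    return TOY_REQ_COMPLETED == atomic_load(&req->req_complete);
}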
ompi/request/req_wait.c

Lines changed: 94 additions & 86 deletions
@@ -84,7 +84,7 @@ int ompi_request_default_wait_any(size_t count,
                                   ompi_status_public_t * status)
 {
     size_t completed = count, num_requests_null_inactive = 0;
-    int rc = OMPI_SUCCESS;
+    int i, rc = OMPI_SUCCESS;
     ompi_request_t **rptr=NULL;
     ompi_request_t *request=NULL;
     ompi_wait_sync_t sync;
@@ -93,13 +93,8 @@ int ompi_request_default_wait_any(size_t count,
 
     rptr = requests;
     num_requests_null_inactive = 0;
-    for (size_t i = 0; i < count; i++, rptr++) {
-        request = *rptr;
-
-        /* Sanity test */
-        if( NULL == request) {
-            continue;
-        }
+    for (i = 0; i < count; i++) {
+        request = requests[i];
 
         /*
         * Check for null or completed persistent request.
@@ -110,11 +105,11 @@ int ompi_request_default_wait_any(size_t count,
             continue;
         }
 
-        if(!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, &sync)) {
+        if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, &sync) ) {
             assert(REQUEST_COMPLETE(request));
-            wait_sync_update( &sync, 1, request->req_status.MPI_ERROR);
             completed = i;
-            break;
+            *index = i;
+            goto after_sync_wait;
         }
     }
 
@@ -128,11 +123,12 @@ int ompi_request_default_wait_any(size_t count,
     }
 
     SYNC_WAIT(&sync);
-
-    /* recheck the complete status and clean up the sync primitives */
-    rptr = requests;
-    for(size_t i = 0, pending_count = completed; i < pending_count ; i++, rptr++) {
-        request = *rptr;
+
+  after_sync_wait:
+    /* recheck the complete status and clean up the sync primitives. Do it backward to
+     * return the earliest complete request to the user. */
+    for(i = completed-1; i >= 0; i--) {
+        request = requests[i];
 
         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
             continue;
@@ -142,15 +138,12 @@ int ompi_request_default_wait_any(size_t count,
         * the request has been completed meanwhile, and it has been atomically
         * marked as REQUEST_COMPLETE.
         */
-        OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING);
-        if (REQUEST_COMPLETE(request)) {
-            completed = i;
+        if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING) ) {
+            *index = i;
         }
     }
 
-    rptr = requests + completed;
-    request = *rptr;
-
+    request = requests[*index];
     assert( REQUEST_COMPLETE(request) );
     /* Per note above, we have to call gen request query_fn even
        if STATUS_IGNORE was provided */
@@ -172,9 +165,8 @@ int ompi_request_default_wait_any(size_t count,
         /* If there's an error while freeing the request,
            assume that the request is still there. Otherwise,
            Bad Things will happen later! */
-        rc = ompi_request_free(rptr);
+        rc = ompi_request_free(&requests[*index]);
     }
-    *index = completed;
 
     WAIT_SYNC_RELEASE(&sync);
     return rc;
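ompi_request_default_wait_any() above attaches one sync object to every pending request, blocks once, and then sweeps the array backwards to detach: each failed compare-and-swap of &sync back to REQUEST_PENDING identifies a completed request, and because the sweep runs backwards the last hit recorded is the smallest index, i.e. the earliest completed request reported through *index. A stand-alone sketch of that backward sweep with the invented toy types (not the Open MPI code):

#include <stdatomic.h>
#include <stddef.h>

#define REQ_PENDING ((void *) 0)

typedef struct { atomic_int missing; } toy_sync_t;
typedef struct { _Atomic(void *) req_complete; } toy_request_t;

/* Walk reqs[0..upto) backwards: detach our sync from every still-pending
 * request; every CAS that fails found a completed one, and because the walk
 * is backwards the last failure seen is the lowest completed index. */
static size_t earliest_completed(toy_request_t **reqs, size_t upto, toy_sync_t *sync)
{
    size_t index = (size_t) -1;                  /* "nothing completed" marker */
    for (size_t n = upto; n-- > 0; ) {
        void *expected = sync;
        if (!atomic_compare_exchange_strong(&reqs[n]->req_complete,
                                            &expected, REQ_PENDING)) {
            index = n;                           /* reqs[n] completed */
        }
    }
    return index;
}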
@@ -185,127 +177,141 @@ int ompi_request_default_wait_all( size_t count,
                                    ompi_request_t ** requests,
                                    ompi_status_public_t * statuses )
 {
-    size_t completed = 0, i, failed = 0;
+    size_t completed = 0, failed = 0;
     ompi_request_t **rptr;
     ompi_request_t *request;
-    int mpi_error = OMPI_SUCCESS;
+    int i, mpi_error = OMPI_SUCCESS;
     ompi_wait_sync_t sync;
 
     WAIT_SYNC_INIT(&sync, count);
     rptr = requests;
     for (i = 0; i < count; i++) {
         request = *rptr++;
 
+        if( request->req_state == OMPI_REQUEST_INACTIVE ) {
+            completed++;
+            continue;
+        }
+
         if (!OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, &sync)) {
             if( OPAL_UNLIKELY( MPI_SUCCESS != request->req_status.MPI_ERROR ) ) {
                 failed++;
             }
             completed++;
         }
     }
+    if( failed > 0 ) {
+        goto finish;
+    }
+
     if( 0 != completed ) {
         wait_sync_update(&sync, completed, OPAL_SUCCESS);
     }
 
-    if( failed > 0 ) {
-        goto finish;
+    /* wait until all requests complete or until an error is triggered. */
+    mpi_error = SYNC_WAIT(&sync);
+    if( OPAL_SUCCESS != mpi_error ) {
+        /* if we are in an error case, increase the failed to ensure
+           proper cleanup during the requests completion. */
+        failed++;
     }
 
-    /*
-     * acquire lock and test for completion - if all requests are
-     * not completed pend on condition variable until a request
-     * completes
-     */
-    SYNC_WAIT(&sync);
-
  finish:
     rptr = requests;
     if (MPI_STATUSES_IGNORE != statuses) {
         /* fill out status and free request if required */
         for( i = 0; i < count; i++, rptr++ ) {
             request = *rptr;
 
-            /*
-             * Assert only if no requests were failed.
-             * Since some may still be pending.
-             */
-            if( 0 >= failed ) {
-                assert( REQUEST_COMPLETE(request) );
-            }
-
             if( request->req_state == OMPI_REQUEST_INACTIVE ) {
                 statuses[i] = ompi_status_empty;
                 continue;
             }
-            if (OMPI_REQUEST_GEN == request->req_type) {
-                ompi_grequest_invoke_query(request, &request->req_status);
-            }
 
-            statuses[i] = request->req_status;
-            /*
-             * Per MPI 2.2 p 60:
-             * Allows requests to be marked as MPI_ERR_PENDING if they are
-             * "neither failed nor completed." Which can only happen if
-             * there was an error in one of the other requests.
-             */
             if( OPAL_UNLIKELY(0 < failed) ) {
-                if( !request->req_complete ) {
+                /* if we have failed requests we skipped the waiting on the sync. Thus,
+                 * some of the requests might not be properly completed, in which case
+                 * we must detach all requests from the sync. However, if we can succesfully
+                 * mark the request as pending then it is neither failed nor complete, and
+                 * we must stop altering it.
+                 */
+                if( OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING ) ) {
+                    /*
+                     * Per MPI 2.2 p 60:
+                     * Allows requests to be marked as MPI_ERR_PENDING if they are
+                     * "neither failed nor completed." Which can only happen if
+                     * there was an error in one of the other requests.
+                     */
                     statuses[i].MPI_ERROR = MPI_ERR_PENDING;
                     mpi_error = MPI_ERR_IN_STATUS;
                     continue;
                 }
             }
+            assert( REQUEST_COMPLETE(request) );
+
+            if (OMPI_REQUEST_GEN == request->req_type) {
+                ompi_grequest_invoke_query(request, &request->req_status);
+            }
+
+            statuses[i] = request->req_status;
 
             if( request->req_persistent ) {
                 request->req_state = OMPI_REQUEST_INACTIVE;
                 continue;
-            } else {
-                /* Only free the request if there is no error on it */
-                if (MPI_SUCCESS == request->req_status.MPI_ERROR) {
-                    /* If there's an error while freeing the request,
-                       assume that the request is still there.
-                       Otherwise, Bad Things will happen later! */
-                    int tmp = ompi_request_free(rptr);
-                    if (OMPI_SUCCESS == mpi_error && OMPI_SUCCESS != tmp) {
-                        mpi_error = tmp;
-                    }
+            }
+            /* Only free the request if there is no error on it */
+            if (MPI_SUCCESS == request->req_status.MPI_ERROR) {
+                /* If there's an error while freeing the request,
+                   assume that the request is still there.
+                   Otherwise, Bad Things will happen later! */
+                int tmp = ompi_request_free(rptr);
+                if (OMPI_SUCCESS == mpi_error && OMPI_SUCCESS != tmp) {
+                    mpi_error = tmp;
                 }
             }
             if( statuses[i].MPI_ERROR != OMPI_SUCCESS) {
                 mpi_error = MPI_ERR_IN_STATUS;
             }
         }
     } else {
+        int rc;
         /* free request if required */
         for( i = 0; i < count; i++, rptr++ ) {
-            int rc;
             request = *rptr;
 
+            if( request->req_state == OMPI_REQUEST_INACTIVE ) {
+                rc = ompi_status_empty.MPI_ERROR;
+                goto absorb_error_and_continue;
+            }
             /*
              * Assert only if no requests were failed.
              * Since some may still be pending.
              */
-            if( 0 >= failed ) {
-                assert( REQUEST_COMPLETE(request) );
-            } else {
+            if( OPAL_UNLIKELY(0 < failed) ) {
                /* If the request is still pending due to a failed request
                 * then skip it in this loop.
                 */
-                if( !REQUEST_COMPLETE(request) ) {
-                    continue;
-                }
+                if( OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING ) ) {
+                    /*
+                     * Per MPI 2.2 p 60:
+                     * Allows requests to be marked as MPI_ERR_PENDING if they are
+                     * "neither failed nor completed." Which can only happen if
+                     * there was an error in one of the other requests.
+                     */
+                    rc = MPI_ERR_PENDING;
+                    goto absorb_error_and_continue;
+                }
             }
+            assert( REQUEST_COMPLETE(request) );
 
             /* Per note above, we have to call gen request query_fn
                even if STATUSES_IGNORE was provided */
             if (OMPI_REQUEST_GEN == request->req_type) {
                 rc = ompi_grequest_invoke_query(request, &request->req_status);
             }
-            if( request->req_state == OMPI_REQUEST_INACTIVE ) {
-                rc = ompi_status_empty.MPI_ERROR;
-            } else {
-                rc = request->req_status.MPI_ERROR;
-            }
+
+            rc = request->req_status.MPI_ERROR;
+
             if( request->req_persistent ) {
                 request->req_state = OMPI_REQUEST_INACTIVE;
             } else if (MPI_SUCCESS == rc) {
@@ -315,6 +321,7 @@ int ompi_request_default_wait_all( size_t count,
                     mpi_error = tmp;
                 }
             }
+  absorb_error_and_continue:
             /*
              * Per MPI 2.2 p34:
              * "It is possible for an MPI function to return MPI_ERR_IN_STATUS
@@ -348,10 +355,6 @@ int ompi_request_default_wait_some(size_t count,
 
     *outcount = 0;
 
-    /*
-     * We only get here when outcount still is 0.
-     * give up and sleep until completion
-     */
     rptr = requests;
     num_requests_null_inactive = 0;
     num_requests_done = 0;
@@ -367,21 +370,22 @@ int ompi_request_default_wait_some(size_t count,
         }
 
         if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, REQUEST_PENDING, &sync) ) {
+            /* If the request is completed go ahead and mark it as such */
             assert( REQUEST_COMPLETE(request) );
             num_requests_done++;
         }
     }
+
     if(num_requests_null_inactive == count) {
         *outcount = MPI_UNDEFINED;
         WAIT_SYNC_RELEASE(&sync);
         return rc;
     }
 
-    if( 0 != num_requests_done ) {
-        /* As we only expect one trigger update the sync with count 1 */
-        wait_sync_update(&sync, 1, request->req_status.MPI_ERROR);
+    if( 0 == num_requests_done ) {
+        /* One completed request is enough to satisfy the some condition */
+        SYNC_WAIT(&sync);
     }
-    SYNC_WAIT(&sync);
 
     /* Do the final counting and */
     /* Clean up the synchronization primitives */
@@ -394,9 +398,13 @@ int ompi_request_default_wait_some(size_t count,
         if( request->req_state == OMPI_REQUEST_INACTIVE ) {
             continue;
         }
-
+        /* Atomically mark the request as pending. If this succeed
+         * then the request was not completed, and it is now marked as
+         * pending. Otherwise, the request is complete )either it was
+         * before or it has been meanwhile). The major drawback here
+         * is that we will do all the atomics operations in all cases.
+         */
         if( !OPAL_ATOMIC_CMPSET_PTR(&request->req_complete, &sync, REQUEST_PENDING) ) {
-            assert(REQUEST_COMPLETE(request));
             indices[num_requests_done] = i;
             num_requests_done++;
         }

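ompi_request_default_wait_some() above follows the same recipe: attach the sync to every pending request, block only if nothing had already completed, then make a single forward sweep in which every failed compare-and-swap both detects a completion and records its index, while every successful swap detaches a still-pending request. A stand-alone sketch of that sweep with the invented toy types (not the Open MPI code):

#include <stdatomic.h>
#include <stddef.h>

#define REQ_PENDING ((void *) 0)

typedef struct { atomic_int missing; } toy_sync_t;
typedef struct { _Atomic(void *) req_complete; } toy_request_t;

/* Single forward sweep: CAS &sync -> PENDING on every request.  Success
 * detaches a still-pending request; failure means the request completed,
 * so record its index.  Returns the number of completed requests. */
static size_t collect_completed(toy_request_t **reqs, size_t count,
                                toy_sync_t *sync, size_t *indices)
{
    size_t done = 0;
    for (size_t i = 0; i < count; i++) {
        void *expected = sync;
        if (!atomic_compare_exchange_strong(&reqs[i]->req_complete,
                                            &expected, REQ_PENDING)) {
            indices[done++] = i;
        }
    }
    return done;
}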