Skip to content

Commit ca4322d

Browse files
bosilca authored and derbeyn committed
Fix multiple issues with the collective requests.
This patch addresses most (if not all) of @derbeyn's concerns expressed on open-mpi#1015. I added checks for the request allocation in all functions, ompi_coll_base_free_reqs is now called with the right number of requests, I removed the unnecessary basic_module_comm_t and use the base_module_comm_t instead, I removed all uses of the COLL_BASE_BCAST_USE_BLOCKING define, and made other minor fixes. (cherry picked from commit 4b38b6b)
1 parent e39b9b9 commit ca4322d

21 files changed

+168
-169
lines changed

ompi/mca/coll/base/coll_base_alltoall.c

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
44
* University Research and Technology
55
* Corporation. All rights reserved.
6-
* Copyright (c) 2004-2015 The University of Tennessee and The University
6+
* Copyright (c) 2004-2016 The University of Tennessee and The University
77
* of Tennessee Research Foundation. All rights
88
* reserved.
99
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -43,7 +43,7 @@ mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
4343
{
4444
mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
4545
int i, j, size, rank, err = MPI_SUCCESS, line;
46-
MPI_Request *preq;
46+
ompi_request_t **preq, **reqs;
4747
char *tmp_buffer;
4848
size_t max_size;
4949
ptrdiff_t ext, true_lb, true_ext;
@@ -63,18 +63,19 @@ mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
6363
ompi_datatype_get_true_extent ( rdtype, &true_lb, &true_ext);
6464
max_size = true_ext + ext * (rcount-1);
6565

66+
/* Initiate all send/recv to/from others. */
67+
reqs = coll_base_comm_get_reqs(base_module->base_data, 2);
68+
if( NULL == reqs ) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }
69+
6670
/* Allocate a temporary buffer */
6771
tmp_buffer = calloc (max_size, 1);
68-
if (NULL == tmp_buffer) {
69-
return OMPI_ERR_OUT_OF_RESOURCE;
70-
}
72+
if (NULL == tmp_buffer) { return OMPI_ERR_OUT_OF_RESOURCE; }
7173
max_size = ext * rcount;
7274

7375
/* in-place alltoall slow algorithm (but works) */
7476
for (i = 0 ; i < size ; ++i) {
7577
for (j = i+1 ; j < size ; ++j) {
76-
/* Initiate all send/recv to/from others. */
77-
preq = coll_base_comm_get_reqs(base_module->base_data, size * 2);
78+
preq = reqs;
7879

7980
if (i == rank) {
8081
/* Copy the data into the temporary buffer */
@@ -111,7 +112,7 @@ mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
111112
}
112113

113114
/* Wait for the requests to complete */
114-
err = ompi_request_wait_all (2, base_module->base_data->mcct_reqs, MPI_STATUSES_IGNORE);
115+
err = ompi_request_wait_all (2, reqs, MPI_STATUSES_IGNORE);
115116
if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
116117
}
117118
}
@@ -124,7 +125,7 @@ mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
124125
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
125126
"%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err,
126127
rank));
127-
ompi_coll_base_free_reqs(base_module->base_data->mcct_reqs, 2);
128+
ompi_coll_base_free_reqs(reqs, 2);
128129
}
129130

130131
/* All done */
@@ -399,20 +400,20 @@ int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
399400

400401
/* Post first batch or ireceive and isend requests */
401402
for (nreqs = 0, nrreqs = 0, ri = (rank + 1) % size; nreqs < total_reqs;
402-
ri = (ri + 1) % size, ++nreqs, ++nrreqs) {
403-
error =
404-
MCA_PML_CALL(irecv
405-
(prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
406-
MCA_COLL_BASE_TAG_ALLTOALL, comm, &reqs[nreqs]));
403+
ri = (ri + 1) % size, ++nrreqs) {
404+
nreqs++;
405+
error = MCA_PML_CALL(irecv
406+
(prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
407+
MCA_COLL_BASE_TAG_ALLTOALL, comm, &reqs[nreqs]));
407408
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
408409
}
409-
for ( nsreqs = 0, si = (rank + size - 1) % size; nreqs < 2 * total_reqs;
410-
si = (si + size - 1) % size, ++nreqs, ++nsreqs) {
411-
error =
412-
MCA_PML_CALL(isend
413-
(psnd + (ptrdiff_t)si * sext, scount, sdtype, si,
414-
MCA_COLL_BASE_TAG_ALLTOALL,
415-
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[nreqs]));
410+
for (nsreqs = 0, si = (rank + size - 1) % size; nreqs < 2 * total_reqs;
411+
si = (si + size - 1) % size, ++nsreqs) {
412+
nreqs++;
413+
error = MCA_PML_CALL(isend
414+
(psnd + (ptrdiff_t)si * sext, scount, sdtype, si,
415+
MCA_COLL_BASE_TAG_ALLTOALL,
416+
MCA_PML_BASE_SEND_STANDARD, comm, &reqs[nreqs]));
416417
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
417418
}
418419

@@ -440,11 +441,10 @@ int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
440441
ncreqs++;
441442
if (completed < total_reqs) {
442443
if (nrreqs < (size - 1)) {
443-
error =
444-
MCA_PML_CALL(irecv
445-
(prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
446-
MCA_COLL_BASE_TAG_ALLTOALL, comm,
447-
&reqs[completed]));
444+
error = MCA_PML_CALL(irecv
445+
(prcv + (ptrdiff_t)ri * rext, rcount, rdtype, ri,
446+
MCA_COLL_BASE_TAG_ALLTOALL, comm,
447+
&reqs[completed]));
448448
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
449449
++nrreqs;
450450
ri = (ri + 1) % size;
@@ -456,6 +456,7 @@ int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
456456
MCA_COLL_BASE_TAG_ALLTOALL,
457457
MCA_PML_BASE_SEND_STANDARD, comm,
458458
&reqs[completed]));
459+
if (MPI_SUCCESS != error) { line = __LINE__; goto error_hndl; }
459460
++nsreqs;
460461
si = (si + size - 1) % size;
461462
}
@@ -470,7 +471,7 @@ int ompi_coll_base_alltoall_intra_linear_sync(const void *sbuf, int scount,
470471
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
471472
"%s:%4d\tError occurred %d, rank %2d", __FILE__, line, error,
472473
rank));
473-
ompi_coll_base_free_reqs(reqs, 2 * total_reqs);
474+
ompi_coll_base_free_reqs(reqs, nreqs);
474475
return error;
475476
}
476477

@@ -607,14 +608,16 @@ int ompi_coll_base_alltoall_intra_basic_linear(const void *sbuf, int scount,
607608
/* Initiate all send/recv to/from others. */
608609

609610
req = rreq = coll_base_comm_get_reqs(data, (size - 1) * 2);
611+
if (NULL == req) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto err_hndl; }
610612

611613
prcv = (char *) rbuf;
612614
psnd = (char *) sbuf;
613615

614616
/* Post all receives first -- a simple optimization */
615617

616618
for (nreqs = 0, i = (rank + 1) % size; i != rank;
617-
i = (i + 1) % size, ++rreq, ++nreqs) {
619+
i = (i + 1) % size, ++rreq) {
620+
nreqs++;
618621
err = MCA_PML_CALL(irecv_init
619622
(prcv + (ptrdiff_t)i * rcvinc, rcount, rdtype, i,
620623
MCA_COLL_BASE_TAG_ALLTOALL, comm, rreq));
@@ -627,7 +630,8 @@ int ompi_coll_base_alltoall_intra_basic_linear(const void *sbuf, int scount,
627630
*/
628631
sreq = rreq;
629632
for (i = (rank + size - 1) % size; i != rank;
630-
i = (i + size - 1) % size, ++sreq, ++nreqs) {
633+
i = (i + size - 1) % size, ++sreq) {
634+
nreqs++;
631635
err = MCA_PML_CALL(isend_init
632636
(psnd + (ptrdiff_t)i * sndinc, scount, sdtype, i,
633637
MCA_COLL_BASE_TAG_ALLTOALL,

ompi/mca/coll/base/coll_base_alltoallv.c

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
44
* University Research and Technology
55
* Corporation. All rights reserved.
6-
* Copyright (c) 2004-2015 The University of Tennessee and The University
6+
* Copyright (c) 2004-2016 The University of Tennessee and The University
77
* of Tennessee Research Foundation. All rights
88
* reserved.
99
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -44,7 +44,7 @@ mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts
4444
{
4545
mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
4646
int i, j, size, rank, err=MPI_SUCCESS;
47-
MPI_Request *preq;
47+
ompi_request_t **preq, **reqs;
4848
char *tmp_buffer;
4949
size_t max_size, rdtype_size;
5050
ptrdiff_t ext;
@@ -74,11 +74,14 @@ mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts
7474
return OMPI_ERR_OUT_OF_RESOURCE;
7575
}
7676

77+
/* Initiate all send/recv to/from others. */
78+
reqs = preq = coll_base_comm_get_reqs(base_module->base_data, 2);
79+
if( NULL == reqs ) { err = OMPI_ERR_OUT_OF_RESOURCE; goto error_hndl; }
80+
7781
/* in-place alltoallv slow algorithm (but works) */
7882
for (i = 0 ; i < size ; ++i) {
7983
for (j = i+1 ; j < size ; ++j) {
80-
/* Initiate all send/recv to/from others. */
81-
preq = coll_base_comm_get_reqs(base_module->base_data, 2);
84+
preq = reqs;
8285

8386
if (i == rank && rcounts[j]) {
8487
/* Copy the data into the temporary buffer */
@@ -115,7 +118,7 @@ mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts
115118
}
116119

117120
/* Wait for the requests to complete */
118-
err = ompi_request_wait_all (2, base_module->base_data->mcct_reqs, MPI_STATUSES_IGNORE);
121+
err = ompi_request_wait_all (2, reqs, MPI_STATUSES_IGNORE);
119122
if (MPI_SUCCESS != err) { goto error_hndl; }
120123
}
121124
}
@@ -124,7 +127,7 @@ mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts
124127
/* Free the temporary buffer */
125128
free (tmp_buffer);
126129
if( MPI_SUCCESS != err ) {
127-
ompi_coll_base_free_reqs(base_module->base_data->mcct_reqs, 2 );
130+
ompi_coll_base_free_reqs(reqs, 2 );
128131
}
129132

130133
/* All done */
@@ -205,7 +208,7 @@ ompi_coll_base_alltoallv_intra_basic_linear(const void *sbuf, const int *scounts
205208
int i, size, rank, err, nreqs;
206209
char *psnd, *prcv;
207210
ptrdiff_t sext, rext;
208-
MPI_Request *preq;
211+
ompi_request_t **preq, **reqs;
209212
mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
210213
mca_coll_base_comm_t *data = base_module->base_data;
211214

@@ -241,22 +244,21 @@ ompi_coll_base_alltoallv_intra_basic_linear(const void *sbuf, const int *scounts
241244

242245
/* Now, initiate all send/recv to/from others. */
243246
nreqs = 0;
244-
preq = coll_base_comm_get_reqs(data, 2 * size);
247+
reqs = preq = coll_base_comm_get_reqs(data, 2 * size);
248+
if( NULL == reqs ) { err = OMPI_ERR_OUT_OF_RESOURCE; goto err_hndl; }
245249

246250
/* Post all receives first */
247251
for (i = 0; i < size; ++i) {
248252
if (i == rank || 0 == rcounts[i]) {
249253
continue;
250254
}
251255

256+
++nreqs;
252257
prcv = ((char *) rbuf) + (ptrdiff_t)rdisps[i] * rext;
253258
err = MCA_PML_CALL(irecv_init(prcv, rcounts[i], rdtype,
254259
i, MCA_COLL_BASE_TAG_ALLTOALLV, comm,
255260
preq++));
256-
++nreqs;
257-
if (MPI_SUCCESS != err) {
258-
goto err_hndl;
259-
}
261+
if (MPI_SUCCESS != err) { goto err_hndl; }
260262
}
261263

262264
/* Now post all sends */
@@ -265,31 +267,30 @@ ompi_coll_base_alltoallv_intra_basic_linear(const void *sbuf, const int *scounts
265267
continue;
266268
}
267269

270+
++nreqs;
268271
psnd = ((char *) sbuf) + (ptrdiff_t)sdisps[i] * sext;
269272
err = MCA_PML_CALL(isend_init(psnd, scounts[i], sdtype,
270273
i, MCA_COLL_BASE_TAG_ALLTOALLV,
271274
MCA_PML_BASE_SEND_STANDARD, comm,
272275
preq++));
273-
++nreqs;
274-
if (MPI_SUCCESS != err) {
275-
goto err_hndl;
276-
}
276+
if (MPI_SUCCESS != err) { goto err_hndl; }
277277
}
278278

279279
/* Start your engines. This will never return an error. */
280-
MCA_PML_CALL(start(nreqs, data->mcct_reqs));
280+
MCA_PML_CALL(start(nreqs, reqs));
281281

282282
/* Wait for them all. If there's an error, note that we don't care
283283
* what the error was -- just that there *was* an error. The PML
284284
* will finish all requests, even if one or more of them fail.
285285
* i.e., by the end of this call, all the requests are free-able.
286286
* So free them anyway -- even if there was an error, and return the
287287
* error after we free everything. */
288-
err = ompi_request_wait_all(nreqs, data->mcct_reqs,
289-
MPI_STATUSES_IGNORE);
288+
err = ompi_request_wait_all(nreqs, reqs, MPI_STATUSES_IGNORE);
289+
if( MPI_SUCCESS == err )
290+
return MPI_SUCCESS;
290291
err_hndl:
291292
/* Free the requests in all cases as they are persistent */
292-
ompi_coll_base_free_reqs(data->mcct_reqs, nreqs);
293+
ompi_coll_base_free_reqs(reqs, nreqs);
293294

294295
return err;
295296
}

ompi/mca/coll/base/coll_base_barrier.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,13 +347,16 @@ int ompi_coll_base_barrier_intra_basic_linear(struct ompi_communicator_t *comm,
347347

348348
else {
349349
requests = coll_base_comm_get_reqs(module->base_data, size);
350+
if( NULL == requests ) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto err_hndl; }
351+
350352
for (i = 1; i < size; ++i) {
351353
err = MCA_PML_CALL(irecv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE,
352354
MCA_COLL_BASE_TAG_BARRIER, comm,
353355
&(requests[i])));
354356
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
355357
}
356-
ompi_request_wait_all( size-1, requests+1, MPI_STATUSES_IGNORE );
358+
err = ompi_request_wait_all( size-1, requests+1, MPI_STATUSES_IGNORE );
359+
if (MPI_SUCCESS != err) { line = __LINE__; goto err_hndl; }
357360
requests = NULL; /* we're done the requests array is clean */
358361

359362
for (i = 1; i < size; ++i) {
@@ -370,7 +373,7 @@ int ompi_coll_base_barrier_intra_basic_linear(struct ompi_communicator_t *comm,
370373
OPAL_OUTPUT( (ompi_coll_base_framework.framework_output,"%s:%4d\tError occurred %d, rank %2d",
371374
__FILE__, line, err, rank) );
372375
if( NULL != requests )
373-
ompi_coll_base_free_reqs(requests, size-1);
376+
ompi_coll_base_free_reqs(requests, size);
374377
return err;
375378
}
376379
/* copied function (with appropriate renaming) ends here */

0 commit comments

Comments
 (0)