
Commit fbb0b41

mpi: More MPI guards. Don't cancel, just assert.
Prefer checking recv_request in post() instead of trying to clean up or cancel open requests. An open request at that point indicates a bug in the controller logic, so we throw an error when we detect one.
1 parent: 9b68f13 · commit: fbb0b41
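In short, the error-handling convention becomes: wrap every MPI return code in the new CHKMPIERR check, let the destructor finish a still-pending send instead of cancelling it, and assert afterwards that nothing is left open. A condensed sketch of that destructor-side pattern, pieced together from the diff below (the enclosing class and the member declarations for send_request/recv_request are omitted here):

    #define CHKMPIERR(err) if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }

    // ~VectorEncapsulation(): finish a pending send rather than cancel it ...
    if (this->send_request != MPI_REQUEST_NULL) {
      MPI_Status stat;
      int err = MPI_Wait(&(this->send_request), &stat); CHKMPIERR(err);
    }
    // ... then require that no request is left open
    assert(this->recv_request == MPI_REQUEST_NULL);
    assert(this->send_request == MPI_REQUEST_NULL);

The full context is in the diff below.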

File tree: 1 file changed, +21 -90 lines changed


src/pfasst/encap/vector_impl.hpp

Lines changed: 21 additions & 90 deletions
@@ -3,6 +3,10 @@
 
 #include "pfasst/encap/vector.hpp"
 
+#ifdef WITH_MPI
+#define CHKMPIERR(err) if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+#endif
+
 namespace pfasst
 {
   namespace encap
@@ -38,41 +42,12 @@ namespace pfasst
   VectorEncapsulation<scalar, time>::~VectorEncapsulation()
   {
 #ifdef WITH_MPI
-    // TODO: refactor that request handler cleanup
-
-    int err = MPI_SUCCESS;
-
-    // check and finalize old request
-    if (this->recv_request != MPI_REQUEST_NULL) {
-      int old_complete = -1;
-      MPI_Status test_old_stat;
-      err = MPI_Request_get_status(this->recv_request, &old_complete, &test_old_stat);
-      if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-      if (!(bool)old_complete) {
-        err = MPI_Cancel(&(this->recv_request));
-        if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-      }
-      // cleanup resources
-      err = MPI_Request_free(&(this->recv_request));
-      if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-      assert(this->recv_request == MPI_REQUEST_NULL); // just to make sure
-    }
-
-    // check and finalize old request
     if (this->send_request != MPI_REQUEST_NULL) {
-      int old_complete = -1;
-      MPI_Status test_old_stat;
-      err = MPI_Request_get_status(this->send_request, &old_complete, &test_old_stat);
-      if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-      if (!(bool)old_complete) {
-        err = MPI_Cancel(&(this->send_request));
-        if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-      }
-      // cleanup resources
-      err = MPI_Request_free(&(this->send_request));
-      if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-      assert(this->send_request == MPI_REQUEST_NULL); // just to make sure
+      MPI_Status stat;
+      int err = MPI_Wait(&(this->send_request), &stat); CHKMPIERR(err);
     }
+    assert(this->recv_request == MPI_REQUEST_NULL);
+    assert(this->send_request == MPI_REQUEST_NULL);
 #endif
   }
 
@@ -214,33 +189,13 @@ namespace pfasst
     if (mpi.size() == 1) { return; }
     if (mpi.rank() == 0) { return; }
 
-    int err = MPI_SUCCESS;
-
-    MPI_Request new_recv_request = MPI_REQUEST_NULL;
-    int src = (mpi.rank() - 1) % mpi.size();
-    err = MPI_Irecv(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
-                    src, tag, mpi.comm, &new_recv_request);
-    if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-
-    // check and finalize old request
     if (this->recv_request != MPI_REQUEST_NULL) {
-      int old_complete = -1;
-      MPI_Status test_old_stat;
-      err = MPI_Request_get_status(this->recv_request, &old_complete, &test_old_stat);
-      if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-      if (!(bool)old_complete) {
-        err = MPI_Cancel(&(this->recv_request));
-        if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-      }
-      // cleanup resources
-      err = MPI_Request_free(&(this->recv_request));
-      if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-      assert(this->recv_request == MPI_REQUEST_NULL); // just to make sure
+      throw MPIError();
     }
 
-    // keep the new request handler
-    std::swap(this->recv_request, new_recv_request);
-    assert(new_recv_request == MPI_REQUEST_NULL);
+    int src = (mpi.rank() - 1) % mpi.size();
+    int err = MPI_Irecv(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
+                        src, tag, mpi.comm, &this->recv_request); CHKMPIERR(err);
   }
 
   template<typename scalar, typename time>
@@ -250,23 +205,20 @@ namespace pfasst
     if (mpi.size() == 1) { return; }
     if (mpi.rank() == 0) { return; }
 
+    MPI_Status stat;
     int err = MPI_SUCCESS;
 
     if (blocking) {
-      MPI_Status stat;
       int src = (mpi.rank() - 1) % mpi.size();
       err = MPI_Recv(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
-                     src, tag, mpi.comm, &stat);
+                     src, tag, mpi.comm, &stat); CHKMPIERR(err);
     } else {
-      MPI_Status stat;
       if (this->recv_request != MPI_REQUEST_NULL) {
         CLOG(DEBUG, "Encap") << "waiting on last recv request";
-        err = MPI_Wait(&(this->recv_request), &stat);
+        err = MPI_Wait(&(this->recv_request), &stat); CHKMPIERR(err);
         CLOG(DEBUG, "Encap") << "waiting done: " << stat;
       }
     }
-
-    if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
   }
 
   template<typename scalar, typename time>
@@ -276,49 +228,28 @@ namespace pfasst
     if (mpi.size() == 1) { return; }
     if (mpi.rank() == mpi.size() - 1) { return; }
 
+    MPI_Status stat;
     int err = MPI_SUCCESS;
     int dest = (mpi.rank() + 1) % mpi.size();
 
     if (blocking) {
-      err = MPI_Send(this->data(), sizeof(scalar) * this->size(), MPI_CHAR, dest, tag, mpi.comm);
+      err = MPI_Send(this->data(), sizeof(scalar) * this->size(), MPI_CHAR, dest, tag, mpi.comm); CHKMPIERR(err);
     } else {
-      MPI_Request new_send_request = MPI_REQUEST_NULL;
+      err = MPI_Wait(&(this->send_request), &stat); CHKMPIERR(err);
       err = MPI_Isend(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
-                      dest, tag, mpi.comm, &new_send_request);
-
-      // check and finalize old request
-      if (this->send_request != MPI_REQUEST_NULL) {
-        int old_complete = -1;
-        MPI_Status test_old_stat;
-        err = MPI_Request_get_status(this->send_request, &old_complete, &test_old_stat);
-        if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-
-        if (!(bool)old_complete) {
-          err = MPI_Cancel(&(this->send_request));
-          if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-        }
-
-        // cleanup resources
-        err = MPI_Request_free(&(this->send_request));
-        if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
-        assert(this->send_request == MPI_REQUEST_NULL); // just to make sure
-      }
-
-      // keep the new request handler
-      std::swap(this->send_request, new_send_request);
+                      dest, tag, mpi.comm, &(this->send_request)); CHKMPIERR(err);
     }
 
-    if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
   }
 
   template<typename scalar, typename time>
   void VectorEncapsulation<scalar, time>::broadcast(ICommunicator* comm)
   {
     auto& mpi = as_mpi(comm);
     int err = MPI_Bcast(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
-                        comm->size()-1, mpi.comm);
+                        comm->size()-1, mpi.comm); CHKMPIERR(err);
+
 
-    if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
   }
 #endif
 
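For reference, post() reassembled as it reads after this commit, pieced together from the context and added lines of the post hunk above; the enclosing signature and the as_mpi() line are assumed from the surrounding code and are not shown in the diff itself:

    template<typename scalar, typename time>
    void VectorEncapsulation<scalar, time>::post(ICommunicator* comm, int tag)  // signature assumed
    {
      auto& mpi = as_mpi(comm);  // assumed; not part of the hunk shown above
      if (mpi.size() == 1) { return; }
      if (mpi.rank() == 0) { return; }

      // an open receive request here means the controller logic is broken: fail loudly
      if (this->recv_request != MPI_REQUEST_NULL) {
        throw MPIError();
      }

      int src = (mpi.rank() - 1) % mpi.size();
      int err = MPI_Irecv(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
                          src, tag, mpi.comm, &this->recv_request); CHKMPIERR(err);
    }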
0 commit comments
