
Commit f31aa1e

fix: cleanup unused but open non-blocking requests
Reduces the number of REQUEST handles still allocated at the end of the program to roughly half. A lot of them remain; it is not yet clear why.

The count of still-allocated REQUESTs comes from the output of an MPICH3 debug build:

    In direct memory block for handle type REQUEST, 7 handles are still allocated
    In indirect memory block 0 for handle type REQUEST, 23 handles are still allocated
    In direct memory block for handle type REQUEST, 8 handles are still allocated
    In indirect memory block 0 for handle type REQUEST, 298 handles are still allocated

Signed-off-by: Torbjörn Klatt <[email protected]>
1 parent: 3a020a9 · commit: f31aa1e
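
The cleanup pattern this commit applies (see the diff below) is: before a non-blocking request handle is replaced or dropped, query its completion state, cancel it if it is still pending, and free it so the MPI implementation can release the handle. The following is a minimal self-contained sketch of that pattern; the helper name cleanup_request is illustrative, not from the commit, and the error-code checks the commit performs (throwing MPIError::from_code) are reduced to bare calls here.

#include <mpi.h>
#include <cassert>

// Illustrative helper: finalize a possibly still pending non-blocking request.
void cleanup_request(MPI_Request& request)
{
  if (request == MPI_REQUEST_NULL) { return; }

  int complete = 0;
  MPI_Status status;
  // query completion without deallocating the request
  MPI_Request_get_status(request, &complete, &status);
  if (!complete) {
    // abort the still outstanding communication
    MPI_Cancel(&request);
  }
  // release the handle; MPI resets it to MPI_REQUEST_NULL
  MPI_Request_free(&request);
  assert(request == MPI_REQUEST_NULL);
}

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);

  // post a receive that is never matched by a send
  int buffer = 0;
  MPI_Request request = MPI_REQUEST_NULL;
  MPI_Irecv(&buffer, 1, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &request);

  // without this call the request handle would stay allocated until
  // MPI_Finalize and show up in handle counts like the MPICH3 output above
  cleanup_request(request);

  MPI_Finalize();
  return 0;
}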

2 files changed: 107 additions, 26 deletions


include/pfasst/encap/vector.hpp

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ namespace pfasst
        */
       VectorEncapsulation(Encapsulation<time>&& other);

-      virtual ~VectorEncapsulation() = default;
+      virtual ~VectorEncapsulation();
       //! @}

       //! @{

src/pfasst/encap/vector_impl.hpp

Lines changed: 106 additions & 25 deletions
@@ -34,6 +34,48 @@ namespace pfasst
       : VectorEncapsulation(dynamic_cast<VectorEncapsulation<scalar, time>&&>(other))
     {}

+    template<typename scalar, typename time>
+    VectorEncapsulation<scalar, time>::~VectorEncapsulation()
+    {
+#ifdef WITH_MPI
+      // TODO: refactor that request handler cleanup
+
+      int err = MPI_SUCCESS;
+
+      // check and finalize old request
+      if (this->recv_request != MPI_REQUEST_NULL) {
+        int old_complete = -1;
+        MPI_Status test_old_stat;
+        err = MPI_Request_get_status(this->recv_request, &old_complete, &test_old_stat);
+        if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+        if (!(bool)old_complete) {
+          err = MPI_Cancel(&(this->recv_request));
+          if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+        }
+        // cleanup resources
+        err = MPI_Request_free(&(this->recv_request));
+        if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+        assert(this->recv_request == MPI_REQUEST_NULL);  // just to make sure
+      }
+
+      // check and finalize old request
+      if (this->send_request != MPI_REQUEST_NULL) {
+        int old_complete = -1;
+        MPI_Status test_old_stat;
+        err = MPI_Request_get_status(this->send_request, &old_complete, &test_old_stat);
+        if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+        if (!(bool)old_complete) {
+          err = MPI_Cancel(&(this->send_request));
+          if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+        }
+        // cleanup resources
+        err = MPI_Request_free(&(this->send_request));
+        if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+        assert(this->send_request == MPI_REQUEST_NULL);  // just to make sure
+      }
+#endif
+    }
+
     template<typename scalar, typename time>
     void VectorEncapsulation<scalar, time>::zero()
     {
@@ -165,18 +207,40 @@ namespace pfasst
     }

 #ifdef WITH_MPI
-    template<typename scalar, typename time>
+    template<typename scalar, typename time>
     void VectorEncapsulation<scalar, time>::post(ICommunicator* comm, int tag)
     {
       auto& mpi = as_mpi(comm);
       if (mpi.size() == 1) { return; }
       if (mpi.rank() == 0) { return; }

-      int err = MPI_Irecv(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
-                          (mpi.rank() - 1) % mpi.size(), tag, mpi.comm, &recv_request);
-      if (err != MPI_SUCCESS) {
-        throw MPIError();
+      int err = MPI_SUCCESS;
+
+      MPI_Request new_recv_request = MPI_REQUEST_NULL;
+      int src = (mpi.rank() - 1) % mpi.size();
+      err = MPI_Irecv(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
+                      src, tag, mpi.comm, &new_recv_request);
+      if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+
+      // check and finalize old request
+      if (this->recv_request != MPI_REQUEST_NULL) {
+        int old_complete = -1;
+        MPI_Status test_old_stat;
+        err = MPI_Request_get_status(this->recv_request, &old_complete, &test_old_stat);
+        if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+        if (!(bool)old_complete) {
+          err = MPI_Cancel(&(this->recv_request));
+          if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+        }
+        // cleanup resources
+        err = MPI_Request_free(&(this->recv_request));
+        if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+        assert(this->recv_request == MPI_REQUEST_NULL);  // just to make sure
       }
+
+      // keep the new request handler
+      std::swap(this->recv_request, new_recv_request);
+      assert(new_recv_request == MPI_REQUEST_NULL);
     }

     template<typename scalar, typename time>
@@ -186,19 +250,23 @@ namespace pfasst
       if (mpi.size() == 1) { return; }
       if (mpi.rank() == 0) { return; }

-      int err;
+      int err = MPI_SUCCESS;
+
       if (blocking) {
         MPI_Status stat;
+        int src = (mpi.rank() - 1) % mpi.size();
         err = MPI_Recv(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
-                       (mpi.rank() - 1) % mpi.size(), tag, mpi.comm, &stat);
+                       src, tag, mpi.comm, &stat);
       } else {
         MPI_Status stat;
-        err = MPI_Wait(&recv_request, &stat);
+        if (this->recv_request != MPI_REQUEST_NULL) {
+          CLOG(DEBUG, "Encap") << "waiting on last recv request";
+          err = MPI_Wait(&(this->recv_request), &stat);
+          CLOG(DEBUG, "Encap") << "waiting done: " << stat;
+        }
       }

-      if (err != MPI_SUCCESS) {
-        throw MPIError();
-      }
+      if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
     }

     template<typename scalar, typename time>
@@ -209,23 +277,38 @@ namespace pfasst
       if (mpi.rank() == mpi.size() - 1) { return; }

       int err = MPI_SUCCESS;
+      int dest = (mpi.rank() + 1) % mpi.size();
+
       if (blocking) {
-        err = MPI_Send(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
-                       (mpi.rank() + 1) % mpi.size(), tag, mpi.comm);
+        err = MPI_Send(this->data(), sizeof(scalar) * this->size(), MPI_CHAR, dest, tag, mpi.comm);
       } else {
-        MPI_Status stat;
-        err = MPI_Wait(&send_request, &stat);
-        if (err != MPI_SUCCESS) {
-          throw MPIError();
+        MPI_Request new_send_request = MPI_REQUEST_NULL;
+        err = MPI_Isend(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
+                        dest, tag, mpi.comm, &new_send_request);
+
+        // check and finalize old request
+        if (this->send_request != MPI_REQUEST_NULL) {
+          int old_complete = -1;
+          MPI_Status test_old_stat;
+          err = MPI_Request_get_status(this->send_request, &old_complete, &test_old_stat);
+          if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+
+          if (!(bool)old_complete) {
+            err = MPI_Cancel(&(this->send_request));
+            if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+          }
+
+          // cleanup resources
+          err = MPI_Request_free(&(this->send_request));
+          if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
+          assert(this->send_request == MPI_REQUEST_NULL);  // just to make sure
         }

-        err = MPI_Isend(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
-                        (mpi.rank() + 1) % mpi.size(), tag, mpi.comm, &send_request);
+        // keep the new request handler
+        std::swap(this->send_request, new_send_request);
       }

-      if (err != MPI_SUCCESS) {
-        throw MPIError();
-      }
+      if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
     }

     template<typename scalar, typename time>
@@ -235,9 +318,7 @@ namespace pfasst
       int err = MPI_Bcast(this->data(), sizeof(scalar) * this->size(), MPI_CHAR,
                           comm->size()-1, mpi.comm);

-      if (err != MPI_SUCCESS) {
-        throw MPIError();
-      }
+      if (err != MPI_SUCCESS) { throw MPIError::from_code(err); }
     }
 #endif

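A note on the API choice used in the cleanup blocks above: they probe the pending request with MPI_Request_get_status rather than MPI_Test. MPI_Test deallocates a completed (non-persistent) request and resets the handle to MPI_REQUEST_NULL as a side effect, while MPI_Request_get_status only reports completion and leaves the handle alive, so the same explicit MPI_Request_free works in both the completed and the cancelled branch. A standalone sketch of that difference (not part of the commit):

#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);

  // self-send/receive pair so the sketch runs on a single rank
  int value = 42;
  MPI_Request request = MPI_REQUEST_NULL;
  MPI_Isend(&value, 1, MPI_INT, 0, 0, MPI_COMM_SELF, &request);

  int received = 0;
  MPI_Recv(&received, 1, MPI_INT, 0, 0, MPI_COMM_SELF, MPI_STATUS_IGNORE);

  // MPI_Request_get_status reports completion but keeps the handle valid ...
  int complete = 0;
  MPI_Request_get_status(request, &complete, MPI_STATUS_IGNORE);
  std::printf("complete=%d, handle already freed=%d\n",
              complete, request == MPI_REQUEST_NULL);

  // ... so the explicit free is still our responsibility; MPI_Test would
  // have freed a completed request and nulled the handle itself.
  MPI_Request_free(&request);

  MPI_Finalize();
  return 0;
}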