Skip to content

Commit 1a19376

Browse files
committed
Merge branch 'feature/pfstatus' into development
* feature/pfstatus:
  mpi: Minor tidying.
  tests: Re-organise the adaptive MLSDC tests a bit.
  tests: Add AdaptiveErrorTest for AD example to MPI PFASST.
  status: Bug fix.
  status: Move to blocking send/recv.
  status: Break iterations in PFASST when converged.
  status: Begin adding converged status MPI communication.

Signed-off-by: Torbjörn Klatt <[email protected]>

Conflicts:
  examples/advection_diffusion/advection_diffusion_sweeper.hpp
2 parents 549019c + fd9fbb3 commit 1a19376

File tree

7 files changed

+216
-45
lines changed

7 files changed

+216
-45
lines changed

examples/advection_diffusion/advection_diffusion_sweeper.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ namespace pfasst
141141

142142
auto n = this->get_controller()->get_step();
143143
auto k = this->get_controller()->get_iteration();
144-
CLOG(INFO, "Advec") << "err: " << n << " " << k << " " << max << " (" << qend.size() << "," << predict << ")";
145144

145+
CLOG(INFO, "Advec") << "err: " << n << " " << k << " " << max << " (" << qend.size() << "," << predict << ")";
146146
this->errors.insert(vtype(ktype(n, k), max));
147147
}
148148

examples/advection_diffusion/mpi_pfasst.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,20 @@ using namespace std;
2121

2222
#include "advection_diffusion_sweeper.hpp"
2323
#include "spectral_transfer_1d.hpp"
24+
2425
using namespace pfasst::encap;
2526
using namespace pfasst::mpi;
2627

27-
2828
namespace pfasst
2929
{
3030
namespace examples
3131
{
3232
namespace advection_diffusion
3333
{
34-
error_map run_mpi_pfasst()
34+
error_map run_mpi_pfasst(double abs_residual_tol, size_t niters=4)
3535
{
3636
const size_t nsteps = 4;
3737
const double dt = 0.01;
38-
const size_t niters = 4;
3938

4039
vector<pair<size_t, quadrature::QuadratureType>> nodes = {
4140
{ 3, quadrature::QuadratureType::GaussLobatto },
@@ -67,6 +66,7 @@ namespace pfasst
6766
pf.set_comm(&comm);
6867
pf.set_duration(0.0, nsteps * dt, dt, niters);
6968
pf.set_nsweeps({2, 1});
69+
pf.get_finest<AdvectionDiffusionSweeper<>>()->set_residual_tolerances(abs_residual_tol, 0.0);
7070
pf.run();
7171

7272
auto fine = pf.get_finest<AdvectionDiffusionSweeper<>>();
@@ -81,7 +81,7 @@ namespace pfasst
8181
int main(int argc, char** argv)
8282
{
8383
MPI_Init(&argc, &argv);
84-
pfasst::examples::advection_diffusion::run_mpi_pfasst();
84+
pfasst::examples::advection_diffusion::run_mpi_pfasst(0.0);
8585
fftw_cleanup();
8686
MPI_Finalize();
8787
}

include/pfasst/interfaces.hpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,56 @@ namespace pfasst
6767
}
6868
};
6969

70+
class IStatus;

/**
 * Abstract communicator.
 *
 * Exposes the number of participating ranks, the rank of the calling
 * process, and a shared status object used to exchange convergence
 * information between ranks.
 */
class ICommunicator
{
  public:
    virtual ~ICommunicator() { }

    //! Total number of ranks in this communicator.
    virtual int size() = 0;

    //! Rank of the calling process.
    virtual int rank() = 0;

    //! Convergence status associated with this communicator (may be empty).
    std::shared_ptr<IStatus> status;
};

/**
 * Abstract convergence status.
 *
 * Implementations store one "converged" flag per rank and provide the
 * means to exchange those flags (`post()`, `send()`, `recv()`).  The two
 * non-pure helpers below derive iteration decisions from the flags of the
 * calling rank and of its predecessor (`rank() - 1`).
 *
 * `set_comm()` must be called before `previous_is_iterating()` or
 * `keep_iterating()`; both dereference the communicator.
 */
class IStatus
{
  protected:
    // Non-owning.  Starts out null so that use before set_comm() is a
    // deterministic null dereference instead of a read through an
    // indeterminate pointer.
    ICommunicator* comm = nullptr;

  public:
    virtual ~IStatus() { }

    //! Reset the stored convergence flags.
    virtual void clear() = 0;

    //! Record the convergence state of the calling rank.
    virtual void set_converged(bool converged) = 0;

    //! Query the stored convergence state of `rank`.
    virtual bool get_converged(int rank) = 0;

    virtual void post() = 0;
    virtual void send() = 0;
    virtual void recv() = 0;

    //! Attach this status object to `comm` (not owned).
    virtual void set_comm(ICommunicator* comm)
    {
      this->comm = comm;
    }

    /**
     * @return `false` on rank 0 (it has no predecessor); otherwise `true`
     *         while rank `rank() - 1` is not flagged as converged.
     */
    virtual bool previous_is_iterating()
    {
      const int me = this->comm->rank();
      if (me == 0) {
        return false;
      }
      return !this->get_converged(me - 1);
    }

    /**
     * @return on rank 0, `true` while rank 0 is not converged; on any other
     *         rank, `true` while either this rank or its predecessor is not
     *         converged.
     */
    virtual bool keep_iterating()
    {
      const int me = this->comm->rank();
      if (me == 0) {
        return !this->get_converged(0);
      }
      return !this->get_converged(me) || !this->get_converged(me - 1);
    }
};
77117

118+
119+
78120
/**
79121
* abstract SDC sweeper.
80122
* @tparam time time precision

include/pfasst/mpi_communicator.hpp

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
#define _PFASST_MPI_COMMUNICATOR_HPP_
77

88
#include <exception>
9+
#include <vector>
910

1011
#include <mpi.h>
1112

1213
#include "interfaces.hpp"
14+
#include "logging.hpp"
1315

1416
using namespace std;
1517

@@ -28,6 +30,8 @@ namespace pfasst
2830
}
2931
};
3032

33+
class MPIStatus;
34+
3135

3236
class MPICommunicator
3337
: public ICommunicator
@@ -50,9 +54,6 @@ namespace pfasst
5054
{
5155
set_comm(comm);
5256
}
53-
54-
virtual ~MPICommunicator()
55-
{}
5657
//! @}
5758

5859
//! @{
@@ -61,11 +62,97 @@ namespace pfasst
6162
this->comm = comm;
6263
MPI_Comm_size(this->comm, &(this->_size));
6364
MPI_Comm_rank(this->comm, &(this->_rank));
65+
66+
shared_ptr<MPIStatus> status = make_shared<MPIStatus>();
67+
this->status = status;
68+
this->status->set_comm(this);
6469
}
6570

6671
int size() { return this->_size; }
6772
int rank() { return this->_rank; }
68-
//! @
73+
//! @}
74+
};
75+
76+
77+
class MPIStatus
78+
: public IStatus
79+
{
80+
vector<bool> converged;
81+
MPICommunicator* mpi;
82+
83+
public:
84+
85+
virtual void set_comm(ICommunicator* comm)
86+
{
87+
this->comm = comm;
88+
this->converged.resize(comm->size());
89+
90+
this->mpi = dynamic_cast<MPICommunicator*>(comm); assert(this->mpi);
91+
}
92+
93+
virtual void clear() override
94+
{
95+
std::fill(converged.begin(), converged.end(), false);
96+
}
97+
98+
virtual void set_converged(bool converged) override
99+
{
100+
LOG(DEBUG) << "mpi rank " << this->comm->rank() << " set converged to " << converged;
101+
this->converged.at(this->comm->rank()) = converged;
102+
}
103+
104+
virtual bool get_converged(int rank) override
105+
{
106+
return this->converged.at(rank);
107+
}
108+
109+
virtual void post()
110+
{
111+
// noop: send/recv for status info is blocking
112+
}
113+
114+
virtual void send()
115+
{
116+
// don't send forward if: single processor run, or we're the last processor
117+
if (mpi->size() == 1) { return; }
118+
if (mpi->rank() == mpi->size() - 1) { return; }
119+
120+
int iconverged = converged.at(mpi->rank()) ? 1 : 0;
121+
122+
LOG(DEBUG) << "mpi rank " << this->comm->rank() << " status send " << iconverged;
123+
124+
int err = MPI_Send(&iconverged, sizeof(int), MPI_INT,
125+
(mpi->rank() + 1) % mpi->size(), 1, mpi->comm);
126+
127+
if (err != MPI_SUCCESS) {
128+
throw MPIError();
129+
}
130+
}
131+
132+
virtual void recv()
133+
{
134+
// don't recv if: single processor run, or we're the first processor
135+
if (mpi->size() == 1) { return; }
136+
if (mpi->rank() == 0) { return; }
137+
138+
if (get_converged(mpi->rank()-1)) {
139+
LOG(DEBUG) << "mpi rank " << this->comm->rank() << " skipping status recv";
140+
return;
141+
}
142+
143+
MPI_Status stat;
144+
int iconverged;
145+
int err = MPI_Recv(&iconverged, sizeof(iconverged), MPI_INT,
146+
(mpi->rank() - 1) % mpi->size(), 1, mpi->comm, &stat);
147+
148+
if (err != MPI_SUCCESS) {
149+
throw MPIError();
150+
}
151+
152+
converged.at(mpi->rank()-1) = iconverged == 1 ? true : false;
153+
154+
LOG(DEBUG) << "mpi rank " << this->comm->rank() << " status recv " << iconverged;
155+
}
69156
};
70157

71158
} // ::pfasst::mpi

include/pfasst/pfasst.hpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#ifndef _PFASST_PFASST_HPP_
66
#define _PFASST_PFASST_HPP_
77

8+
#include "logging.hpp"
89
#include "mlsdc.hpp"
910

1011
using namespace std;
@@ -72,9 +73,12 @@ namespace pfasst
7273
}
7374

7475
for (this->set_iteration(0);
75-
this->get_iteration() < this->get_max_iterations();
76+
this->get_iteration() < this->get_max_iterations() && this->comm->status->keep_iterating();
7677
this->advance_iteration()) {
77-
post();
78+
79+
if (this->comm->status->previous_is_iterating()) {
80+
post();
81+
}
7882
cycle_v(this->finest());
7983
}
8084

@@ -85,6 +89,8 @@ namespace pfasst
8589
if (nblock < nblocks - 1) {
8690
broadcast();
8791
}
92+
93+
this->comm->status->clear();
8894
}
8995
}
9096

@@ -100,8 +106,8 @@ namespace pfasst
100106

101107
perform_sweeps(l.level);
102108

103-
if (l == this->finest()) {
104-
// note: convergence tests belong here
109+
if (l == this->finest() && fine->converged()) {
110+
this->comm->status->set_converged(true);
105111
}
106112

107113
fine->send(comm, tag(l), false);
@@ -130,8 +136,10 @@ namespace pfasst
130136

131137
trns->interpolate(fine, crse, true);
132138

133-
fine->recv(comm, tag(l), false);
134-
trns->interpolate_initial(fine, crse);
139+
if (this->comm->status->previous_is_iterating()) {
140+
fine->recv(comm, tag(l), false);
141+
trns->interpolate_initial(fine, crse);
142+
}
135143

136144
if (l < this->finest()) {
137145
perform_sweeps(l.level);
@@ -147,9 +155,13 @@ namespace pfasst
147155
{
148156
auto crse = l.current();
149157

150-
crse->recv(comm, tag(l), true);
158+
if (this->comm->status->previous_is_iterating()) {
159+
crse->recv(comm, tag(l), true);
160+
}
161+
this->comm->status->recv();
151162
this->perform_sweeps(l.level);
152163
crse->send(comm, tag(l), true);
164+
this->comm->status->send();
153165
return l + 1;
154166
}
155167

@@ -221,6 +233,7 @@ namespace pfasst
221233

222234
void post()
223235
{
236+
this->comm->status->post();
224237
for (auto l = this->coarsest() + 1; l <= this->finest(); ++l) {
225238
l.current()->post(comm, tag(l));
226239
}

tests/examples/advection_diffusion/test_advection_diffusion.cpp

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -35,39 +35,43 @@ TEST(ErrorTest, VanillaSDC)
3535
auto get_iter = [](const vtype x) { return get<1>(get<0>(x)); };
3636
auto get_error = [](const vtype x) { return get<1>(x); };
3737

38-
{
39-
auto errors = run_vanilla_sdc(0.0);
40-
auto max_iter = get_iter(*std::max_element(errors.begin(), errors.end(),
41-
[get_iter](const vtype p1, const vtype p2) { return get_iter(p1) < get_iter(p2); }));
42-
43-
vector<double> tol = { 7e-9, 7e-9, 7e-9, 7e-9 };
44-
vector<double> err;
45-
for (auto& x: errors) {
46-
if (get_iter(x) == max_iter) {
47-
err.push_back(get_error(x));
48-
}
49-
}
38+
auto errors = run_vanilla_sdc(0.0);
39+
auto max_iter = get_iter(*std::max_element(errors.begin(), errors.end(),
40+
[get_iter](const vtype p1, const vtype p2) { return get_iter(p1) < get_iter(p2); }));
5041

51-
EXPECT_THAT(err, testing::Pointwise(DoubleLess(), tol));
52-
ASSERT_EQ(max_iter, (size_t)3);
42+
vector<double> tol = { 7e-9, 7e-9, 7e-9, 7e-9 };
43+
vector<double> err;
44+
for (auto& x: errors) {
45+
if (get_iter(x) == max_iter) {
46+
err.push_back(get_error(x));
47+
}
5348
}
5449

55-
{
56-
auto errors = run_vanilla_sdc(1.e-6);
57-
auto max_iter = get_iter(*std::max_element(errors.begin(), errors.end(),
58-
[get_iter](const vtype p1, const vtype p2) { return get_iter(p1) < get_iter(p2); }));
59-
60-
vector<double> tol = { 5e-8, 5e-8, 5e-8, 5e-8 };
61-
vector<double> err;
62-
for (auto& x: errors) {
63-
if (get_iter(x) == max_iter) {
64-
err.push_back(get_error(x));
65-
}
66-
}
50+
EXPECT_THAT(err, testing::Pointwise(DoubleLess(), tol));
51+
ASSERT_EQ(max_iter, (size_t) 3);
52+
}
53+
54+
TEST(AdaptiveErrorTest, VanillaSDC)
55+
{
56+
typedef error_map::value_type vtype;
6757

68-
EXPECT_THAT(err, testing::Pointwise(DoubleLess(), tol));
69-
ASSERT_EQ(max_iter, (size_t)2);
58+
auto get_iter = [](const vtype x) { return get<1>(get<0>(x)); };
59+
auto get_error = [](const vtype x) { return get<1>(x); };
60+
61+
auto errors = run_vanilla_sdc(1.e-6);
62+
auto max_iter = get_iter(*std::max_element(errors.begin(), errors.end(),
63+
[get_iter](const vtype p1, const vtype p2) { return get_iter(p1) < get_iter(p2); }));
64+
65+
vector<double> tol = { 5e-8, 5e-8, 5e-8, 5e-8 };
66+
vector<double> err;
67+
for (auto& x: errors) {
68+
if (get_iter(x) == max_iter) {
69+
err.push_back(get_error(x));
70+
}
7071
}
72+
73+
EXPECT_THAT(err, testing::Pointwise(DoubleLess(), tol));
74+
ASSERT_EQ(max_iter, (size_t) 2);
7175
}
7276

7377
TEST(ErrorTest, SerialMLSDC)

0 commit comments

Comments
 (0)