Skip to content

Commit 517ee5b

Browse files
committed
WIP: Initial single-thread, multi-rank rankSync support for interactive console flag exchange
1 parent 376ab94 commit 517ee5b

File tree

7 files changed

+592
-116
lines changed

7 files changed

+592
-116
lines changed

src/sst/core/simulation.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,7 +1009,7 @@ Simulation_impl::setup_interactive_mode()
10091009

10101010

10111011
// Start just with multithreading right now
1012-
if ( num_ranks.rank == 1 ) {
1012+
//if ( num_ranks.rank == 1 ) {
10131013
// std::cout << "skk: run: setup interactive\n";
10141014
if ( interactive_type_ != "" ) {
10151015
// --interactive-console used to override default
@@ -1047,20 +1047,20 @@ Simulation_impl::setup_interactive_mode()
10471047
}
10481048

10491049
// Special case, invoke interactive console now rather than wait for sync
1050-
//if ( num_ranks.rank == 1 && num_ranks.thread > 1 && offset == 0 ) { // SKK
1051-
if ( num_ranks.thread > 1 && offset == 0 ) {
1050+
if ( num_ranks.rank == 1 && num_ranks.thread > 1 && offset == 0 ) { // SKK
1051+
//if ( num_ranks.thread > 1 && offset == 0 ) {
10521052
enter_interactive_ = true;
10531053
syncManager->handleInteractiveConsole();
10541054
enter_interactive_ = false;
1055-
printf("After handleInteractiveConsole: sim_->enter_interactive %d\n", enter_interactive_);
1055+
//printf("After handleInteractiveConsole in setup_interactive_mode: sim_->enter_interactive %d\n", enter_interactive_);
10561056
}
10571057
else {
10581058
InteractiveAction* act =
10591059
new InteractiveAction(this, format_string("Interactive start at %" PRI_SIMTIME, offset));
10601060
act->insertIntoTimeVortex(currentSimCycle + offset);
10611061
}
10621062
}
1063-
}
1063+
//}
10641064
}
10651065

10661066
void

src/sst/core/sync/rankSyncParallelSkip.cc

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,79 @@ RankSyncParallelSkip::getSignals(int& end, int& usr, int& alrm)
159159
return sig_end_ || sig_usr_ || sig_alrm_;
160160
}
161161

162+
void
163+
RankSyncParallelSkip::setShutdownFlags(bool enter_shutdown, Simulation_impl::ShutdownMode_t shutdown_mode)
164+
{
165+
// SKK This must be atomic because it can be set from any thread
166+
//printf("Enter rankSync setFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
167+
// enter_interactive, enter_shutdown, shutdown_mode);
168+
if (enter_shutdown) {
169+
enter_shutdown_.store(enter_shutdown);
170+
shutdown_mode_.store(static_cast<unsigned>(shutdown_mode));
171+
}
172+
//printf("Exit rankSync setFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
173+
// enter_interactive_.load(), enter_shutdown_.load(), shutdown_mode_.load());
174+
}
175+
176+
177+
void
178+
RankSyncParallelSkip::setFlags(bool enter_interactive, bool enter_shutdown, Simulation_impl::ShutdownMode_t shutdown_mode)
179+
{
180+
// SKK This must be atomic because it can be set from any thread
181+
//printf("Enter threadSync setFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
182+
// enter_interactive, enter_shutdown, shutdown_mode);
183+
if (enter_interactive)
184+
enter_interactive_.store(enter_interactive);
185+
186+
setShutdownFlags(enter_shutdown, shutdown_mode);
187+
188+
//printf("Exit threadSync setFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
189+
// enter_interactive_.load(), enter_shutdown_.load(), shutdown_mode_.load());
190+
}
191+
192+
void
193+
RankSyncParallelSkip::getShutdownFlags( bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode)
194+
{
195+
enter_shutdown = enter_shutdown_.load();
196+
switch (shutdown_mode_) {
197+
case 0:
198+
shutdown_mode = Simulation_impl::ShutdownMode_t::SHUTDOWN_CLEAN;
199+
break;
200+
case 1:
201+
shutdown_mode = Simulation_impl::ShutdownMode_t::SHUTDOWN_SIGNAL;
202+
break;
203+
case 2:
204+
shutdown_mode = Simulation_impl::ShutdownMode_t::SHUTDOWN_EMERGENCY;
205+
break;
206+
}
207+
208+
//printf("ExitthreadSync getFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
209+
// enter_interactive, enter_shutdown, shutdown_mode);
210+
}
211+
212+
void
213+
RankSyncParallelSkip::getFlags( bool& enter_interactive, bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode)
214+
{
215+
216+
enter_interactive = enter_interactive_.load();
217+
getShutdownFlags( enter_shutdown, shutdown_mode);
218+
219+
//printf("ExitthreadSync getFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
220+
// enter_interactive, enter_shutdown, shutdown_mode);
221+
}
222+
223+
void
224+
RankSyncParallelSkip::clearFlags()
225+
{
226+
enter_interactive_.store(false);
227+
enter_shutdown_.store(false);
228+
shutdown_mode_.store(0);
229+
230+
//printf("Clear Flags: enter_interactive %d, enter_shutdown %d, shutdown_mode %d\n",
231+
// enter_interactive_, enter_shutdown_, shutdown_mode_);
232+
233+
}
234+
162235
uint64_t
163236
RankSyncParallelSkip::getDataSize() const
164237
{
@@ -187,6 +260,31 @@ RankSyncParallelSkip::execute(int thread)
187260
}
188261
}
189262

263+
void
264+
RankSyncParallelSkip::interactiveExchange()
265+
{
266+
#ifdef SST_CONFIG_HAVE_MPI
267+
int32_t local_flags[1] = { static_cast<int32_t>(enter_interactive_) };
268+
int32_t global_flags[1] = { 0 };
269+
MPI_Allreduce(&local_flags, &global_flags, 1, MPI_INT32_T, MPI_MAX, MPI_COMM_WORLD);
270+
271+
enter_interactive_ = global_flags[0];
272+
#endif
273+
}
274+
275+
void
276+
RankSyncParallelSkip::shutdownExchange()
277+
{
278+
#ifdef SST_CONFIG_HAVE_MPI
279+
int32_t local_flags[2] = { static_cast<int32_t>(enter_shutdown_), static_cast<int32_t>(shutdown_mode_) };
280+
int32_t global_flags[2] = { 0, 0 };
281+
MPI_Allreduce(&local_flags, &global_flags, 2, MPI_INT32_T, MPI_MAX, MPI_COMM_WORLD);
282+
283+
enter_shutdown_ = global_flags[0];
284+
shutdown_mode_ = global_flags[1];
285+
#endif
286+
}
287+
190288
void
191289
RankSyncParallelSkip::exchange_slave(int thread)
192290
{
@@ -385,6 +483,14 @@ RankSyncParallelSkip::exchange_master(int UNUSED(thread))
385483
sig_usr_ = global_signals[1];
386484
sig_alrm_ = global_signals[2];
387485

486+
int32_t local_flags[3] = { static_cast<int32_t>(enter_interactive_), static_cast<int32_t>(enter_shutdown_), static_cast<int32_t>(shutdown_mode_) };
487+
int32_t global_flags[3] = { 0, 0, 0 };
488+
MPI_Allreduce(&local_flags, &global_flags, 3, MPI_INT32_T, MPI_MAX, MPI_COMM_WORLD);
489+
490+
enter_interactive_ = global_flags[0];
491+
enter_shutdown_ = global_flags[1];
492+
shutdown_mode_ = global_flags[2];
493+
388494
#endif
389495
}
390496

@@ -507,5 +613,8 @@ RankSyncParallelSkip::deserializeMessage(comm_recv_pair* msg)
507613
int RankSyncParallelSkip::sig_end_(0);
508614
int RankSyncParallelSkip::sig_usr_(0);
509615
int RankSyncParallelSkip::sig_alrm_(0);
616+
std::atomic<bool> RankSyncParallelSkip::enter_interactive_(false);
617+
std::atomic<bool> RankSyncParallelSkip::enter_shutdown_(false);
618+
std::atomic<unsigned> RankSyncParallelSkip::shutdown_mode_(0);
510619

511620
} // namespace SST

src/sst/core/sync/rankSyncParallelSkip.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ class RankSyncParallelSkip : public RankSync
5252
/** Return exchanged signals after sync */
5353
bool getSignals(int& end, int& usr, int& alrm) override;
5454

55+
/** Set interactive flags to exchange during sync */
56+
// SKK Separated enter_interactive from shutdown since they may be needed separately
57+
void setShutdownFlags(bool enter_shutdown, Simulation_impl::ShutdownMode_t shutdown_mode) override;
58+
void setFlags(bool enter_interactive, bool enter_shutdown, Simulation_impl::ShutdownMode_t shutdown_mode) override;
59+
/** Return exchanged interactive flags after sync */
60+
void getShutdownFlags( bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode) override;
61+
void getFlags( bool& enter_interactive, bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode) override;
62+
/** Clear interactive flags before next run */
63+
void clearFlags() override;
64+
void interactiveExchange() override;
65+
void shutdownExchange() override;
66+
5567
SimTime_t getNextSyncTime() override { return myNextSyncTime; }
5668

5769
void setRestartTime(SimTime_t time) override;
@@ -137,6 +149,9 @@ class RankSyncParallelSkip : public RankSync
137149
static int sig_end_;
138150
static int sig_usr_;
139151
static int sig_alrm_;
152+
static std::atomic<bool> enter_interactive_;
153+
static std::atomic<bool> enter_shutdown_;
154+
static std::atomic<unsigned> shutdown_mode_;
140155
};
141156

142157
} // namespace SST

src/sst/core/sync/rankSyncSerialSkip.cc

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,75 @@ RankSyncSerialSkip::getSignals(int& end, int& usr, int& alrm)
120120
return sig_end_ || sig_usr_ || sig_alrm_;
121121
}
122122

123+
void
124+
RankSyncSerialSkip::setShutdownFlags(bool enter_shutdown, Simulation_impl::ShutdownMode_t shutdown_mode)
125+
{
126+
// SKK This must be atomic because it can be set from any thread
127+
//printf("Enter rankSync setFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
128+
// enter_interactive, enter_shutdown, shutdown_mode);
129+
if (enter_shutdown) {
130+
enter_shutdown_.store(enter_shutdown);
131+
shutdown_mode_.store(static_cast<unsigned>(shutdown_mode));
132+
}
133+
//printf("Exit rankSync setFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
134+
// enter_interactive_.load(), enter_shutdown_.load(), shutdown_mode_.load());
135+
}
136+
137+
138+
void
139+
RankSyncSerialSkip::setFlags(bool enter_interactive, bool enter_shutdown, Simulation_impl::ShutdownMode_t shutdown_mode)
140+
{
141+
// SKK This must be atomic because it can be set from any thread
142+
//printf("Enter threadSync setFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
143+
// enter_interactive, enter_shutdown, shutdown_mode);
144+
if (enter_interactive)
145+
enter_interactive_.store(enter_interactive);
146+
147+
setShutdownFlags(enter_shutdown, shutdown_mode);
148+
149+
//printf("Exit threadSync setFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
150+
// enter_interactive_.load(), enter_shutdown_.load(), shutdown_mode_.load());
151+
}
152+
153+
void
154+
RankSyncSerialSkip::getShutdownFlags( bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode)
155+
{
156+
enter_shutdown = enter_shutdown_.load();
157+
switch (shutdown_mode_) {
158+
case 0:
159+
shutdown_mode = Simulation_impl::ShutdownMode_t::SHUTDOWN_CLEAN;
160+
break;
161+
case 1:
162+
shutdown_mode = Simulation_impl::ShutdownMode_t::SHUTDOWN_SIGNAL;
163+
break;
164+
case 2:
165+
shutdown_mode = Simulation_impl::ShutdownMode_t::SHUTDOWN_EMERGENCY;
166+
break;
167+
}
168+
169+
//printf("ExitthreadSync getFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
170+
// enter_interactive, enter_shutdown, shutdown_mode);
171+
}
172+
173+
void
174+
RankSyncSerialSkip::getFlags( bool& enter_interactive, bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode)
175+
{
176+
177+
enter_interactive = enter_interactive_.load();
178+
getShutdownFlags( enter_shutdown, shutdown_mode);
179+
180+
//printf("ExitthreadSync getFlags: \n input: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
181+
// enter_interactive, enter_shutdown, shutdown_mode);
182+
}
183+
184+
void
185+
RankSyncSerialSkip::clearFlags()
186+
{
187+
enter_interactive_.store(false);
188+
enter_shutdown_.store(false);
189+
shutdown_mode_.store(0);
190+
}
191+
123192
uint64_t
124193
RankSyncSerialSkip::getDataSize() const
125194
{
@@ -138,6 +207,31 @@ RankSyncSerialSkip::execute(int thread)
138207
}
139208
}
140209

210+
void
211+
RankSyncSerialSkip::interactiveExchange()
212+
{
213+
#ifdef SST_CONFIG_HAVE_MPI
214+
int32_t local_flags[1] = { static_cast<int32_t>(enter_interactive_) };
215+
int32_t global_flags[1] = { 0 };
216+
MPI_Allreduce(&local_flags, &global_flags, 1, MPI_INT32_T, MPI_MAX, MPI_COMM_WORLD);
217+
218+
enter_interactive_ = global_flags[0];
219+
#endif
220+
}
221+
222+
void
223+
RankSyncSerialSkip::shutdownExchange()
224+
{
225+
#ifdef SST_CONFIG_HAVE_MPI
226+
int32_t local_flags[2] = { static_cast<int32_t>(enter_shutdown_), static_cast<int32_t>(shutdown_mode_) };
227+
int32_t global_flags[2] = { 0, 0 };
228+
MPI_Allreduce(&local_flags, &global_flags, 2, MPI_INT32_T, MPI_MAX, MPI_COMM_WORLD);
229+
230+
enter_shutdown_ = global_flags[0];
231+
shutdown_mode_ = global_flags[1];
232+
#endif
233+
}
234+
141235
void
142236
RankSyncSerialSkip::exchange()
143237
{
@@ -262,6 +356,15 @@ RankSyncSerialSkip::exchange()
262356
sig_end_ = global_signals[0];
263357
sig_usr_ = global_signals[1];
264358
sig_alrm_ = global_signals[2];
359+
360+
int32_t local_flags[3] = { static_cast<int32_t>(enter_interactive_), static_cast<int32_t>(enter_shutdown_), static_cast<int32_t>(shutdown_mode_) };
361+
int32_t global_flags[3] = { 0, 0, 0 };
362+
MPI_Allreduce(&local_flags, &global_flags, 3, MPI_INT32_T, MPI_MAX, MPI_COMM_WORLD);
363+
364+
enter_interactive_ = global_flags[0];
365+
enter_shutdown_ = global_flags[1];
366+
shutdown_mode_ = global_flags[2];
367+
265368
#endif
266369
}
267370

@@ -360,5 +463,8 @@ RankSyncSerialSkip::exchangeLinkUntimedData(int UNUSED_WO_MPI(thread), std::atom
360463
int RankSyncSerialSkip::sig_end_(0);
361464
int RankSyncSerialSkip::sig_usr_(0);
362465
int RankSyncSerialSkip::sig_alrm_(0);
466+
std::atomic<bool> RankSyncSerialSkip::enter_interactive_(false);
467+
std::atomic<bool> RankSyncSerialSkip::enter_shutdown_(false);
468+
std::atomic<unsigned> RankSyncSerialSkip::shutdown_mode_(0);
363469

364470
} // namespace SST

src/sst/core/sync/rankSyncSerialSkip.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,18 @@ class RankSyncSerialSkip : public RankSync
5050
/** Return exchanged signals after sync */
5151
bool getSignals(int& end, int& usr, int& alrm) override;
5252

53+
/** Set interactive flags to exchange during sync */
54+
// SKK Separated enter_interactive from shutdown since they may be needed separately
55+
void setShutdownFlags(bool enter_shutdown, Simulation_impl::ShutdownMode_t shutdown_mode) override;
56+
void setFlags(bool enter_interactive, bool enter_shutdown, Simulation_impl::ShutdownMode_t shutdown_mode) override;
57+
/** Return exchanged interactive flags after sync */
58+
void getShutdownFlags( bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode) override;
59+
void getFlags( bool& enter_interactive, bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode) override;
60+
/** Clear interactive flags before next run */
61+
void clearFlags() override;
62+
void interactiveExchange() override;
63+
void shutdownExchange() override;
64+
5365
SimTime_t getNextSyncTime() override { return myNextSyncTime; }
5466

5567
void setRestartTime(SimTime_t time) override;
@@ -87,6 +99,9 @@ class RankSyncSerialSkip : public RankSync
8799
static int sig_end_;
88100
static int sig_usr_;
89101
static int sig_alrm_;
102+
static std::atomic<bool> enter_interactive_;
103+
static std::atomic<bool> enter_shutdown_;
104+
static std::atomic<unsigned> shutdown_mode_;
90105
};
91106

92107
} // namespace SST

0 commit comments

Comments
 (0)