Skip to content

Commit 376ab94

Browse files
committed
WIP: Restructure thread flag exchange
1 parent 4e1249a commit 376ab94

File tree

7 files changed

+325
-53
lines changed

7 files changed

+325
-53
lines changed

src/sst/core/simulation.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,9 +1047,12 @@ Simulation_impl::setup_interactive_mode()
10471047
}
10481048

10491049
// Special case, invoke interactive console now rather than wait for sync
1050+
//if ( num_ranks.rank == 1 && num_ranks.thread > 1 && offset == 0 ) { // SKK
10501051
if ( num_ranks.thread > 1 && offset == 0 ) {
10511052
enter_interactive_ = true;
10521053
syncManager->handleInteractiveConsole();
1054+
enter_interactive_ = false;
1055+
printf("After handleInteractiveConsole: sim_->enter_interactive %d\n", enter_interactive_);
10531056
}
10541057
else {
10551058
InteractiveAction* act =

src/sst/core/sync/syncManager.cc

Lines changed: 152 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
#include <sys/time.h>
3434
#include <unistd.h>
3535

36+
#define SKK 0
37+
3638
namespace SST {
3739

3840
class InteractiveConsole;
@@ -170,6 +172,27 @@ class EmptyThreadSync : public ThreadSync
170172
return false;
171173
}
172174

175+
void setShutdownFlags(bool UNUSED(enter_shutdown),
176+
Simulation_impl::ShutdownMode_t UNUSED(shutdown_mode)) override {}
177+
178+
void setFlags(bool UNUSED(enter_interactive), bool UNUSED(enter_shutdown),
179+
Simulation_impl::ShutdownMode_t UNUSED(shutdown_mode)) override {}
180+
181+
void getShutdownFlags( bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode) override
182+
{
183+
enter_shutdown = false;
184+
shutdown_mode = Simulation_impl::ShutdownMode_t::SHUTDOWN_CLEAN;
185+
}
186+
187+
void getFlags( bool& enter_interactive, bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode) override
188+
{
189+
enter_interactive = false;
190+
getShutdownFlags(enter_shutdown, shutdown_mode);
191+
}
192+
193+
/** Clear interactive flags before next run */
194+
void clearFlags() override {}
195+
173196
/** Register a Link which this Sync Object is responsible for */
174197
void registerLink(const std::string& UNUSED(name), Link* UNUSED(link)) override {}
175198
ActivityQueue* registerRemoteLink(int UNUSED(tid), const std::string& UNUSED(name), Link* UNUSED(link)) override
@@ -372,27 +395,67 @@ SyncManager::exchangeLinkInfo()
372395
rankSync_->exchangeLinkInfo(rank_.rank);
373396
}
374397

375-
void
398+
void
399+
SyncManager::getSimShutdownFlags(bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode) {
400+
401+
// Get sim flags to exchange in threadSync
402+
enter_shutdown = sim_->enter_shutdown_;
403+
shutdown_mode = sim_->shutdown_mode_;
404+
}
405+
406+
// sim_->getSimFlags(enter_interactive, enter_shutdown, shutdown_mode) checkpoint?
407+
void
408+
SyncManager::getSimFlags(bool& enter_interactive, bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode) {
409+
410+
// Get sim flags to exchange in threadSync
411+
enter_interactive = sim_->enter_interactive_;
412+
getSimShutdownFlags(enter_shutdown, shutdown_mode);
413+
#if 0
414+
// Can't clear here because we use to decide whether triggered
415+
sim_->enter_interactive_ = false;
416+
sim_->enter_shutdown_ = false;
417+
sim_->shutdown_mode_ = Simulation_impl::SHUTDOWN_CLEAN;
418+
#endif
419+
}
420+
421+
void //bool
376422
SyncManager::handleShutdown()
377423
{
424+
// If console is handled only by thread 0, then only thread 0 needs to check for cmd_shutdown here
425+
// The watchpoint action shutdown can be triggered by any thread
378426
#if 0
379427
// Check for shutdown
380428
Output& out = sim_->getSimulationOutput();
381429
out.output("skk:syncmgr:handleShutdown:Before T%d:endSim=%d, shutdown_mode=%d, enter_shutdown=%d, \n", rank_.thread,
382430
sim_->endSim, sim_->shutdown_mode_, sim_->enter_shutdown_);
383431
#endif
384-
// If my thread's enter_shutdown_ is true, then set shared enter_shutdown
385-
if ( sim_->enter_shutdown_ == true ) {
386-
enter_shutdown_.fetch_or(true);
387-
endSim_.fetch_or(true);
388-
if ( sim_->shutdown_mode_ == true ) {
389-
shutdown_mode_.store(true);
390-
}
391-
}
392-
ic_barrier_.wait();
393-
if ( endSim_ == true ) {
432+
433+
//printf("Rank%d, Thread%d: Enter handleShutdown: sim->: enter_interactive %d, enter_shutdown %d, shutdown_mode %d\n",
434+
// rank_.rank, rank_.thread, sim_->enter_interactive_, sim_->enter_shutdown_, sim_->shutdown_mode_);
435+
436+
//bool enter_interactive;
437+
bool enter_shutdown;
438+
Simulation_impl::ShutdownMode_t shutdown_mode;
439+
440+
getSimShutdownFlags(enter_shutdown, shutdown_mode); // get flags from sim_
441+
threadSync_->setShutdownFlags(enter_shutdown, shutdown_mode); // Atomically update shared values in threadSync
442+
ic_barrier_.wait(); // barrier
443+
threadSync_->getShutdownFlags(enter_shutdown, shutdown_mode); // Get shared (cumulative) flags from threadSync
444+
445+
if ( enter_shutdown ) {
394446
sim_->setEndSim();
447+
//printf("Rank%d, Thread%d: handleShutdown setEndSim: \n", rank_.rank, rank_.thread);
448+
// Clear sim flags
449+
// SKK May be duplicative for now because this function called directly in sim for --interactive-start=0
450+
sim_->enter_interactive_ = false;
451+
sim_->enter_shutdown_ = false;
452+
// SKK No need to threadSync-> clear because we're just going to shutdown?
453+
//return true;
395454
}
455+
456+
//printf("After HandleShutdown: sim: sim_->enter_interactive %d, sim_->enter_shutdown %d, sim_->shutdown_mode %d\n",
457+
// enter_interactive, enter_shutdown, shutdown_mode);
458+
//return false;
396459
}
397460

398461
void
@@ -402,23 +465,13 @@ SyncManager::handleInteractiveConsole()
402465

403466
// Handle interactive console for multithreaded runs
404467
// Serial execution handles this in simulation run so it happens right away
405-
if ( num_ranks_.thread > 1 ) {
468+
assert(num_ranks_.thread != 1); // SKK I should never have a threadSync with a single thread, correct?
406469

407-
// 1) Check local enter_interactive_ and set mask if needed (could use mask to show triggers)
408-
if ( sim_->enter_interactive_ == true ) {
409-
unsigned bit = 1UL << rank_.thread;
410-
enter_interactive_mask_.fetch_or(bit);
411-
}
412-
ic_barrier_.wait(); // Ensure everyone has written the mask before checking
413-
414-
// If enter interactive set for any thread, set shared ic_mask
415-
unsigned ic_mask = enter_interactive_mask_.load();
416470
#if 0
417471
Output& out = sim_->getSimulationOutput();
418472
out.output("skk:syncmgr:execute: T%d: check enter_interactive_=%d\n", rank_.thread, sim_->enter_interactive_);
419473
out.output("skk:syncmgr:execute: T%d: enter_interactive_mask_=0x%x\n", rank_.thread, ic_mask);
420474
#endif
421-
if ( ic_mask ) {
422475
// 2) Print list of threads (in order) and whether triggered
423476
for ( uint32_t tindex = 0; tindex < num_ranks_.thread; tindex++ ) {
424477
if ( rank_.thread == tindex ) {
@@ -474,11 +527,14 @@ SyncManager::handleInteractiveConsole()
474527
}
475528
}
476529
ic_barrier_.wait();
477-
handleShutdown(); // Check if console issued shutdown command
530+
// If console is handled only by thread 0, then only thread 0 needs to check for cmd_shutdown here
531+
// Could we handle this directly? If cmd_shutdown, then return SHUTDOWN (-3) and share it across threads?
532+
533+
handleShutdown(); // Check if console issued shutdown command SKK have this return true/false for shutdown to skip next step
478534

479535
tid = current_ic_thread_.load();
480536
ic_state = current_ic_state_.load();
481-
// out.output("T%d: tid %d, ic_state %d\n", rank_.thread, tid, ic_state);
537+
//out.output("T%d: tid %d, ic_state %d\n", rank_.thread, tid, ic_state);
482538
if ( ic_state == InteractiveConsole::ICretcode::SUMMARY ) { // Print thread info summary
483539
for ( uint32_t tindex = 0; tindex < num_ranks_.thread; tindex++ ) {
484540
if ( rank_.thread == tindex ) {
@@ -494,24 +550,51 @@ SyncManager::handleInteractiveConsole()
494550
current_ic_state_.store(0);
495551
} // if state == SUMMARY, print thread info summary
496552
}
497-
498-
// 5) When done, clear interactive mask and enter interactive flags for next round
499-
if ( rank_.thread == 0 ) {
500-
enter_interactive_mask_.store(0);
501-
}
553+
554+
// SKK Right now this is called directly for --interactive-start=0, so we need to clear here
502555
sim_->enter_interactive_ = false;
556+
//printf("After while loop: sim: sim_->enter_interactive %d\n", sim_->enter_interactive_);
557+
// SKK Do I need to clear outside flags? I should not need shared enter_ic or shutdown inside here
558+
// Ensure everyone has latest flags before clearing
503559
ic_barrier_.wait();
504-
ic_mask = enter_interactive_mask_.load();
505-
// out.output("skk:syncmgr:execute: T%d: After: check enter_interactive_=%d\n", rank_.thread,
506-
// sim_->enter_interactive_); out.output("skk:syncmgr:execute: T%d: AFter: enter_interactive_mask_=0x%x\n",
507-
// rank_.thread, ic_mask);
508-
}
509-
} // end if num_ranks_.thread > 1: Handle interactive console
560+
threadSync_->clearFlags();
561+
562+
}
563+
564+
void
565+
SyncManager::partitionInfo() {
566+
Output& out = sim_->getSimulationOutput();
567+
for (uint32_t rindex = 0; rindex < num_ranks_.rank; rindex++ ) {
568+
for ( uint32_t tindex = 0; tindex < num_ranks_.thread; tindex++ ) {
569+
if ( rank_.rank == rindex && rank_.thread == tindex ) {
570+
out.output("Rank:%d, Thread:%d (Process %d)", rank_.rank, rank_.thread, getppid());
571+
// Print component summary
572+
if ( sim_->interactive_ != nullptr ) {
573+
sim_->interactive_->summary();
574+
}
575+
}
576+
}
577+
}
510578
}
511579

512580
void
513581
SyncManager::execute()
514582
{
583+
#if 0 // SKK
584+
std::string type = "RANK";
585+
if (next_sync_type_ == THREAD)
586+
type = "THREAD";
587+
#if 1
588+
Output& out = sim_->getSimulationOutput();
589+
out.output("SyncManager::execute: Rank %d: Thread %d: Type %s\n",
590+
rank_.rank, rank_.thread, type.c_str());
591+
#else
592+
std::cout << "SyncManager::execute: Rank " << rank_.rank
593+
<< ": Thread " << rank_.thread
594+
<< ": Type " << type << std::endl;
595+
#endif
596+
#endif // SKK
597+
515598
SST_SYNC_PROFILE_START
516599

517600
if ( profile_tools_ ) profile_tools_->syncManagerStart();
@@ -573,13 +656,14 @@ SyncManager::execute()
573656
next_checkpoint_time = checkpoint_->check(getDeliveryTime());
574657

575658

576-
#if 1
659+
#if 0
577660
// Handle interactive console
578661
if ( rank_.thread == 0 ) {
579662
// std::cout << "skk: syncmgr rank: t0: interactive execute\n";
580663
if ( sim_->enter_interactive_ == true ) {
581664
sim_->enter_interactive_ = false; // IC may schedule IC again
582-
if ( sim_->interactive_ != nullptr ) sim_->interactive_->execute(sim_->interactive_msg_);
665+
//if ( sim_->interactive_ != nullptr ) sim_->interactive_->execute(sim_->interactive_msg_);
666+
printf("Rank%d, Thread%d: In RANK interactive console\n", rank_.rank, rank_.thread);
583667
}
584668
}
585669

@@ -604,7 +688,23 @@ SyncManager::execute()
604688
real_time_->getSignals(sig_end, sig_usr, sig_alrm);
605689
threadSync_->setSignals(sig_end, sig_usr, sig_alrm);
606690
}
607-
threadSync_->execute(); // exchange event queues
691+
692+
#if 1 // Move exchange of enter_interactive, shutdown, and checkpoint flags here, similar to getSignals
693+
// Not that only thread 0 receives signals so it is the only one to execute above
694+
// However, any thread can trigger interactive or shutdown, so need to have all threads store
695+
// That is also why the setFlags must be atomic
696+
bool enter_interactive;
697+
bool enter_shutdown;
698+
Simulation_impl::ShutdownMode_t shutdown_mode;
699+
if (num_ranks_.rank == 1) { // SKK Do we need to do this in all cases or does it get handled in RANK?
700+
// Get local sim flags
701+
getSimFlags(enter_interactive, enter_shutdown, shutdown_mode);
702+
// Each thread atomically sets shared flags in threadSync
703+
threadSync_->setFlags(enter_interactive, enter_shutdown, shutdown_mode);
704+
}
705+
706+
#endif
707+
threadSync_->execute(); // exchange event queues, includes barrier
608708

609709
// Handle signals for multi-threaded runs/no MPI
610710
if ( num_ranks_.rank == 1 ) {
@@ -618,14 +718,7 @@ SyncManager::execute()
618718
real_time_->performSignal(sig_end);
619719
else if ( signals_received ) {
620720
if ( sig_usr ) real_time_->performSignal(sig_usr);
621-
#if 0
622721
if ( sig_alrm ) real_time_->performSignal(sig_alrm);
623-
#else
624-
if ( sig_alrm ) {
625-
// out.output("skk:syncmgr:execute: T%d: in sigalrm\n", rank_.thread);
626-
real_time_->performSignal(sig_alrm);
627-
}
628-
#endif
629722
}
630723

631724
// Check local checkpoint generate flag and set shared generate if needed.
@@ -640,9 +733,21 @@ SyncManager::execute()
640733
next_checkpoint_time = checkpoint_->check(getDeliveryTime());
641734
ckpt_generate_.store(0);
642735

736+
threadSync_->getFlags(enter_interactive, enter_shutdown, shutdown_mode);
737+
//printf("After threadSync_->getFlags: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
738+
// enter_interactive, enter_shutdown, shutdown_mode);
739+
if (enter_shutdown) {
740+
sim_->setEndSim();
741+
ic_barrier_.wait();
742+
threadSync_->clearFlags();
743+
}
744+
else if (enter_interactive) {
745+
handleInteractiveConsole(); // Check of any thread set interactive console
746+
}
747+
//threadSync_->clearFlags(); // SKK May currently be duplicated at end of handleIC for ic-start=0 case
748+
//printf("After threadSync_->clearFlags: enter_interactive %d, enter_shutdown %d, shutdown_mode %d \n",
749+
// enter_interactive, enter_shutdown, shutdown_mode);
643750

644-
handleShutdown(); // Check if any thread set shutdown
645-
handleInteractiveConsole(); // Check of any thread set interactive console
646751

647752
} // if num_ranks_.rank == 1 i.e. only multithreading
648753

@@ -750,12 +855,9 @@ SyncManager::addProfileTool(Profile::SyncProfileTool* tool)
750855
}
751856

752857
std::atomic<unsigned> SyncManager::ckpt_generate_ { 0 };
753-
std::atomic<unsigned> SyncManager::enter_interactive_mask_ { 0 };
754858
std::atomic<int> SyncManager::current_ic_thread_ { 0 };
755859
std::atomic<int> SyncManager::current_ic_state_ { 0 };
756-
std::atomic<unsigned> SyncManager::enter_shutdown_ { 0 };
757860
std::atomic<unsigned> SyncManager::endSim_ { false };
758-
std::atomic<unsigned> SyncManager::shutdown_mode_ { false };
759861
Core::ThreadSafe::Barrier SyncManager::ic_barrier_;
760862

761863

src/sst/core/sync/syncManager.h

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "sst/core/action.h"
1616
#include "sst/core/link.h"
1717
#include "sst/core/rankInfo.h"
18+
#include "sst/core/simulation_impl.h"
1819
#include "sst/core/sst_types.h"
1920
#include "sst/core/threadsafe.h"
2021

@@ -109,6 +110,15 @@ class ThreadSync
109110
/** Return exchanged signals after sync */
110111
virtual bool getSignals(int& end, int& usr, int& alrm) = 0;
111112

113+
/** Set interactive flags to exchange during sync */
114+
virtual void setShutdownFlags(bool enter_shutdown, Simulation_impl::ShutdownMode_t shutdown_mode) = 0;
115+
virtual void setFlags(bool enter_interactive, bool enter_shutdown, Simulation_impl::ShutdownMode_t shutdown_mode) = 0;
116+
/** Return exchanged interactive flags after sync */
117+
virtual void getShutdownFlags( bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode) = 0;
118+
virtual void getFlags( bool& enter_interactive, bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode) = 0;
119+
/** Clear interactive flags before next run */
120+
virtual void clearFlags() = 0;
121+
112122
virtual SimTime_t getNextSyncTime() { return nextSyncTime; }
113123
virtual void setRestartTime(SimTime_t time) { nextSyncTime = time; }
114124

@@ -191,18 +201,18 @@ class SyncManager : public Action
191201
RealTimeManager* real_time_;
192202
CheckpointAction* checkpoint_;
193203
static std::atomic<unsigned> ckpt_generate_;
194-
static std::atomic<unsigned> enter_interactive_mask_;
195204
static std::atomic<int> current_ic_thread_;
196205
static std::atomic<int> current_ic_state_;
197-
static std::atomic<unsigned> enter_shutdown_;
198206
static std::atomic<unsigned> endSim_;
199-
static std::atomic<unsigned> shutdown_mode_;
200207
static Core::ThreadSafe::Barrier ic_barrier_;
201208

202209
SyncProfileToolList* profile_tools_ = nullptr;
203210

204211
void computeNextInsert(SimTime_t next_checkpoint_time = MAX_SIMTIME_T);
205212
void setupSyncObjects();
213+
void getSimShutdownFlags(bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode);
214+
void getSimFlags(bool& enter_interactive, bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode);
215+
void partitionInfo();
206216
};
207217

208218
} // namespace SST

0 commit comments

Comments
 (0)