Skip to content

Commit b82e750

Browse files
committed
WIP: multirank command test
1 parent a81ae61 commit b82e750

File tree

6 files changed

+249
-3
lines changed

6 files changed

+249
-3
lines changed

src/sst/core/sync/rankSyncParallelSkip.cc

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,4 +647,106 @@ std::atomic<bool> RankSyncParallelSkip::enter_shutdown_(false);
647647
std::atomic<unsigned> RankSyncParallelSkip::shutdown_mode_(0);
648648
std::atomic<bool> RankSyncParallelSkip::generate_ckpt_(false);
649649

650+
// SKK Test Producer Consumer
// Out-of-class definitions of the static test members, each initialized to 0.
// NOTE(review): the testManager() defined below keeps its shared state in
// function-local statics and does not reference these three members -- confirm
// whether they are still needed.
651+
int32_t RankSyncParallelSkip::test_rid_(0);
652+
int32_t RankSyncParallelSkip::test_tid_(0);
653+
int32_t RankSyncParallelSkip::test_cmd_(0); // 0 = DONE, 1 = PRINT
654+
655+
void
656+
RankSyncParallelSkip::testManager()
657+
{
658+
#ifdef SST_CONFIG_HAVE_MPI
659+
RankInfo rank_ = Simulation_impl::getSimulation()->getRank();
660+
int32_t cmd_buffer[3];
661+
int32_t result_buffer[1];
662+
int tag = 0;
663+
int src;
664+
int dst;
665+
666+
// Static variables shared between threads within the rank
667+
static std::atomic<int32_t> tid = 0;
668+
static std::atomic<int32_t> cmd = 0;
669+
static std::atomic<bool> done = false;
670+
static std::atomic<int32_t> result = 0;
671+
static Core::ThreadSafe::Barrier exchange_barrier(num_ranks_.thread);
672+
673+
Output::getDefaultObject().output("----ParallelSkip\n");
674+
675+
if (rank_.rank == 0 && rank_.thread == 0) { // Handles send of commands
676+
// Send print command to all ranks/threads
677+
cmd_buffer[2] = 1; // PRINT
678+
for (uint32_t rindex = 1; rindex < num_ranks_.rank; rindex++ ) {
679+
for ( uint32_t tindex = 0; tindex < num_ranks_.thread; tindex++ ) {
680+
Output::getDefaultObject().output("**Mgr R%d T%d Send PRINT: Dest R%d, T%d\n",
681+
rank_.rank, rank_.thread, rindex, tindex);
682+
// Send print command to correct rank and wait for return
683+
cmd_buffer[0] = rindex;
684+
cmd_buffer[1] = tindex;
685+
dst = rindex;
686+
MPI_Send(cmd_buffer, 3, MPI_INT32_T, dst, tag, MPI_COMM_WORLD );
687+
src = rindex;
688+
MPI_Recv(result_buffer, 1, MPI_INT32_T, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
689+
assert(result_buffer[0] == 1);
690+
} // for Threads
691+
} // for Ranks
692+
// Send done command to all ranks (can't use Bcast bc others don't know its coming)
693+
cmd_buffer[2] = 0; // DONE
694+
for (uint32_t rindex = 1; rindex < num_ranks_.rank; rindex++ ) {
695+
Output::getDefaultObject().output("**Mgr R%d T%d Send DONE: Dest R%d\n",
696+
rank_.rank, rank_.thread, rindex);
697+
// Send done command to correct rank and wait for return
698+
cmd_buffer[0] = rindex;
699+
cmd_buffer[1] = 0;
700+
dst = rindex;
701+
MPI_Send(cmd_buffer, 3, MPI_INT32_T, dst, tag, MPI_COMM_WORLD);
702+
} // for Ranks
703+
704+
} // end rank 0, thread 0
705+
else if (rank_.rank != 0) { // Other ranks
706+
Output::getDefaultObject().output("**Enter Worker: Rank:%d, Thread:%d\n", rank_.rank, rank_.thread);
707+
src = 0;
708+
dst = 0;
709+
tag = 0;
710+
while (!done.load()) {
711+
if (rank_.thread == 0) {
712+
MPI_Recv(cmd_buffer, 3, MPI_INT32_T, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
713+
Output::getDefaultObject().output("**Worker R%d T%d Recv: R%d, T%d cmd%d\n",
714+
rank_.rank, rank_.thread, cmd_buffer[0], cmd_buffer[1], cmd_buffer[2]);
715+
tid.store(cmd_buffer[1]);
716+
cmd.store(cmd_buffer[2]);
717+
if (cmd_buffer[2] == 0) { // DONE
718+
done.store(true);
719+
Output::getDefaultObject().output("**Worker DONE: R%d, T%d\n", rank_.rank, rank_.thread);
720+
}
721+
}
722+
723+
// Barrier before loading shared variables
724+
exchange_barrier.wait();
725+
726+
// Not DONE, process command
727+
if (cmd.load() != 0) {
728+
if (rank_.thread == tid.load()) {
729+
result.store(1);
730+
Output::getDefaultObject().output("**Worker PRINT: R%d, T%d\n", rank_.rank, rank_.thread);
731+
}
732+
// Wait to access result
733+
exchange_barrier.wait();
734+
// Thread 0 sends cmd result back to Rank 0
735+
if (rank_.thread == 0) {
736+
result_buffer[0] = result.load();
737+
MPI_Send(result_buffer, 1, MPI_INT32_T, dst, tag, MPI_COMM_WORLD);
738+
Output::getDefaultObject().output("**Worker R%d T%d Send: R%d, T%d\n",
739+
rank_.rank, rank_.thread, cmd_buffer[0], cmd_buffer[1]);
740+
}
741+
}
742+
743+
} // while !done
744+
} // end other ranks
745+
else { // other R0 threads, do nothing for now
746+
Output::getDefaultObject().output("**Enter NOP: R%d, T%d\n", rank_.rank, rank_.thread);
747+
} // other R0 threads
748+
#endif
749+
}
750+
751+
650752
} // namespace SST

src/sst/core/sync/rankSyncParallelSkip.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ class RankSyncParallelSkip : public RankSync
7272

7373
uint64_t getDataSize() const override;
7474

75+
// Test manager/worker
76+
void testManager() override;
77+
7578
private:
7679
static SimTime_t myNextSyncTime;
7780

@@ -155,6 +158,12 @@ class RankSyncParallelSkip : public RankSync
155158
static std::atomic<bool> enter_shutdown_;
156159
static std::atomic<unsigned> shutdown_mode_;
157160
static std::atomic<bool> generate_ckpt_;
161+
162+
// Test Manager/worker
163+
// SKK Test Producer Consumer
164+
static int32_t test_rid_;
165+
static int32_t test_tid_;
166+
static int32_t test_cmd_; // 0 = DONE, 1 = PRINT
158167
};
159168

160169
} // namespace SST

src/sst/core/sync/rankSyncSerialSkip.cc

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,4 +497,98 @@ std::atomic<bool> RankSyncSerialSkip::enter_shutdown_(false);
497497
std::atomic<unsigned> RankSyncSerialSkip::shutdown_mode_(0);
498498
std::atomic<bool> RankSyncSerialSkip::generate_ckpt_(false);
499499

500+
// SKK Test Producer Consumer
// Out-of-class definitions of the static test members, each initialized to 0.
// NOTE(review): the testManager() defined below uses only local variables and
// does not reference these three members -- confirm whether they are still
// needed.
501+
int32_t RankSyncSerialSkip::test_rid_(0);
502+
int32_t RankSyncSerialSkip::test_tid_(0);
503+
int32_t RankSyncSerialSkip::test_cmd_(0); // 0 = DONE, 1 = PRINT
504+
505+
void
506+
RankSyncSerialSkip::testManager()
507+
{
508+
#ifdef SST_CONFIG_HAVE_MPI
509+
RankInfo rank_ = Simulation_impl::getSimulation()->getRank();
510+
int32_t cmd_buffer[3];
511+
int32_t result_buffer[1];
512+
bool done = false;
513+
int tag = 0;
514+
int src;
515+
int dst;
516+
517+
Output::getDefaultObject().output("----SerialSkip\n");
518+
519+
if (rank_.rank == 0 && rank_.thread == 0) { // Handles send of commands
520+
// Send print commands to all ranks/threads
521+
cmd_buffer[2] = 1; // PRINT
522+
for (uint32_t rindex = 1; rindex < num_ranks_.rank; rindex++ ) {
523+
for ( uint32_t tindex = 0; tindex < num_ranks_.thread; tindex++ ) {
524+
Output::getDefaultObject().output("Mgr R%d T%d Send PRINT: Dest R%d, T%d\n",
525+
rank_.rank, rank_.thread, rindex, tindex);
526+
// Send print command to correct rank and wait for return
527+
cmd_buffer[0] = rindex;
528+
cmd_buffer[1] = tindex;
529+
dst = rindex;
530+
MPI_Send(cmd_buffer, 3, MPI_INT32_T, dst, tag, MPI_COMM_WORLD );
531+
src = rindex;
532+
MPI_Recv(result_buffer, 1, MPI_INT32_T, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
533+
assert(result_buffer[0] == 1);
534+
} // for Threads
535+
} // for Ranks
536+
// Send done commands to all ranks/threads
537+
cmd_buffer[2] = 0; // DONE
538+
for (uint32_t rindex = 1; rindex < num_ranks_.rank; rindex++ ) {
539+
for ( uint32_t tindex = 0; tindex < num_ranks_.thread; tindex++ ) {
540+
Output::getDefaultObject().output("Mgr R%d T%d Send DONE: Dest R%d, T%d\n",
541+
rank_.rank, rank_.thread, rindex, tindex);
542+
// Send done command to correct rank and wait for return
543+
cmd_buffer[0] = rindex;
544+
cmd_buffer[1] = tindex;
545+
dst = rindex;
546+
MPI_Send(cmd_buffer, 3, MPI_INT32_T, dst, tag, MPI_COMM_WORLD);
547+
#if 0
548+
src = rindex;
549+
MPI_Recv(result_buffer, 1, MPI_INT32_T, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
550+
assert(result_buffer[0] == 0);
551+
#endif
552+
} // for Threads
553+
} // for Ranks
554+
555+
556+
} // rank 0, thread 0
557+
else if (rank_.rank != 0) { // not rank 0, thread 0
558+
Output::getDefaultObject().output("Enter Worker: Rank:%d, Thread:%d\n", rank_.rank, rank_.thread);
559+
src = 0;
560+
dst = 0;
561+
while (!done) {
562+
MPI_Recv(cmd_buffer, 3, MPI_INT32_T, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
563+
if (cmd_buffer[2] == 0) { // DONE
564+
done = true;
565+
//result_buffer[0] = 0;
566+
Output::getDefaultObject().output("Worker DONE: Rank:%d, Thread:%d\n", rank_.rank, rank_.thread);
567+
//MPI_Send(result_buffer, 1, MPI_INT32_T, dst, tag, MPI_COMM_WORLD);
568+
} else { // PRINT
569+
result_buffer[0] = 1;
570+
Output::getDefaultObject().output("Worker PRINT: Rank:%d, Thread:%d\n", rank_.rank, rank_.thread);
571+
MPI_Send(result_buffer, 1, MPI_INT32_T, dst, tag, MPI_COMM_WORLD);
572+
}
573+
} // while !done
574+
}
575+
else { // other R0 threads, do nothing I should never have multiple
576+
assert (false);
577+
Output::getDefaultObject().output("Enter NOP: R%d, T%d\n", rank_.rank, rank_.thread);
578+
}
579+
#endif
580+
}
581+
582+
// Disabled placeholder for a separate worker entry point.  It cannot simply be
// switched on: the loop body is the bare identifier `MPI_Receive` (no call, no
// terminating semicolon), `done` is never set to true, and no testWorker()
// declaration is added to the class by this change.
#if 0
583+
void
584+
RankSyncSerialSkip::testWorker()
585+
{
586+
#ifdef SST_CONFIG_HAVE_MPI
587+
bool done = false;
588+
while (!done) {
589+
MPI_Receive
590+
}
591+
#endif
592+
}
593+
#endif
500594
} // namespace SST

src/sst/core/sync/rankSyncSerialSkip.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ class RankSyncSerialSkip : public RankSync
7070

7171
uint64_t getDataSize() const override;
7272

73+
// Test manager/worker
74+
void testManager() override;
75+
7376
private:
7477
static SimTime_t myNextSyncTime;
7578

@@ -105,6 +108,13 @@ class RankSyncSerialSkip : public RankSync
105108
static std::atomic<bool> enter_shutdown_;
106109
static std::atomic<unsigned> shutdown_mode_;
107110
static std::atomic<bool> generate_ckpt_;
111+
112+
// Test Manager/worker
113+
// SKK Test Producer Consumer
114+
static int32_t test_rid_;
115+
static int32_t test_tid_;
116+
static int32_t test_cmd_; // 0 = DONE, 1 = PRINT
117+
108118
};
109119

110120
} // namespace SST

src/sst/core/sync/syncManager.cc

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ class EmptyRankSync : public RankSync
155155

156156
// Don't want to reset time for Empty Sync
157157
void setRestartTime(SimTime_t UNUSED(time)) override {}
158+
159+
void testManager() override {}
158160
};
159161

160162
class EmptyThreadSync : public ThreadSync
@@ -741,6 +743,28 @@ SyncManager::partitionInfo() {
741743
}
742744
}
743745

746+
void
747+
SyncManager::testProducerConsumer()
748+
{
749+
#if 1
750+
rankSync_->testManager();
751+
#else
752+
if (rank_.thread == 0)
753+
rankSync_->testManager();
754+
else
755+
rankSync_->testWorker();
756+
#endif
757+
#if 1
758+
Output& out = sim_->getSimulationOutput();
759+
out.output("**SyncManager::testProducerConsumer: before barrier R%d: T%d\n",
760+
rank_.rank, rank_.thread);
761+
#endif
762+
763+
RankExecBarrier_[0].wait();
764+
sim_->setEndSim();
765+
766+
}
767+
744768
void
745769
SyncManager::execute()
746770
{
@@ -887,9 +911,9 @@ SyncManager::execute()
887911
rank_.rank, rank_.thread, enter_interactive, enter_shutdown, shutdown_mode);
888912
partitionInfo();
889913

890-
#if 0
891-
// SKK - this should be rankHandle...
892-
rankHandleInteractiveConsole();
914+
#if 1
915+
// SKK Test Producer/Consumer
916+
testProducerConsumer();
893917
#else
894918
//if (rank_.rank == 0) { // SKK Temporary to test passing of watchpoint triggers
895919
if (rank_.rank == 0 && rank_.thread == 0) {

src/sst/core/sync/syncManager.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ class RankSync
8787

8888
virtual uint64_t getDataSize() const = 0;
8989

90+
// Test manager/worker SKK
91+
virtual void testManager() = 0;
92+
9093
protected:
9194
SimTime_t nextSyncTime;
9295
TimeConverter max_period;
@@ -196,6 +199,8 @@ class SyncManager : public Action
196199

197200
NotSerializable(SST::SyncManager)
198201

202+
203+
199204
private:
200205
// Enum to track the next sync type
201206
enum sync_type_t { RANK, THREAD };
@@ -229,6 +234,8 @@ class SyncManager : public Action
229234
void getSimShutdownFlags(bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode);
230235
void getSimFlags(bool& enter_interactive, bool& enter_shutdown, Simulation_impl::ShutdownMode_t& shutdown_mode, bool& generate_ckpt);
231236
void partitionInfo();
237+
// Test manager/worker
238+
void testProducerConsumer();
232239
};
233240

234241
} // namespace SST

0 commit comments

Comments
 (0)