Skip to content

Commit 8ee6b70

Browse files
committed
WIP: add multirank structure to simpleDebug
1 parent b82e750 commit 8ee6b70

File tree

3 files changed

+303
-1
lines changed

3 files changed

+303
-1
lines changed

src/sst/core/impl/interactive/simpleDebug.cc

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,10 @@ SimpleDebugger::SimpleDebugger(Params& params) :
6464
[this](std::vector<std::string>& tokens) { return cmd_pwd(tokens); } },
6565
{ "chdir", "cd", "change 1 directory level in the object map", ConsoleCommandGroup::NAVIGATION,
6666
[this](std::vector<std::string>& tokens) { return cmd_cd(tokens); } },
67+
#if 1
6768
{ "list", "ls", "list the objects in the current level of the object map", ConsoleCommandGroup::NAVIGATION,
6869
[this](std::vector<std::string>& tokens) { return cmd_ls(tokens); } },
70+
#endif
6971
{ "time", "tm", "print current simulation time in cycles", ConsoleCommandGroup::STATE,
7072
[this](std::vector<std::string>& tokens) { return cmd_time(tokens); } },
7173
{ "print", "p", "[-rN] [<obj>]: print objects at the current level", ConsoleCommandGroup::STATE,
@@ -248,13 +250,19 @@ SimpleDebugger::summary()
248250

249251
int
250252
SimpleDebugger::execute(const std::string& msg)
253+
//SimpleDebugger::consoleExecute(const std::string& msg)
251254
{
252255
RankInfo info = getRank();
253256
RankInfo nRanks = getNumRanks();
254257
printf("\n---- Rank%d:Thread%d: Entering interactive mode at time %" PRI_SIMTIME " \n", info.rank, info.thread,
255258
getCurrentSimCycle());
256259
printf("%s\n", msg.c_str());
257260

261+
#if 1 // test new structure
262+
rankParallelExecute();
263+
return DONE;
264+
#endif
265+
258266
// Create a new ObjectMap
259267
obj_ = getComponentObjectMap();
260268

@@ -439,7 +447,11 @@ SimpleDebugger::dispatch_cmd(std::string& cmd)
439447
// normal execution
440448
auto consoleCommand = cmdRegistry.seek(tokens[0], CommandRegistry::SEARCH_TYPE::BUILTIN);
441449
if ( consoleCommand.second ) {
450+
#if 0
451+
bool succeed = consoleCommand.first.exec_par(cmd);
452+
#else
442453
bool succeed = consoleCommand.first.exec(tokens);
454+
#endif
443455
cmdHistoryBuf.append(cmd);
444456
return succeed;
445457
}
@@ -669,6 +681,37 @@ SimpleDebugger::cmd_pwd(std::vector<std::string>& UNUSED(tokens))
669681
}
670682

671683
// ls: list current directory
684+
#if 0
685+
bool
686+
SimpleDebugger::om_ls(std::string& tokens) {
687+
//tokenize string
688+
689+
690+
}
691+
692+
bool
693+
SimpleDebugger::cmd_ls(std::vector<std::string>& UNUSED(tokens), std::string& cmd)
694+
{
695+
std::cout << "SKK: cmd_ls\n";
696+
697+
// if serial ls(tokens)
698+
// if thread
699+
// if rank
700+
auto& vars = obj_->getVariables();
701+
for ( auto& x : vars ) {
702+
if ( x.second->isFundamental() ) {
703+
std::cout << x.first << " = " << x.second->get() << " (" << x.second->getType() << ")" << std::endl;
704+
}
705+
else {
706+
std::cout << x.first.c_str() << "/ (" << x.second->getType() << ")\n";
707+
}
708+
}
709+
return true;
710+
711+
}
712+
713+
#else
714+
672715
bool
673716
SimpleDebugger::cmd_ls(std::vector<std::string>& UNUSED(tokens))
674717
{
@@ -683,6 +726,7 @@ SimpleDebugger::cmd_ls(std::vector<std::string>& UNUSED(tokens))
683726
}
684727
return true;
685728
}
729+
#endif
686730

687731
// callback for autofill of object string (similar to ls)
688732
void
@@ -2260,5 +2304,222 @@ CommandRegistry::addHelp(std::string key, std::vector<std::string>& vec)
22602304
;
22612305
}
22622306

2307+
// Test new rank execute structure
2308+
2309+
bool
2310+
SimpleDebugger::handleCommandAll(std::atomic<int32_t>& tid, std::atomic<int32_t>& cmd,
2311+
std::stringstream& result, Core::ThreadSafe::Barrier& exchange_barrier, Core::ThreadSafe::Barrier& process_barrier)
2312+
{
2313+
2314+
RankInfo rank_ = Simulation_impl::getSimulation()->getRank();
2315+
// Wait for shared variables to be stored by T0
2316+
exchange_barrier.wait();
2317+
// Get shared variables
2318+
int32_t cmd_local = cmd.load();
2319+
int32_t tid_local = tid.load();
2320+
//int32_t done_local = done.load();
2321+
// If not DONE, process command
2322+
if (cmd_local != 0) {
2323+
// If I am target thread, handle the incoming command
2324+
if (rank_.thread == tid_local) {
2325+
#if 1
2326+
//result.store(1);
2327+
result << "**Worker PRINT: R" << rank_.rank << ", T" << rank_.thread << "\n";
2328+
//Output::getDefaultObject().output("**Worker PRINT: R%d, T%d\n", rank_.rank, rank_.thread);
2329+
#else
2330+
handleCommand(cmd, done, result);
2331+
#endif
2332+
}
2333+
2334+
} else { // DONE
2335+
//result.store(0);
2336+
if (rank_.thread == tid_local) {
2337+
result << "**Worker DONE: R" << rank_.rank << ", T" << rank_.thread << "\n";
2338+
}
2339+
//Output::getDefaultObject().output("**Worker R%d T%d DONE\n",
2340+
// rank_.rank, rank_.thread);
2341+
}
2342+
// Wait for result to be stored by target thread
2343+
process_barrier.wait();
2344+
//if (done_local) {
2345+
if (cmd_local == 0) {
2346+
return true;
2347+
} else {
2348+
return false;
2349+
}
2350+
}
2351+
2352+
int // ultimately it may return the message buffer size
2353+
SimpleDebugger::packResultBuffer( std::stringstream& result, char** result_buffer) {
2354+
std::string result_str = result.str();
2355+
//result_buffer = new char[result_str.length() +1];
2356+
*result_buffer = (char *) malloc(result_str.length() +1);
2357+
int length = result_str.length() +1;
2358+
std::strcpy(*result_buffer, result_str.c_str());
2359+
//Output::getDefaultObject().output("Result: %s", *result_buffer);
2360+
2361+
// Clear result for next time
2362+
result.str("");
2363+
result.clear();
2364+
2365+
return length;
2366+
}
2367+
2368+
void // ultimately it may return the message buffer size
2369+
SimpleDebugger::packCommandBuffer( int32_t rank_id, int32_t thread_id, int32_t* cmd_buffer, int32_t cmd) {
2370+
cmd_buffer[0] = rank_id;
2371+
cmd_buffer[1] = thread_id;
2372+
cmd_buffer[2] = cmd;
2373+
}
2374+
2375+
void
2376+
SimpleDebugger::unpackCommandBuffer( int32_t* cmd_buffer, std::atomic<int32_t>& cmd, std::atomic<int32_t>& tid)
2377+
{
2378+
2379+
RankInfo rank_ = Simulation_impl::getSimulation()->getRank();
2380+
//Output::getDefaultObject().output("**Worker R%d T%d Recv: R%d, T%d cmd%d\n",
2381+
// rank_.rank, rank_.thread, cmd_buffer[0], cmd_buffer[1], cmd_buffer[2]);
2382+
tid.store(cmd_buffer[1]);
2383+
cmd.store(cmd_buffer[2]);
2384+
2385+
}
2386+
2387+
2388+
void
2389+
SimpleDebugger::rankParallelExecute()
2390+
{
2391+
#ifdef SST_CONFIG_HAVE_MPI
2392+
RankInfo num_ranks_ = Simulation_impl::getSimulation()->getNumRanks();
2393+
RankInfo rank_ = Simulation_impl::getSimulation()->getRank();
2394+
int32_t cmd_buffer[3];
2395+
char* result_buffer = nullptr;
2396+
int tag = 0;
2397+
int src;
2398+
int dst;
2399+
bool quit = false;
2400+
2401+
// Static variables shared between threads within the rank
2402+
static std::atomic<int32_t> tid = 0;
2403+
static std::atomic<int32_t> cmd = 0;
2404+
static std::stringstream result;
2405+
2406+
static Core::ThreadSafe::Barrier exchange_barrier(num_ranks_.thread);
2407+
static Core::ThreadSafe::Barrier process_barrier(num_ranks_.thread);
2408+
2409+
//Output::getDefaultObject().output("----ParallelSkip\n");
2410+
2411+
if (rank_.rank == 0 && rank_.thread == 0) { // Handles send of commands
2412+
2413+
// Execute console and get next command
2414+
int32_t rindex = 1;
2415+
int32_t tindex = 1;
2416+
//Output::getDefaultObject().output("**Mgr R%d T%d Send PRINT: Dest R%d, T%d\n",
2417+
// rank_.rank, rank_.thread, rindex, tindex);
2418+
// Send print command to correct rank and wait for result
2419+
packCommandBuffer(rindex, tindex, cmd_buffer, 1);
2420+
dst = rindex;
2421+
2422+
// Local to this rank
2423+
if (rindex == 0) {
2424+
// Store shared variables from cmd_buffer
2425+
unpackCommandBuffer(cmd_buffer, cmd, tid);
2426+
// Target thread processes command
2427+
quit = handleCommandAll(tid, cmd, result, exchange_barrier, process_barrier);
2428+
assert(!quit);
2429+
2430+
// Pack result buffer
2431+
2432+
//Output::getDefaultObject().output("original Result: %s", result.str().c_str());
2433+
packResultBuffer(result, &result_buffer);
2434+
Output::getDefaultObject().output("Result: %s", result_buffer);
2435+
} // end rindex == 0
2436+
// Send to other rank for processing
2437+
else {
2438+
MPI_Send(cmd_buffer, 3, MPI_INT32_T, dst, tag, MPI_COMM_WORLD );
2439+
src = rindex;
2440+
int length;
2441+
MPI_Status status;
2442+
2443+
// Probe for an incoming message to get its length
2444+
MPI_Probe(src, tag, MPI_COMM_WORLD, &status);
2445+
// Get the actual number of elements (characters)
2446+
MPI_Get_count(&status, MPI_CHAR, &length);
2447+
result_buffer = (char*)malloc(length * sizeof(char));
2448+
MPI_Recv(result_buffer, length, MPI_CHAR, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
2449+
Output::getDefaultObject().output("Result: %s", result_buffer);
2450+
2451+
} // end Send to other rank
2452+
2453+
2454+
// DONE: Send done command to all ranks (can't use Bcast bc others don't know it is coming)
2455+
for (uint32_t rindex = 0; rindex < num_ranks_.rank; rindex++ ) {
2456+
2457+
//Output::getDefaultObject().output("**Mgr R%d T%d Send DONE: Dest R%d\n",
2458+
// rank_.rank, rank_.thread, rindex);
2459+
// Send done command to correct rank and wait for return
2460+
packCommandBuffer(rindex, tindex, cmd_buffer, 0);
2461+
dst = rindex;
2462+
2463+
// Local to this rank
2464+
if (rindex == 0) {
2465+
// Store shared variables from cmd_buffer
2466+
unpackCommandBuffer(cmd_buffer, cmd, tid);
2467+
// Target thread processes command
2468+
quit = handleCommandAll(tid, cmd, result, exchange_barrier, process_barrier);
2469+
assert(quit);
2470+
2471+
packResultBuffer(result, &result_buffer);
2472+
Output::getDefaultObject().output("Result: %s", result_buffer);
2473+
}
2474+
// Send to other rank for processing
2475+
else {
2476+
MPI_Send(cmd_buffer, 3, MPI_INT32_T, dst, tag, MPI_COMM_WORLD);
2477+
src = rindex;
2478+
int length;
2479+
MPI_Status status;
2480+
// Probe for an incoming message to get its length
2481+
MPI_Probe(src, tag, MPI_COMM_WORLD, &status);
2482+
// Get the actual number of elements (characters)
2483+
MPI_Get_count(&status, MPI_CHAR, &length);
2484+
result_buffer = (char*)malloc(length * sizeof(char));
2485+
MPI_Recv(result_buffer, length, MPI_CHAR, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
2486+
Output::getDefaultObject().output("ResultN: %s\n", result_buffer);
2487+
2488+
} // end else send to other rank
2489+
} // end for Ranks
2490+
} // end rank 0
2491+
2492+
else if (rank_.rank !=0 && rank_.thread == 0) { // Other ranks, thread 0
2493+
2494+
//Output::getDefaultObject().output("**Enter Worker: Rank:%d, Thread:%d\n", rank_.rank, rank_.thread);
2495+
src = 0;
2496+
dst = 0;
2497+
tag = 0;
2498+
while (!quit) {
2499+
// Wait for message
2500+
MPI_Recv(cmd_buffer, 3, MPI_INT32_T, src, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
2501+
// Store shared variables from cmd_buffer
2502+
unpackCommandBuffer(cmd_buffer, cmd, tid);
2503+
// Target thread processes command
2504+
quit = handleCommandAll(tid, cmd, result, exchange_barrier, process_barrier);
2505+
// Return result
2506+
2507+
int length = packResultBuffer(result, &result_buffer);
2508+
MPI_Send(result_buffer, length, MPI_CHAR, dst, tag, MPI_COMM_WORLD);
2509+
} // while !done
2510+
} // end other ranks
2511+
else { // All threads != 0
2512+
2513+
while (!quit) {
2514+
quit = handleCommandAll(tid, cmd, result, exchange_barrier, process_barrier);
2515+
}
2516+
//Output::getDefaultObject().output("**Worker R%d T%d Exit Loop\n",
2517+
// rank_.rank, rank_.thread);
2518+
}
2519+
2520+
#endif // SST_CONFIG_HAVE_MPI
2521+
} //end rankParallelExecute
2522+
2523+
22632524

22642525
} // namespace SST::IMPL::Interactive

src/sst/core/impl/interactive/simpleDebug.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@
3636
#include <utility>
3737
#include <vector>
3838

39+
#include "sst/core/sst_mpi.h"
40+
41+
#define SKK 1
42+
3943
namespace SST::IMPL::Interactive {
4044

4145
enum class ConsoleCommandGroup { GENERAL, NAVIGATION, STATE, WATCH, SIMULATION, LOGGING, MISC, USER };
@@ -76,6 +80,20 @@ class ConsoleCommand
7680
group_(group),
7781
func_(func)
7882
{}
83+
84+
// Constructor for parallel execution (e.g. multi-thread and/or multi-rank): adds a parallel function (func_par_) that takes a string
85+
// to simplify passing to other rank/thread
86+
#if 0
87+
// func:     handler invoked with the tokenized command line (stored in func_)
// func_par: handler invoked with the raw command string (stored in func_par_)
// The parallel handler is initialised from func_par; it was previously
// initialised from func, whose signature does not match func_par_.
ConsoleCommand(std::string str_long, std::string str_short, std::string str_help, ConsoleCommandGroup group,
    std::function<bool(std::vector<std::string>& tokens)> func, std::function<bool(std::string& cmd_str)> func_par) :
    str_long_(str_long),
    str_short_(str_short),
    str_help_(str_help),
    group_(group),
    func_(func),
    func_par_(func_par)
{}
96+
#endif
7997
// Constructor for user-defined commands
8098
ConsoleCommand(std::string str_long) :
8199
str_long_(str_long),
@@ -91,6 +109,9 @@ class ConsoleCommand
91109
const ConsoleCommandGroup& group() const { return group_; }
92110
// Command Execution
93111
bool exec(std::vector<std::string>& tokens) { return func_(tokens); }
112+
#if 0
113+
bool exec_par(std::string& cmd_str) { return func_par_(cmd_str); }
114+
#endif
94115
bool match(const std::string& token)
95116
{
96117
std::string lctoken = toLower(token);
@@ -110,6 +131,9 @@ class ConsoleCommand
110131
std::string str_help_;
111132
ConsoleCommandGroup group_;
112133
std::function<bool(std::vector<std::string>& tokens)> func_;
134+
#if 0
135+
std::function<bool(std::string& cmd_str)> func_par_;
136+
#endif
113137
std::string toLower(std::string s)
114138
{
115139
std::transform(s.begin(), s.end(), s.begin(), ::tolower);
@@ -363,6 +387,14 @@ class SimpleDebugger : public SST::InteractiveConsole
363387
// Verbosity controlled console printing
364388
uint32_t verbosity = 0;
365389
void msg(VERBOSITY_MASK mask, std::string message);
390+
391+
// Test new rank parallel execute
392+
bool handleCommandAll(std::atomic<int32_t>& tid, std::atomic<int32_t>& cmd, std::stringstream& result,
393+
Core::ThreadSafe::Barrier& exchange_barrier, Core::ThreadSafe::Barrier& process_barrier);
394+
int packResultBuffer( std::stringstream& result, char** result_buffer);
395+
void packCommandBuffer( int32_t rank_id, int32_t thread_id, int32_t* cmd_buffer, int32_t cmd );
396+
void unpackCommandBuffer( int32_t* cmd_buffer, std::atomic<int32_t>& cmd, std::atomic<int32_t>& tid);
397+
void rankParallelExecute();
366398
};
367399
368400
} // namespace SST::IMPL::Interactive

0 commit comments

Comments
 (0)