From 777bf3f0d0d7f3e80fcf4871b65bd592c48f0ac5 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Wed, 5 Nov 2025 14:46:26 -0600 Subject: [PATCH 1/3] Support queue registration in reconverse --- src/CMakeLists.txt | 2 +- src/convcore.cpp | 3 +- src/converse_internal.h | 4 +- src/scheduler.cpp | 332 ++++++++++++++------------------------ src/scheduler.h | 22 +++ src/scheduler_helpers.cpp | 63 ++++++++ 6 files changed, 208 insertions(+), 218 deletions(-) create mode 100644 src/scheduler_helpers.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4566195..293083d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,6 +1,6 @@ target_include_directories(reconverse PRIVATE .) target_sources(reconverse PRIVATE conv-conds.cpp convcore.cpp random.cpp - scheduler.cpp cpuaffinity.cpp collectives.cpp + scheduler.cpp scheduler_helpers.cpp cpuaffinity.cpp collectives.cpp comm_backend/comm_backend_internal.cpp threads.cpp cldb.rand.cpp cldb.cpp cmirdmautils.cpp conv-rdma.cpp conv-topology.cpp msgmgr.cpp queueing.cpp) target_include_directories( diff --git a/src/convcore.cpp b/src/convcore.cpp index 0f43b21..2dc1abf 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -159,10 +159,11 @@ void converseRunPe(int rank) { } void CmiStartThreads() { - // allocate global arrayss + // allocate global arrays Cmi_queues = new ConverseQueue *[Cmi_mynodesize]; CmiHandlerTable = new std::vector *[Cmi_mynodesize]; CmiNodeQueue = new ConverseNodeQueue(); + CmiQueueRegisterInit(); _smp_mutex = CmiCreateLock(); CmiMemLock_lock = CmiCreateLock(); diff --git a/src/converse_internal.h b/src/converse_internal.h index 1d55c5a..4bc3866 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -67,9 +67,11 @@ CmiState *CmiGetState(void); void CmiInitState(int pe); ConverseQueue *CmiGetQueue(int pe); void CrnInit(void); - void CmiPushPE(int destPE, int messageSize, void *msg); +//queue reg init +void CmiQueueRegisterInit(void); + // node queue ConverseNodeQueue *CmiGetNodeQueue(); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 0a7e999..d7cd33a 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -1,145 +1,129 @@ #include "scheduler.h" -#include "converse.h" -#include "converse_internal.h" -#include "queue.h" -#include -/** - * The main scheduler loop for the Charm++ runtime. - */ -void CsdScheduler() { - // get pthread level queue - ConverseQueue *queue = CmiGetQueue(CmiMyRank()); +extern std::vector g_handlers; //list of handlers +extern Groups g_groups; //groups of handlers by index - // get node level queue - ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); - - int loop_counter = 0; - - while (CmiStopFlag() == 0) { - - CcdRaiseCondition(CcdSCHEDLOOP); +static inline void releaseIdle() { + if (CmiGetIdle()) { + CmiSetIdle(false); + CcdRaiseCondition(CcdPROCESSOR_END_IDLE); + } +} - // poll node queue - if (!nodeQueue->empty()) { - auto result = nodeQueue->pop(); - if (result) { - void *msg = result.value(); - // process event - CmiHandleMessage(msg); +static inline void setIdle() { + if (!CmiGetIdle()) { + CmiSetIdle(true); + CmiSetIdleTime(CmiWallTimer()); + CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); + } + // if already idle, call still idle and (maybe) long idle + else { + CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); + if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { + CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); + } + } +} - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } +//poll converse-level node queue +bool pollConverseNodeQueue() { + ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); + if (!nodeQueue->empty()) { + auto result = nodeQueue->pop(); + if (result) { + void *msg = result.value(); + // process event + CmiHandleMessage(msg); + releaseIdle(); + return true; } + } + return false; +} - // poll thread queue - else if (!queue->empty()) { - // get next event (guaranteed to be there because only single consumer) - void *msg = queue->pop().value(); +//poll converse-level thread queue +bool pollConverseThreadQueue() { + ConverseQueue *queue = CmiGetQueue(CmiMyRank()); + if (!queue->empty()) { + // get next event (guaranteed to be there because only single consumer) + void *msg = queue->pop().value(); + // process event + CmiHandleMessage(msg); + releaseIdle(); + return true; + } + return false; +} +//poll node priority queue +bool pollNodePrioQueue() { + // Try to acquire lock without blocking + if (CmiTryLock(CsvAccess(CsdNodeQueueLock)) == 0) { + if (!QueueEmpty(CsvAccess(CsdNodeQueue))) { + void *msg = QueueTop(CsvAccess(CsdNodeQueue)); + QueuePop(CsvAccess(CsdNodeQueue)); + CmiUnlock(CsvAccess(CsdNodeQueueLock)); // process event CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } + releaseIdle(); + return true; + } else { + CmiUnlock(CsvAccess(CsdNodeQueueLock)); } + } + return false; +} - // poll node prio queue - else { - // Try to acquire lock without blocking - if (CmiTryLock(CsvAccess(CsdNodeQueueLock)) == 0) { - if (!QueueEmpty(CsvAccess(CsdNodeQueue))) { - void* msg = QueueTop(CsvAccess(CsdNodeQueue)); - QueuePop(CsvAccess(CsdNodeQueue)); - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - // process event - CmiHandleMessage(msg); +//poll thread priority queue +bool pollThreadPrioQueue() { + if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { + void *msg = QueueTop(CpvAccess(CsdSchedQueue)); + QueuePop(CpvAccess(CsdSchedQueue)); + // process event + CmiHandleMessage(msg); + releaseIdle(); + return true; + } + return false; +} - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } - else { - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - //empty queue so check thread prio queue - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); +bool pollProgress() +{ + if(CmiMyRank() % backend_poll_thread == 0) return comm_backend::progress(); + return false; +} - // process event - CmiHandleMessage(msg); +//will add queue polling functions +//called at node level (before threads created) +void CmiQueueRegisterInit() { + add_handler(pollConverseNodeQueue, 8); + add_handler(pollConverseThreadQueue, 1); + add_handler(pollNodePrioQueue, 16); + add_handler(pollThreadPrioQueue, 1); + add_handler(pollProgress, backend_poll_freq); +} - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - // the processor is idle - // if not already idle, set idle and raise condition - if (!CmiGetIdle()) { - CmiSetIdle(true); - CmiSetIdleTime(CmiWallTimer()); - CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); - } - // if already idle, call still idle and (maybe) long idle - else { - CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); - if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { - CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); - } - } - } - } - } - else { - // Could not acquire node queue lock, skip to thread prio queue - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); +/** + * The main scheduler loop for the Charm++ runtime. + */ +void CsdScheduler() { - // process event - CmiHandleMessage(msg); + uint64_t loop_counter = 0; - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - // the processor is idle - // if not already idle, set idle and raise condition - if (!CmiGetIdle()) { - CmiSetIdle(true); - CmiSetIdleTime(CmiWallTimer()); - CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); - } - // if already idle, call still idle and (maybe) long idle - else { - CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); - if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { - CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); - } - } - } - } + while (CmiStopFlag() == 0) { + + CcdRaiseCondition(CcdSCHEDLOOP); + //poll queues + unsigned idx = static_cast(loop_counter & 63ULL); + bool workDone = false; + for (auto fn : g_groups[idx]) { + workDone |= fn(); } - if((CmiMyRank() % backend_poll_thread == 0) && (loop_counter++ == (backend_poll_freq - 1))) - { - loop_counter = 0; - comm_backend::progress(); + if(!workDone) { + setIdle(); } - CcdCallBacks(); + loop_counter++; } } @@ -149,106 +133,24 @@ void CsdScheduler() { * are empty, not when the scheduler is stopped. */ void CsdSchedulePoll() { - // get pthread level queue - ConverseQueue *queue = CmiGetQueue(CmiMyRank()); - - // get node level queue - ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); + uint64_t loop_counter = 0; while(1){ CcdCallBacks(); - CcdRaiseCondition(CcdSCHEDLOOP); - - // poll node queue - if (!nodeQueue->empty()) { - auto result = nodeQueue->pop(); - if (result) { - void *msg = result.value(); - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } + //poll queues + unsigned idx = static_cast(loop_counter & 63ULL); + bool workDone = false; + for (auto fn : g_groups[idx]) { + workDone |= fn(); } - - // poll thread queue - else if (!queue->empty()) { - // get next event (guaranteed to be there because only single consumer) - void *msg = queue->pop().value(); - - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } - - // poll node prio queue - else { - // Try to acquire lock without blocking - if (CmiTryLock(CsvAccess(CsdNodeQueueLock)) == 0) { - if (!QueueEmpty(CsvAccess(CsdNodeQueue))) { - void *msg = QueueTop(CsvAccess(CsdNodeQueue)); - QueuePop(CsvAccess(CsdNodeQueue)); - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } - else { - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); - - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - comm_backend::progress(); - break; //break when queues are empty - } - } - } - else { - // Could not acquire node queue lock, skip to thread prio queue - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); - - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - comm_backend::progress(); - break; //break when queues are empty - } - } + if(!workDone) { + setIdle(); + break; } + loop_counter++; + } } diff --git a/src/scheduler.h b/src/scheduler.h index a311734..33b878d 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -1,2 +1,24 @@ +#ifndef _SCHEDULER_H_ +#define _SCHEDULER_H_ +#include "converse.h" +#include "converse_internal.h" +#include "queue.h" +#include +#include +#include + +using QueuePollHandlerFn = bool(*)(void); //we need a return value to indicate if work was done + +struct QueuePollHandler { + QueuePollHandlerFn fn; + uint64_t mask{0}; // 64-bit mask: bit i == call at loop index i (0..63) + unsigned period{0}; // 1..64, 0 => disabled + unsigned phase{0}; +}; + +using Groups = std::array, 64>; + +void add_handler(QueuePollHandlerFn fn, unsigned period, unsigned phase = 0); void CsdScheduler(); +#endif diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp new file mode 100644 index 0000000..2dea2b9 --- /dev/null +++ b/src/scheduler_helpers.cpp @@ -0,0 +1,63 @@ +#include "scheduler.h" + +std::vector g_handlers; //list of handlers +Groups g_groups; //groups of handlers by index + +// Build a 64-bit mask for a period n (1..64) with optional phase (0..n-1) +inline uint64_t make_mask_every_n(unsigned n, unsigned phase = 0) { + if (n == 0) return 0ULL; + if (n == 1) return ~0ULL; + if (n > 64) n = 64; // clamp to 64 + uint64_t mask = 0ULL; + for (unsigned pos = 0; pos < 64; ++pos) { + if (((pos + phase) % n) == 0) mask |= (1ULL << pos); + } + return mask; +} + +// Rebuild groups from current handler masks (in-place). +// Single-threaded callers may call this whenever a handler mask changes. +inline void rebuild_groups() { + // Clear all groups + for (auto &v : g_groups) v.clear(); + + // Populate groups from each handler's mask + for (const auto &h : g_handlers) { + uint64_t m = h.mask; + if (m == 0) continue; + for (unsigned bit = 0; bit < 64; ++bit) { + if ((m >> bit) & 1ULL) { + g_groups[bit].push_back(h.fn); + } + } + } +} + +// Set handler period and phase (period: 1..64, 0 disables). +// Rebuilds groups immediately (cheap relative to hot path). +inline void set_frequency(size_t handlerIndex, unsigned period, unsigned phase = 0) { + if (handlerIndex >= g_handlers.size()) return; + QueuePollHandler &h = g_handlers[handlerIndex]; + + if (period == 0) { + h.period = 0; + h.phase = 0; + h.mask = 0ULL; + } else { + if (period > 64) period = 64; + h.period = period; + h.phase = phase % period; + h.mask = make_mask_every_n(h.period, h.phase); + } + rebuild_groups(); +} + +// Add a handler that will poll a queue at given frequency. +void add_handler(QueuePollHandlerFn fn, unsigned period, unsigned phase) +{ + g_handlers.push_back({fn}); + size_t index = g_handlers.size() - 1; + set_frequency(index, period, phase); +} + + From d5839f337703f148658c4b1f4b885cf653f25a53 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Wed, 5 Nov 2025 15:00:24 -0600 Subject: [PATCH 2/3] polling progress doesn't count --- src/scheduler.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/scheduler.cpp b/src/scheduler.cpp index d7cd33a..151efa2 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -89,8 +89,8 @@ bool pollThreadPrioQueue() { bool pollProgress() { - if(CmiMyRank() % backend_poll_thread == 0) return comm_backend::progress(); - return false; + if(CmiMyRank() % backend_poll_thread == 0) comm_backend::progress(); + return false; //polling progress doesn't count } //will add queue polling functions From f5aad458e9fdb7a343eb661b68ce3fd4de098d06 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Fri, 5 Dec 2025 08:13:42 -0600 Subject: [PATCH 3/3] new way to add handlers --- src/scheduler.h | 7 +++++++ src/scheduler_helpers.cpp | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/src/scheduler.h b/src/scheduler.h index 33b878d..52ec57b 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -20,5 +20,12 @@ using Groups = std::array, 64>; void add_handler(QueuePollHandlerFn fn, unsigned period, unsigned phase = 0); +// Add multiple handlers at once +// pairs of poll handlers and relative frequencies (will be normalized regardless of actual value) +// (frequency/total)*64 +// example: if the frequencies are 8, 1, 16, 1, 4, then they are added up to 30, then normalized to 17, 2, 34, 2, 9 +// then assign to slots based on these normalized values +void add_list_of_handlers(const std::vector>& handlers); + void CsdScheduler(); #endif diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index 2dea2b9..dec258b 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -2,6 +2,8 @@ std::vector g_handlers; //list of handlers Groups g_groups; //groups of handlers by index +QueuePollHandlerFn *poll_handlers; // fixed size array +#define ARRAY_SIZE 64 // Build a 64-bit mask for a period n (1..64) with optional phase (0..n-1) inline uint64_t make_mask_every_n(unsigned n, unsigned phase = 0) { @@ -60,4 +62,35 @@ void add_handler(QueuePollHandlerFn fn, unsigned period, unsigned phase) set_frequency(index, period, phase); } - +void add_list_of_handlers(const std::vector>& handlers){ + // total frequency + unsigned int total = 0; + for(const auto& handler : handlers){ + total += handler.second; + } + if(total == 0) return; // nothing to add + // loop through handlers and add them to the table + // spread out based on normalized frequency + poll_handlers = new QueuePollHandlerFn[ARRAY_SIZE]; + unsigned int current_index = 0; + for(const auto& handler : handlers){ + unsigned int freq = handler.second; + unsigned int normalized = (freq * ARRAY_SIZE) / total; //estimate of how many slots this handler should take + if(normalized == 0) normalized = 1; // at least once + // go through loop and find empty slots + // spread out as evenly as possible + unsigned int remaining = normalized; + unsigned int step = ARRAY_SIZE / normalized; + unsigned int index = current_index; + while(remaining > 0){ + //find next empty slot + while(poll_handlers[index] != nullptr){ + index = (index + 1) % ARRAY_SIZE; + } + poll_handlers[index] = handler.first; + remaining--; + index = (index + step) % ARRAY_SIZE; + } + current_index = (current_index + 1) % ARRAY_SIZE; + } +}