Skip to content

Commit 4599def

Browse files
committed
Add yieldable nodes and suspend/resume to WorkGraph
Introduces yieldable nodes to WorkGraph, allowing tasks to yield and be rescheduled. Adds support for suspending and resuming graph execution, including handling of yielded nodes and reschedule limits.
1 parent 0e2902c commit 4599def

File tree

7 files changed

+567
-15
lines changed

7 files changed

+567
-15
lines changed

CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,11 @@ target_link_libraries(EntropyCore
103103
)
104104

105105
add_executable(WorkContractExample Examples/WorkContractExample.cpp)
106-
107106
target_link_libraries(WorkContractExample EntropyCore)
108107

108+
add_executable(WorkGraphYieldableExample Examples/WorkGraphYieldableExample.cpp)
109+
target_link_libraries(WorkGraphYieldableExample EntropyCore)
110+
109111
# Platform and compiler-specific settings
110112
if(WIN32)
111113
# Settings for Windows
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
#include <iostream>
2+
#include <atomic>
3+
#include <thread>
4+
#include <chrono>
5+
#include "../src/Concurrency/WorkContractGroup.h"
6+
#include "../src/Concurrency/WorkService.h"
7+
#include "../src/Concurrency/WorkGraph.h"
8+
9+
using namespace EntropyEngine::Core::Concurrency;
10+
using namespace std::chrono_literals;
11+
12+
int main() {
13+
// Setup work service and contract group (like WorkContractExample)
14+
WorkService::Config config;
15+
config.threadCount = 4;
16+
WorkService service(config);
17+
service.start();
18+
19+
WorkContractGroup group(1000);
20+
service.addWorkContractGroup(&group);
21+
22+
// Example 1: Basic work graph with dependencies and main thread work
23+
{
24+
std::cout << "\n=== Example 1: Basic Work Graph with Dependencies ===\n";
25+
WorkGraph graph(&group);
26+
27+
// Create nodes
28+
auto task1 = graph.addNode([]() {
29+
std::cout << "Task 1: Background work\n";
30+
std::this_thread::sleep_for(100ms);
31+
}, "task1");
32+
33+
auto task2 = graph.addNode([]() {
34+
std::cout << "Task 2: More background work\n";
35+
std::this_thread::sleep_for(100ms);
36+
}, "task2");
37+
38+
auto mainThreadTask = graph.addNode([]() {
39+
std::cout << "Main Thread Task: UI Update\n";
40+
std::this_thread::sleep_for(50ms);
41+
}, "main-thread-task", nullptr, ExecutionType::MainThread);
42+
43+
auto finalTask = graph.addNode([]() {
44+
std::cout << "Final Task: Cleanup\n";
45+
}, "final");
46+
47+
// Set dependencies: task1 -> task2 -> mainThreadTask -> finalTask
48+
graph.addDependency(task1, task2);
49+
graph.addDependency(task2, mainThreadTask);
50+
graph.addDependency(mainThreadTask, finalTask);
51+
52+
// Execute
53+
graph.execute();
54+
55+
// Pump main thread work
56+
while (!graph.isComplete()) {
57+
group.executeMainThreadWork(10);
58+
std::this_thread::sleep_for(10ms);
59+
}
60+
61+
std::cout << "Graph 1 complete\n";
62+
}
63+
64+
// Example 2: Yieldable node that waits for atomic value
65+
{
66+
std::cout << "\n=== Example 2: Yieldable Node Waiting for Atomic ===\n";
67+
WorkGraph graph(&group);
68+
69+
std::atomic<bool> ready{false};
70+
71+
// Producer sets the atomic after 500ms
72+
auto producer = graph.addNode([&ready]() {
73+
std::cout << "Producer: Working...\n";
74+
std::this_thread::sleep_for(1000ms);
75+
ready = true;
76+
std::cout << "Producer: Data ready!\n";
77+
}, "producer");
78+
79+
// Consumer yields until atomic is true
80+
auto consumer = graph.addYieldableNode([&ready]() -> WorkResult {
81+
static int attempts = 0;
82+
attempts++;
83+
std::cout << "Consumer: Attempt " << attempts << " - checking...\n";
84+
85+
if (!ready.load()) {
86+
std::this_thread::sleep_for(100ms);
87+
return WorkResult::Yield;
88+
}
89+
90+
std::cout << "Consumer: Got data after " << attempts << " attempts!\n";
91+
return WorkResult::Complete;
92+
}, "consumer", nullptr, ExecutionType::AnyThread, 20); // Max 20 attempts
93+
94+
// Execute (no dependency - they run in parallel)
95+
graph.execute();
96+
graph.wait();
97+
98+
std::cout << "Graph 2 complete\n";
99+
}
100+
101+
// Example 3: Suspend/Resume functionality
102+
{
103+
std::cout << "\n=== Example 3: Suspend and Resume Graph ===\n";
104+
WorkGraph graph(&group);
105+
106+
std::atomic<int> counter{0};
107+
108+
// Create several nodes that increment counter
109+
auto node1 = graph.addNode([&counter]() {
110+
std::cout << "Node 1: Working...\n";
111+
std::this_thread::sleep_for(200ms);
112+
counter++;
113+
std::cout << "Node 1: Done (counter=" << counter.load() << ")\n";
114+
}, "node1");
115+
116+
auto node2 = graph.addNode([&counter]() {
117+
std::cout << "Node 2: Working...\n";
118+
std::this_thread::sleep_for(200ms);
119+
counter++;
120+
std::cout << "Node 2: Done (counter=" << counter.load() << ")\n";
121+
}, "node2");
122+
123+
// Yieldable node that increments counter multiple times
124+
auto yieldNode = graph.addYieldableNode([&counter]() -> WorkResult {
125+
static int iterations = 0;
126+
iterations++;
127+
std::cout << "Yield Node: Iteration " << iterations << "\n";
128+
counter++;
129+
std::this_thread::sleep_for(100ms);
130+
131+
if (iterations < 5) {
132+
return WorkResult::Yield;
133+
}
134+
std::cout << "Yield Node: Complete (counter=" << counter.load() << ")\n";
135+
return WorkResult::Complete;
136+
}, "yield-node");
137+
138+
auto node3 = graph.addNode([&counter]() {
139+
std::cout << "Node 3: Working...\n";
140+
std::this_thread::sleep_for(200ms);
141+
counter++;
142+
std::cout << "Node 3: Done (counter=" << counter.load() << ")\n";
143+
}, "node3");
144+
145+
// Set up dependencies: node1 -> node2 -> yieldNode -> node3
146+
graph.addDependency(node1, node2);
147+
graph.addDependency(node2, yieldNode);
148+
graph.addDependency(yieldNode, node3);
149+
150+
// Start execution
151+
graph.execute();
152+
std::cout << "Graph started\n";
153+
154+
// Let it run for a bit
155+
std::this_thread::sleep_for(300ms);
156+
157+
// Suspend the graph
158+
std::cout << "\n>>> SUSPENDING GRAPH <<<\n";
159+
graph.suspend();
160+
std::cout << "Graph suspended (counter=" << counter.load() << ")\n";
161+
162+
// Wait while suspended - nothing new should schedule
163+
std::cout << "Waiting 1 second while suspended...\n";
164+
std::this_thread::sleep_for(1000ms);
165+
std::cout << "Counter after suspension wait: " << counter.load() << "\n";
166+
167+
// Resume the graph
168+
std::cout << "\n>>> RESUMING GRAPH <<<\n";
169+
graph.resume();
170+
std::cout << "Graph resumed\n";
171+
172+
// Wait for completion
173+
auto result = graph.wait();
174+
std::cout << "Graph 3 complete (final counter=" << counter.load() << ")\n";
175+
}
176+
177+
service.stop();
178+
return 0;
179+
}

src/Concurrency/NodeScheduler.cpp

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,13 +232,25 @@ std::function<void()> NodeScheduler::createWorkWrapper(NodeHandle node) {
232232
_eventBus->publish(NodeExecutingEvent(_graph, node));
233233
}
234234

235-
// Execute the work
235+
// Execute the work based on variant type
236236
bool failed = false;
237+
bool yielded = false;
237238
std::exception_ptr exception;
238239

239240
try {
240-
if (nodeData->work) {
241-
nodeData->work();
241+
if (nodeData->isYieldable) {
242+
// Handle yieldable work function
243+
if (auto* yieldableWork = std::get_if<YieldableWorkFunction>(&nodeData->work)) {
244+
WorkResult result = (*yieldableWork)();
245+
if (result == WorkResult::Yield) {
246+
yielded = true;
247+
}
248+
}
249+
} else {
250+
// Handle legacy void work function
251+
if (auto* voidWork = std::get_if<std::function<void()>>(&nodeData->work)) {
252+
(*voidWork)();
253+
}
242254
}
243255
} catch (...) {
244256
failed = true;
@@ -250,11 +262,16 @@ std::function<void()> NodeScheduler::createWorkWrapper(NodeHandle node) {
250262
return; // Scheduler is gone, skip cleanup
251263
}
252264

253-
// Notify completion or failure
265+
// Notify completion, failure, or yield
254266
if (failed) {
255267
if (_callbacks.onNodeFailed) {
256268
_callbacks.onNodeFailed(node, exception);
257269
}
270+
} else if (yielded) {
271+
// Notify via callback
272+
if (_callbacks.onNodeYielded) {
273+
_callbacks.onNodeYielded(node);
274+
}
258275
} else {
259276
if (_callbacks.onNodeCompleted) {
260277
_callbacks.onNodeCompleted(node);

src/Concurrency/NodeScheduler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ class NodeScheduler {
122122
std::function<void(NodeHandle)> onNodeCompleted; ///< Node finished successfully
123123
std::function<void(NodeHandle, std::exception_ptr)> onNodeFailed; ///< Node threw an exception
124124
std::function<void(NodeHandle)> onNodeDropped; ///< Node dropped (deferred queue overflow)
125+
std::function<void(NodeHandle)> onNodeYielded; ///< Node yielded execution (will reschedule)
125126
};
126127

127128
/**

0 commit comments

Comments
 (0)