Skip to content

Commit af911ad

Browse files
committed
Merge branch 'main' into autobuild
2 parents 5c07df4 + 4599def commit af911ad

File tree

7 files changed

+567
-15
lines changed

7 files changed

+567
-15
lines changed

CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,11 @@ target_link_libraries(EntropyCore
103103
)
104104

105105
add_executable(WorkContractExample Examples/WorkContractExample.cpp)
106-
107106
target_link_libraries(WorkContractExample EntropyCore)
108107

108+
add_executable(WorkGraphYieldableExample Examples/WorkGraphYieldableExample.cpp)
109+
target_link_libraries(WorkGraphYieldableExample EntropyCore)
110+
109111
# Platform and compiler-specific settings
110112
if(WIN32)
111113
# Settings for Windows
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
#include <iostream>
2+
#include <atomic>
3+
#include <thread>
4+
#include <chrono>
5+
#include "../src/Concurrency/WorkContractGroup.h"
6+
#include "../src/Concurrency/WorkService.h"
7+
#include "../src/Concurrency/WorkGraph.h"
8+
9+
using namespace EntropyEngine::Core::Concurrency;
10+
using namespace std::chrono_literals;
11+
12+
int main() {
13+
// Setup work service and contract group (like WorkContractExample)
14+
WorkService::Config config;
15+
config.threadCount = 4;
16+
WorkService service(config);
17+
service.start();
18+
19+
WorkContractGroup group(1000);
20+
service.addWorkContractGroup(&group);
21+
22+
// Example 1: Basic work graph with dependencies and main thread work
23+
{
24+
std::cout << "\n=== Example 1: Basic Work Graph with Dependencies ===\n";
25+
WorkGraph graph(&group);
26+
27+
// Create nodes
28+
auto task1 = graph.addNode([]() {
29+
std::cout << "Task 1: Background work\n";
30+
std::this_thread::sleep_for(100ms);
31+
}, "task1");
32+
33+
auto task2 = graph.addNode([]() {
34+
std::cout << "Task 2: More background work\n";
35+
std::this_thread::sleep_for(100ms);
36+
}, "task2");
37+
38+
auto mainThreadTask = graph.addNode([]() {
39+
std::cout << "Main Thread Task: UI Update\n";
40+
std::this_thread::sleep_for(50ms);
41+
}, "main-thread-task", nullptr, ExecutionType::MainThread);
42+
43+
auto finalTask = graph.addNode([]() {
44+
std::cout << "Final Task: Cleanup\n";
45+
}, "final");
46+
47+
// Set dependencies: task1 -> task2 -> mainThreadTask -> finalTask
48+
graph.addDependency(task1, task2);
49+
graph.addDependency(task2, mainThreadTask);
50+
graph.addDependency(mainThreadTask, finalTask);
51+
52+
// Execute
53+
graph.execute();
54+
55+
// Pump main thread work
56+
while (!graph.isComplete()) {
57+
group.executeMainThreadWork(10);
58+
std::this_thread::sleep_for(10ms);
59+
}
60+
61+
std::cout << "Graph 1 complete\n";
62+
}
63+
64+
// Example 2: Yieldable node that waits for atomic value
65+
{
66+
std::cout << "\n=== Example 2: Yieldable Node Waiting for Atomic ===\n";
67+
WorkGraph graph(&group);
68+
69+
std::atomic<bool> ready{false};
70+
71+
// Producer sets the atomic after 500ms
72+
auto producer = graph.addNode([&ready]() {
73+
std::cout << "Producer: Working...\n";
74+
std::this_thread::sleep_for(1000ms);
75+
ready = true;
76+
std::cout << "Producer: Data ready!\n";
77+
}, "producer");
78+
79+
// Consumer yields until atomic is true
80+
auto consumer = graph.addYieldableNode([&ready]() -> WorkResult {
81+
static int attempts = 0;
82+
attempts++;
83+
std::cout << "Consumer: Attempt " << attempts << " - checking...\n";
84+
85+
if (!ready.load()) {
86+
std::this_thread::sleep_for(100ms);
87+
return WorkResult::Yield;
88+
}
89+
90+
std::cout << "Consumer: Got data after " << attempts << " attempts!\n";
91+
return WorkResult::Complete;
92+
}, "consumer", nullptr, ExecutionType::AnyThread, 20); // Max 20 attempts
93+
94+
// Execute (no dependency - they run in parallel)
95+
graph.execute();
96+
graph.wait();
97+
98+
std::cout << "Graph 2 complete\n";
99+
}
100+
101+
// Example 3: Suspend/Resume functionality
102+
{
103+
std::cout << "\n=== Example 3: Suspend and Resume Graph ===\n";
104+
WorkGraph graph(&group);
105+
106+
std::atomic<int> counter{0};
107+
108+
// Create several nodes that increment counter
109+
auto node1 = graph.addNode([&counter]() {
110+
std::cout << "Node 1: Working...\n";
111+
std::this_thread::sleep_for(200ms);
112+
counter++;
113+
std::cout << "Node 1: Done (counter=" << counter.load() << ")\n";
114+
}, "node1");
115+
116+
auto node2 = graph.addNode([&counter]() {
117+
std::cout << "Node 2: Working...\n";
118+
std::this_thread::sleep_for(200ms);
119+
counter++;
120+
std::cout << "Node 2: Done (counter=" << counter.load() << ")\n";
121+
}, "node2");
122+
123+
// Yieldable node that increments counter multiple times
124+
auto yieldNode = graph.addYieldableNode([&counter]() -> WorkResult {
125+
static int iterations = 0;
126+
iterations++;
127+
std::cout << "Yield Node: Iteration " << iterations << "\n";
128+
counter++;
129+
std::this_thread::sleep_for(100ms);
130+
131+
if (iterations < 5) {
132+
return WorkResult::Yield;
133+
}
134+
std::cout << "Yield Node: Complete (counter=" << counter.load() << ")\n";
135+
return WorkResult::Complete;
136+
}, "yield-node");
137+
138+
auto node3 = graph.addNode([&counter]() {
139+
std::cout << "Node 3: Working...\n";
140+
std::this_thread::sleep_for(200ms);
141+
counter++;
142+
std::cout << "Node 3: Done (counter=" << counter.load() << ")\n";
143+
}, "node3");
144+
145+
// Set up dependencies: node1 -> node2 -> yieldNode -> node3
146+
graph.addDependency(node1, node2);
147+
graph.addDependency(node2, yieldNode);
148+
graph.addDependency(yieldNode, node3);
149+
150+
// Start execution
151+
graph.execute();
152+
std::cout << "Graph started\n";
153+
154+
// Let it run for a bit
155+
std::this_thread::sleep_for(300ms);
156+
157+
// Suspend the graph
158+
std::cout << "\n>>> SUSPENDING GRAPH <<<\n";
159+
graph.suspend();
160+
std::cout << "Graph suspended (counter=" << counter.load() << ")\n";
161+
162+
// Wait while suspended - nothing new should schedule
163+
std::cout << "Waiting 1 second while suspended...\n";
164+
std::this_thread::sleep_for(1000ms);
165+
std::cout << "Counter after suspension wait: " << counter.load() << "\n";
166+
167+
// Resume the graph
168+
std::cout << "\n>>> RESUMING GRAPH <<<\n";
169+
graph.resume();
170+
std::cout << "Graph resumed\n";
171+
172+
// Wait for completion
173+
auto result = graph.wait();
174+
std::cout << "Graph 3 complete (final counter=" << counter.load() << ")\n";
175+
}
176+
177+
service.stop();
178+
return 0;
179+
}

src/Concurrency/NodeScheduler.cpp

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,13 +232,25 @@ std::function<void()> NodeScheduler::createWorkWrapper(NodeHandle node) {
232232
_eventBus->publish(NodeExecutingEvent(_graph, node));
233233
}
234234

235-
// Execute the work
235+
// Execute the work based on variant type
236236
bool failed = false;
237+
bool yielded = false;
237238
std::exception_ptr exception;
238239

239240
try {
240-
if (nodeData->work) {
241-
nodeData->work();
241+
if (nodeData->isYieldable) {
242+
// Handle yieldable work function
243+
if (auto* yieldableWork = std::get_if<YieldableWorkFunction>(&nodeData->work)) {
244+
WorkResult result = (*yieldableWork)();
245+
if (result == WorkResult::Yield) {
246+
yielded = true;
247+
}
248+
}
249+
} else {
250+
// Handle legacy void work function
251+
if (auto* voidWork = std::get_if<std::function<void()>>(&nodeData->work)) {
252+
(*voidWork)();
253+
}
242254
}
243255
} catch (...) {
244256
failed = true;
@@ -250,11 +262,16 @@ std::function<void()> NodeScheduler::createWorkWrapper(NodeHandle node) {
250262
return; // Scheduler is gone, skip cleanup
251263
}
252264

253-
// Notify completion or failure
265+
// Notify completion, failure, or yield
254266
if (failed) {
255267
if (_callbacks.onNodeFailed) {
256268
_callbacks.onNodeFailed(node, exception);
257269
}
270+
} else if (yielded) {
271+
// Notify via callback
272+
if (_callbacks.onNodeYielded) {
273+
_callbacks.onNodeYielded(node);
274+
}
258275
} else {
259276
if (_callbacks.onNodeCompleted) {
260277
_callbacks.onNodeCompleted(node);

src/Concurrency/NodeScheduler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ class NodeScheduler {
122122
std::function<void(NodeHandle)> onNodeCompleted; ///< Node finished successfully
123123
std::function<void(NodeHandle, std::exception_ptr)> onNodeFailed; ///< Node threw an exception
124124
std::function<void(NodeHandle)> onNodeDropped; ///< Node dropped (deferred queue overflow)
125+
std::function<void(NodeHandle)> onNodeYielded; ///< Node yielded execution (will reschedule)
125126
};
126127

127128
/**

0 commit comments

Comments
 (0)