Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
2c38311
try a different approach for blocking loops
Simn Feb 9, 2026
8e70204
maybe like this
Simn Feb 9, 2026
846e596
remove unneeded code
Simn Feb 9, 2026
5d23642
add wakeUp function
Simn Feb 9, 2026
74b93aa
try waking up via an async for luv schedulers
Simn Feb 9, 2026
6d56837
try to be more accurate
Simn Feb 9, 2026
a367b16
disable issue 37 to avoid distractions
Simn Feb 9, 2026
32586e7
avoid race condition for now
Simn Feb 9, 2026
2c76350
track wokenUp state explicitly
Simn Feb 9, 2026
4a5fcad
don't try to wake up C++'s luv scheduler like that
Simn Feb 9, 2026
23c6399
try with a sanity mutex
Simn Feb 9, 2026
a50a2a0
Revert "disable issue 37 to avoid distractions"
Simn Feb 9, 2026
df1dc74
Reapply "disable issue 37 to avoid distractions"
Simn Feb 9, 2026
5d6263b
try with ThreadPoolDispatcher again
Simn Feb 9, 2026
43fe7ae
loop at end of entrypoint tests to avoid dangling events
Simn Feb 9, 2026
c49a216
how's this?
Simn Feb 9, 2026
3902af0
it just doesn't work the way I want it to
Simn Feb 9, 2026
188e68f
use semaphore to ensure onCompletion is done before returning
Simn Feb 10, 2026
360cb44
add IShutDown to handle dispatcher shutdown
Simn Feb 10, 2026
0da2910
add runMode argument to loop
Simn Feb 10, 2026
09c1746
Merge branch 'master' into blocking-loop
Simn Feb 10, 2026
34ac013
Merge branch 'master' into blocking-loop
Simn Feb 10, 2026
2a694bb
neko works now
Simn Feb 10, 2026
327a8e5
Merge branch 'master' into blocking-loop
Simn Feb 10, 2026
fa16aec
Merge branch 'master' into blocking-loop
Simn Feb 10, 2026
5cd8971
try with a simple semaphore
Simn Feb 10, 2026
0da4896
Merge branch 'master' into blocking-loop
Simn Feb 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/hxcoro/dispatchers/IShutDown.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package hxcoro.dispatchers;

interface IShutDown {
function shutDown():Void;
}
2 changes: 1 addition & 1 deletion src/hxcoro/dispatchers/LuvDispatcher.cpp.hx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import haxe.coro.schedulers.IScheduler;
import haxe.ds.Option;
import hxcoro.schedulers.LuvScheduler;

class LuvDispatcher extends Dispatcher
class LuvDispatcher extends Dispatcher implements IShutDown
{
final loop:LuvLoop;
final workQueue:AsyncDeque<()->Void>;
Expand Down
15 changes: 13 additions & 2 deletions src/hxcoro/run/LoopRun.hx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package hxcoro.run;

import haxe.Timer;
import haxe.coro.context.Context;
import hxcoro.exceptions.TimeoutException;
import hxcoro.schedulers.ILoop;
import hxcoro.task.CoroTask;
import hxcoro.task.ICoroTask;
Expand Down Expand Up @@ -40,9 +39,21 @@ class LoopRun {
that are already running.
**/
static function awaitTaskCompletion<T>(loop:ILoop, task:ICoroTask<T>) {
#if target.threaded
final semaphore = new sys.thread.Semaphore(0);
task.onCompletion((_, _) -> {
loop.wakeUp();
semaphore.release();
});
#end

while (task.isActive()) {
loop.loop();
loop.loop(Once);
}

#if target.threaded
semaphore.acquire();
#end
}

/**
Expand Down
16 changes: 12 additions & 4 deletions src/hxcoro/run/Setup.hx
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package hxcoro.run;

import haxe.coro.schedulers.IScheduler;
import hxcoro.dispatchers.IShutDown;
import haxe.coro.BaseContinuation;
import haxe.coro.context.Context;
import haxe.coro.dispatchers.Dispatcher;
Expand Down Expand Up @@ -57,19 +59,25 @@ class Setup {

#if (cpp && hxcpp_luv_io)

static public function createLuv() {
static public function createLuvGen(createDispatcher:(cpp.luv.Luv.LuvLoop, IScheduler) -> Dispatcher) {
final loop = cpp.luv.Luv.allocLoop();
final scheduler = new hxcoro.schedulers.LuvScheduler(loop);
final dispatcher = new hxcoro.dispatchers.LuvDispatcher(loop, scheduler);
final dispatcher = createDispatcher(loop, scheduler);
function finalize() {
dispatcher.shutDown();
scheduler.shutDown();
if (dispatcher is IShutDown) {
(cast dispatcher : IShutDown).shutDown();
}
cpp.luv.Luv.stopLoop(loop);
cpp.luv.Luv.shutdownLoop(loop);
cpp.luv.Luv.freeLoop(loop);
}
return new LoopSetup(scheduler, dispatcher, finalize);
}

static public function createLuv() {
return createLuvGen((uvLoop, loop) -> new hxcoro.dispatchers.LuvDispatcher(uvLoop, loop));
}

#elseif interp

static public function createLuv() {
Expand Down
20 changes: 19 additions & 1 deletion src/hxcoro/schedulers/EventLoopScheduler.hx
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package hxcoro.schedulers;

import hxcoro.concurrent.BackOff;
import hxcoro.schedulers.ILoop;
import haxe.coro.IContinuation;
import haxe.Timer;
import haxe.Int64;
Expand Down Expand Up @@ -40,8 +42,9 @@ class HeapScheduler implements IScheduler {
}

class EventLoopScheduler extends HeapScheduler implements ILoop {
public function loop() {
function loopOnce() {
final currentTime = now();
var didSomething = false;
while (true) {
futureMutex.acquire();
var minimum = heap.minimum();
Expand All @@ -52,12 +55,27 @@ class EventLoopScheduler extends HeapScheduler implements ILoop {
heap.extract();
futureMutex.release();

didSomething = true;
minimum.dispatch();
}

futureMutex.release();
return didSomething;
}

public function loop(runMode:RunMode) {
switch (runMode) {
case NoWait:
loopOnce();
case Once:
while (!loopOnce()) {
BackOff.backOff();
}
}
}

public function wakeUp() {}

public function toString() {
return '[EventLoopScheduler]';
}
Expand Down
8 changes: 7 additions & 1 deletion src/hxcoro/schedulers/ILoop.hx
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package hxcoro.schedulers;

enum abstract RunMode(Int) {
final Once = 1;
final NoWait = 2;
}

interface ILoop {
function loop():Void;
function loop(runMode:RunMode):Void;
function wakeUp():Void;
}
8 changes: 6 additions & 2 deletions src/hxcoro/schedulers/LuvScheduler.cpp.hx
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,12 @@ class LuvScheduler implements IScheduler implements ILoop {
consumeDeque(closeQueue, event -> event.stop());
}

public function loop() {
cpp.luv.Luv.runLoop(uvLoop, NoWait);
public function loop(runMode:RunMode) {
cpp.luv.Luv.runLoop(uvLoop, cast runMode);
}

public function wakeUp() {
@:privateAccess eventQueue.async.send();
}

public function shutDown() {
Expand Down
8 changes: 6 additions & 2 deletions src/hxcoro/schedulers/LuvScheduler.eval.hx
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,12 @@ class LuvScheduler implements IScheduler implements ILoop {
return uvLoop.now().toInt64();
}

public function loop() {
uvLoop.run(NOWAIT);
public function loop(runMode:ILoop.RunMode) {
uvLoop.run(cast runMode);
}

public function wakeUp() {
@:privateAccess eventQueue.async.send();
}

inline function consumeDeque<T>(deque:AsyncDeque<T>, f:T->Void) {
Expand Down
53 changes: 41 additions & 12 deletions src/hxcoro/schedulers/ThreadAwareScheduler.hx
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package hxcoro.schedulers;

import haxe.coro.IContinuation;
import haxe.Exception;
import sys.thread.Thread;
import hxcoro.ds.CircularVector;
import haxe.Timer;
import haxe.Int64;
import sys.thread.Tls;
import sys.thread.Deque;
import haxe.exceptions.ArgumentException;
import haxe.Timer;
import haxe.coro.IContinuation;
import haxe.coro.schedulers.IScheduler;
import haxe.coro.schedulers.ISchedulerHandle;
import haxe.exceptions.ArgumentException;
import hxcoro.concurrent.AtomicInt;
import hxcoro.ds.CircularVector;
import hxcoro.schedulers.ILoop;
import sys.thread.Deque;
import sys.thread.Semaphore;
import sys.thread.Thread;
import sys.thread.Tls;

private class CircularQueueData {
public var read:Int;
Expand Down Expand Up @@ -82,13 +84,15 @@ class ThreadAwareScheduler implements IScheduler implements ILoop {
final queueTls:Tls<TlsQueue>;
final queueDeque:Deque<TlsQueueEvent>;
final rootEvent:ScheduledEvent;
final semaphore:Semaphore;
var firstQueue:Null<TlsQueue>;

public function new() {
heap = new MinimumHeap();
queueTls = new Tls();
queueDeque = new Deque();
rootEvent = new ScheduledEvent(null, 0);
semaphore = new Semaphore(0);
}

function getTlsQueue():TlsQueue {
Expand All @@ -114,9 +118,8 @@ class ThreadAwareScheduler implements IScheduler implements ILoop {
}

final event = new ScheduledEvent(cont, now() + ms);

getTlsQueue().add(event);

semaphore.release();
return event;
}

Expand Down Expand Up @@ -151,7 +154,7 @@ class ThreadAwareScheduler implements IScheduler implements ILoop {
}
}

public function loop() {
function loopNoWait() {
final currentTime = now();

// First we consume the coordination deque so we know all queues.
Expand Down Expand Up @@ -193,15 +196,41 @@ class ThreadAwareScheduler implements IScheduler implements ILoop {
current = current.next;
}
var event:Null<ScheduledEvent> = rootEvent;
var didSomething = false;
while (true) {
event = event.next;
if (event == null) {
break;
return didSomething;
}
didSomething = true;
event.dispatch();
}
}

public function loop(runMode:RunMode) {
var wasWokenUp = false;
while(true) {
if (loopNoWait() || wasWokenUp || runMode == NoWait) {
// If we did something we're fine.
return;
}
final minimum = heap.minimum();
if (minimum != null) {
// If there's a scheduled event, its due time is our max timeout time.
final timeout:Float = (Int64.toInt(minimum.runTime - now())) / 1000;
semaphore.tryAcquire(timeout);
} else {
// Otherwise we wait until something happens.
semaphore.acquire();
}
wasWokenUp = true;
}
}

public function wakeUp() {
semaphore.release();
}

public function dump() {
Sys.println("ThreadAwareScheduler");
Sys.println('\theap minimum: ${heap.minimum()}');
Expand Down
7 changes: 5 additions & 2 deletions src/hxcoro/schedulers/VirtualTimeScheduler.hx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package hxcoro.schedulers;

import hxcoro.schedulers.ILoop;
import haxe.Int64;
import haxe.exceptions.ArgumentException;

Expand Down Expand Up @@ -55,7 +56,9 @@ class VirtualTimeScheduler extends EventLoopScheduler.HeapScheduler implements I
return hasMoreEvents;
}

public function loop() {
while (advanceBy(1)) {}
public function loop(runMode:RunMode) {
advanceBy(1);
}

public function wakeUp() {}
}
4 changes: 2 additions & 2 deletions tests/src/ds/channels/TestBoundedChannel.hx
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class TestBoundedChannel extends utest.Test {
});
task.start();
while (task.isActive()) {
scheduler.loop();
scheduler.loop(NoWait);
scheduler.advanceBy(1);
}
final expected = [for (i in 0...size + 1) i];
Expand Down Expand Up @@ -231,7 +231,7 @@ class TestBoundedChannel extends utest.Test {
});
task.start();
while (task.isActive()) {
scheduler.loop();
scheduler.loop(NoWait);
scheduler.advanceBy(1);
}
Assert.same([None, Some(1), None, Some(2), Some(3), None], task.get());
Expand Down
24 changes: 6 additions & 18 deletions tests/src/run/TestEntrypoints.hx
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class TestEntrypoints extends utest.Test {
assertLastMessage("Launched Task 1 says hello");
assertNoCurrentMessage();

loop.loop();
loop.loop(Once);

assertAwaitLastMessage("Launched Task 1 says goodbye");
assertNoCurrentMessage();
Expand All @@ -93,7 +93,7 @@ class TestEntrypoints extends utest.Test {
assertNoCurrentMessage();

// Created Task 2 is still missing, but it was never started so running the loop at this point doesn't do anything.
loop.loop();
loop.loop(NoWait);

assertNoCurrentMessage();

Expand All @@ -103,7 +103,7 @@ class TestEntrypoints extends utest.Test {
assertLastMessage("Created Task 2 says hello");

// But with this we finally get it.
loop.loop();
loop.loop(Once);

assertAwaitLastMessage("Created Task 2 says goodbye");
assertNoCurrentMessage();
Expand All @@ -112,6 +112,7 @@ class TestEntrypoints extends utest.Test {
function runSuite(context:Context, loop:ILoop) {
launchTask(context, loop);
createTasks(context, loop);
loop.loop(NoWait);
}

public function testEventTrampoline() {
Expand Down Expand Up @@ -140,21 +141,8 @@ class TestEntrypoints extends utest.Test {

#if (cpp && hxcpp_luv_io)

function setupLuv(createDispatcher:(cpp.luv.Luv.LuvLoop, IScheduler) -> Dispatcher) {
final loop = cpp.luv.Luv.allocLoop();
final scheduler = new hxcoro.schedulers.LuvScheduler(loop);
final dispatcher = createDispatcher(loop, scheduler);
function finalize() {
scheduler.shutDown();
cpp.luv.Luv.stopLoop(loop);
cpp.luv.Luv.shutdownLoop(loop);
cpp.luv.Luv.freeLoop(loop);
}
return new LoopSetup(scheduler, dispatcher, finalize);
}

public function testLuvTrampoline() {
final setup = setupLuv((uvLoop, loop) -> new TrampolineDispatcher(loop));
final setup = Setup.createLuvGen((uvLoop, loop) -> new TrampolineDispatcher(loop));
final context = setup.createContext();
runSuite(context, setup.loop);
setup.close();
Expand All @@ -163,7 +151,7 @@ class TestEntrypoints extends utest.Test {

public function testLuvThreadPool() {
final pool = new hxcoro.thread.FixedThreadPool(1);
final setup = setupLuv((uvLoop, loop) -> new ThreadPoolDispatcher(loop, pool));
final setup = Setup.createLuvGen((uvLoop, loop) -> new ThreadPoolDispatcher(loop, pool));
final context = setup.createContext();
runSuite(context, setup.loop);
setup.close();
Expand Down
Loading