
Conversation

@Simn (Member) commented Jan 25, 2026

And for my next trick, I make all your scheduler events disappear... This implements the scheduler side on top of #75. I'll write more about it later; the idea should be sound, but something is still not quite right with the implementation because I'm getting hangs in some samples. Of course, that could again just be a case of more parallelism exposing more problems, but we'll see.

Simn changed the base branch from WsQueue to master (January 26, 2026 21:31)
Simn force-pushed the ThreadAwareScheduler branch from 403d86f to 85da098 (January 26, 2026 21:38)
@Simn (Member, Author) commented Jan 26, 2026

I was hoping that #81 would magically fix my problems here, but alas... My current nemesis is this sample on the JVM target:

import haxe.Timer;
import hxcoro.CoroRun;
import hxcoro.Coro.*;

function main() {
	for (numTasks in [1, 10, 100, 1_000, 10_000]) {
		final stamp = Timer.milliseconds();
		var racyInt = 0;
		CoroRun.runScoped(node -> {
			for (i in 0...numTasks) {
				node.async(node -> {
					racyInt++;
					var busy = 1_000;
					var localInt = 0;
					while (busy-- > 0) {
						yield();
					}
				});
			}
		});
		trace('numTasks: $numTasks, run-time: ${Timer.milliseconds() - stamp}ms, racyInt: $racyInt');
	}
}
source/Main.hx:21: numTasks: 1, run-time: 52ms, racyInt: 1
source/Main.hx:21: numTasks: 10, run-time: 24ms, racyInt: 9
source/Main.hx:21: numTasks: 100, run-time: 117ms, racyInt: 100
source/Main.hx:21: numTasks: 1000, run-time: 509ms, racyInt: 985
CoroTask 1116
        state: 2
        firstChild: hxcoro.task.CoroTaskWithLambda@15327b79
        numActiveChildren: 3
        resumeStatus: 2
        result: null
        error: null
FixedThreadPool
        isShutdown: false
        total worker loops: 31989688
        total worker dispatches: 30015327
        workers (active/available/total): 0/10/10
        queue 0: (r 10008445, w 10008445, l 8192):
        worker 1(Waiting), dispatch/loop: 11%/11%, queue: (r 0, w 0, l 16):
        worker 2(Waiting), dispatch/loop: 10%/10%, queue: (r 0, w 0, l 16):
        worker 3(Waiting), dispatch/loop: 10%/10%, queue: (r 0, w 0, l 16):
        worker 4(Waiting), dispatch/loop: 10%/10%, queue: (r 0, w 0, l 16):
        worker 5(Waiting), dispatch/loop: 9%/9%, queue: (r 0, w 0, l 16):
        worker 6(Waiting), dispatch/loop: 9%/9%, queue: (r 0, w 0, l 16):
        worker 7(Waiting), dispatch/loop: 9%/9%, queue: (r 0, w 0, l 16):
        worker 8(Waiting), dispatch/loop: 9%/9%, queue: (r 0, w 0, l 16):
        worker 9(Waiting), dispatch/loop: 9%/9%, queue: (r 0, w 0, l 16):
        worker 10(Waiting), dispatch/loop: 9%/9%, queue: (r 0, w 0, l 16):
Active child: hxcoro.task.CoroTaskWithLambda@f2a0b8e
CoroTask 8129
        state: 4
        firstChild: null
        numActiveChildren: 0
        resumeStatus: 1
        result: null
        error: Cancellation
Active child: hxcoro.task.CoroTaskWithLambda@593634ad
CoroTask 4720
        state: 4
        firstChild: null
        numActiveChildren: 0
        resumeStatus: 1
        result: null
        error: Cancellation
Active child: hxcoro.task.CoroTaskWithLambda@20fa23c1
CoroTask 2138
        state: 4
        firstChild: null
        numActiveChildren: 0
        resumeStatus: 1
        result: null
        error: Cancellation

This tells me that there are 3/10000 children who were never resumed. The most obvious explanation seems to be that the scheduler "loses" their yield events, so that's probably not it.

@Simn (Member, Author) commented Jan 27, 2026

I have locally added a duplication check to BaseContinuation.onDispatch:

	var dispatchCounter = new AtomicInt(0);

	public function onDispatch() {
		if (dispatchCounter.exchange(1) != 0) {
			Sys.println("Duplicate dispatch: " + (this : Dynamic));
			return;
		}

And the same for resume, in the branch where resumeResult != SuspensionResult.suspended. Both of them show up:

Duplicate dispatch: hxcoro._Coro.HxCoro_hxcoro_Coro_yield@7718e177
haxe/coro/BaseContinuation.hx:97: Duplicate resume, hxcoro._Coro.HxCoro_hxcoro_Coro_yield@4de6ad4c, null, null, null, null, Invalid coroutine state, null

There doesn't appear to be a strict relation between the two, i.e. sometimes I get one, sometimes the other, sometimes both. The dispatch one could suggest that the pool steals the same object twice (and misses a different one in its place). For the resume one I have no explanation right now.

There also doesn't appear to be a relation between the number of duplicate calls and the number of hanging children, which is a little surprising too.

@Simn (Member, Author) commented Jan 27, 2026

Also, to be clear, while I cannot reproduce this behavior on master, there's a pretty high chance that we get bottlenecked by futureMutex so hard that no parallelism problems ever exist. So I'm reluctant to blame this on the new scheduler.

@Simn (Member, Author) commented Jan 27, 2026

Now that this is starting to mostly work (the CI failures are probably not directly related), I'll explain how the implementation works because this is actually quite neat.

The main idea is that every thread has a per-scheduler TLS queue. When schedule is called, the current thread checks if this TLS value is set. If it is, the event is added to it and the function returns. If it's not, the queue is created, installed as the TLS value and then registered with the scheduler by adding an Add(queue) event to its queueDeque deque. The event is then added to the queue and again the scheduler is done.
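
To make the producer side concrete, here's a minimal sketch, assuming nothing about the actual hxcoro types: the names (SketchScheduler, SketchEvent, localQueue) are made up for illustration, and sys.thread.Deque stands in for the lock-free per-thread queue.

import sys.thread.Tls;
import sys.thread.Deque;

// An event is a callback plus the time at which it should run (0 = "run now").
typedef SketchEvent = {run:() -> Void, runAt:Float};

enum QueueRegistration {
	Add(queue:Deque<SketchEvent>);
}

class SketchScheduler {
	// One TLS slot per scheduler instance: each thread lazily creates its own queue.
	final localQueue = new Tls<Deque<SketchEvent>>();
	// New per-thread queues are announced to the loop thread through this deque.
	public final queueDeque = new Deque<QueueRegistration>();

	public function new() {}

	public function schedule(run:() -> Void, delayMs:Float = 0) {
		var queue = localQueue.value;
		if (queue == null) {
			// First schedule() on this thread: create the queue, install it as
			// the TLS value and register it with the loop thread.
			queue = new Deque();
			localQueue.value = queue;
			queueDeque.add(Add(queue));
		}
		// From here on, only this thread pushes into this queue and only the
		// loop thread pops from it: a single-producer single-consumer pair.
		queue.add({run: run, runAt: delayMs > 0 ? haxe.Timer.stamp() * 1000 + delayMs : 0});
	}
}

A call like scheduler.schedule(() -> trace("tick"), 10) then either gets dispatched directly on a later loop iteration or sits in the timer heap first, as described below.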

The main thread running the loop now performs four steps:

  1. It checks the queueDeque for new queues and builds a linked list from them.
  2. It extracts all expired timers from the heap and adds them to a toDispatch array (I'll probably turn that into a linked list too later).
  3. For each of the registered queues, it keeps popping elements until it finds null. Elements which should execute "now" are added to the toDispatch array, the others are inserted into the heap and will be looked at by future iterations in step 2.
  4. It iterates the array and dispatches everything.

With this approach we don't need any synchronization between schedule and run, because each queue is effectively a single-producer single-consumer queue. At worst the running loop misses an event that is added while it is processing and only sees it on its next iteration, but such things can always happen with threaded execution.
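
Continuing the same sketch, one iteration of the loop thread could look like the following; registeredQueues and timerHeap are plain arrays standing in for the linked list and the timer heap of the actual implementation, and runOneIteration is again an invented name.

class SketchLoop {
	final scheduler:SketchScheduler;
	// Only the loop thread ever touches these two fields, so no locking is needed.
	final registeredQueues:Array<Deque<SketchEvent>> = [];
	final timerHeap:Array<SketchEvent> = [];

	public function new(scheduler:SketchScheduler) {
		this.scheduler = scheduler;
	}

	public function runOneIteration() {
		final now = haxe.Timer.stamp() * 1000;
		final toDispatch:Array<SketchEvent> = [];
		// 1. Pick up queues that other threads have registered since the last iteration.
		while (true) {
			switch (scheduler.queueDeque.pop(false)) {
				case null:
					break;
				case Add(queue):
					registeredQueues.push(queue);
			}
		}
		// 2. Move expired timers into toDispatch.
		var i = 0;
		while (i < timerHeap.length) {
			if (timerHeap[i].runAt <= now) {
				toDispatch.push(timerHeap.splice(i, 1)[0]);
			} else {
				i++;
			}
		}
		// 3. Drain each registered queue until it yields null. Immediate events go
		//    straight to toDispatch, timed ones into the heap for step 2 of a later
		//    iteration.
		for (queue in registeredQueues) {
			var event = queue.pop(false);
			while (event != null) {
				if (event.runAt <= now) {
					toDispatch.push(event);
				} else {
					timerHeap.push(event);
				}
				event = queue.pop(false);
			}
		}
		// 4. Dispatch everything collected during this iteration.
		for (event in toDispatch) {
			event.run();
		}
	}
}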

For the sample above, C++ in particular loves this:

Before:

source/Main.hx:21: numTasks: 1, run-time: 16ms, racyInt: 1
source/Main.hx:21: numTasks: 10, run-time: 35ms, racyInt: 10
source/Main.hx:21: numTasks: 100, run-time: 243ms, racyInt: 100
source/Main.hx:21: numTasks: 1000, run-time: 2379ms, racyInt: 1000
source/Main.hx:21: numTasks: 10000, run-time: 31594ms, racyInt: 9999

After:

source/Main.hx:21: numTasks: 1, run-time: 10ms, racyInt: 1
source/Main.hx:21: numTasks: 10, run-time: 13ms, racyInt: 10
source/Main.hx:21: numTasks: 100, run-time: 82ms, racyInt: 100
source/Main.hx:21: numTasks: 1000, run-time: 636ms, racyInt: 996
source/Main.hx:21: numTasks: 10000, run-time: 8938ms, racyInt: 9996

The JVM thread activity also greatly appreciates this:

[JProfiler screenshot: JVM thread activity]

HL once again doesn't seem to care one way or another. I'm beginning to think that the entire target isn't really doing well with threads...

Simn marked this pull request as ready for review (January 27, 2026 12:28)
@Simn (Member, Author) commented Jan 27, 2026

I'm pasting here how to profile on HL so that I don't have to dig it up in Discord all the time:

import haxe.Timer;
import hxcoro.CoroRun;
import hxcoro.Coro.*;

function doProf(args) {
	switch (args) {
		case "start":
			hl.Profile.event(-7, "" + 10000); // setup
			hl.Profile.event(-3); // clear data
			hl.Profile.event(-5); // resume all
		case "dump":
			hl.Profile.event(-6); // save dump
			hl.Profile.event(-4); // pause all
			hl.Profile.event(-3); // clear data
		default:
	}
}

function main() {
	doProf("start");
	final numTasks = 100;
	final stamp = Timer.milliseconds();
	var racyInt = 0;
	CoroRun.runScoped(node -> {
		for (i in 0...numTasks) {
			node.async(node -> {
				racyInt++;
				var busy = 1_000;
				while (busy-- > 0) {
					yield();
				}
			});
		}
	});
	trace('numTasks: $numTasks, run-time: ${Timer.milliseconds() - stamp}ms, racyInt: $racyInt');
	doProf("dump");
}

This writes a hlprofile.dump into the project root. Next to it, put profiler.hl, which is the result of compiling https://github.com/HaxeFoundation/hashlink/blob/master/other/haxelib/profiler.hxml. After running that, hlprofile.dump can be dragged into Chrome -> F12 -> Performance.

And then it usually just shows that the GC can't handle many short-lived objects very well:

[Screenshot: HL profile dominated by GC work]

At least the threads do what I want them to do and are spending their time in the right place:

[Screenshot: HL profile thread view]

@Simn (Member, Author) commented Jan 27, 2026

For some reason, everyone really hates the Issue37 tests. I thought it might be another cancellation problem, but there have also been failures for the non-cancelling version. They always seem to end up in the same state:

    testNotCancelling
CoroTask 621
	state: Completing
	firstChild: CoroTaskWithLambda
	numActiveChildren: 1
	resumeStatus: Resumed
	result: null
	error: null
Active child: CoroTaskWithLambda
CoroTask 796
	state: Running
	firstChild: null
	numActiveChildren: 0
	resumeStatus: Unresumed
	result: null
	error: null

From the task ID we can infer that this is one of the reader tasks, because the 100 writer tasks are created first and would only go up to ID 721. That means that we never get out of this loop:

						while (channel.reader.waitForRead()) {
							delay(1);
							if (channel.reader.tryRead(o)) {
								aggregateValue.add(o.get());
								break;
							} else {
								continue;
							}
						}

I know this is an awkward pattern with the delay-call between the wait and the try, but I do think it should work...

@Simn (Member, Author) commented Jan 27, 2026

So it's not (only) about the delay calls; HL just segfaulted without them:

Running issues.hf.Issue37...
    testRacing
    testNotCancelling
SIGNAL 11
fun$2871(src/issues/hf/Issue37.hx:79)

That last line points to the waitForRead call.

