Skip to content

Commit b21dddd

Browse files
authored
Implement blocking loops instead of busy ones (#106)
* try a different approach for blocking loops * maybe like this * remove unneeded code * add wakeUp function * try waking up via an async for luv schedulers * try to be more accurate * disable issue 37 to avoid distractions * avoid race condition for now * track wokenUp state explicitly * don't try to wake up C++'s luv scheduler like that * try with a sanity mutex * Revert "disable issue 37 to avoid distractions" This reverts commit a367b16. * Reapply "disable issue 37 to avoid distractions" This reverts commit a50a2a0. * try with ThreadPoolDispatcher again * loop at end of entrypoint tests to avoid dangling events * how's this? * it just doesn't work the way I want it to * use semaphore to ensure onCompletion is done before returning * add IShutDown to handle dispatcher shutdown * add runMode argument to loop * neko works now * try with a simple semaphore
1 parent 8582837 commit b21dddd

File tree

13 files changed

+127
-51
lines changed

13 files changed

+127
-51
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package hxcoro.dispatchers;
2+
3+
interface IShutDown {
4+
function shutDown():Void;
5+
}

src/hxcoro/dispatchers/LuvDispatcher.cpp.hx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import haxe.coro.schedulers.IScheduler;
88
import haxe.ds.Option;
99
import hxcoro.schedulers.LuvScheduler;
1010

11-
class LuvDispatcher extends Dispatcher
11+
class LuvDispatcher extends Dispatcher implements IShutDown
1212
{
1313
final loop:LuvLoop;
1414
final workQueue:AsyncDeque<()->Void>;

src/hxcoro/run/LoopRun.hx

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package hxcoro.run;
22

33
import haxe.Timer;
44
import haxe.coro.context.Context;
5-
import hxcoro.exceptions.TimeoutException;
65
import hxcoro.schedulers.ILoop;
76
import hxcoro.task.CoroTask;
87
import hxcoro.task.ICoroTask;
@@ -40,9 +39,21 @@ class LoopRun {
4039
that are already running.
4140
**/
4241
static function awaitTaskCompletion<T>(loop:ILoop, task:ICoroTask<T>) {
42+
#if target.threaded
43+
final semaphore = new sys.thread.Semaphore(0);
44+
task.onCompletion((_, _) -> {
45+
loop.wakeUp();
46+
semaphore.release();
47+
});
48+
#end
49+
4350
while (task.isActive()) {
44-
loop.loop();
51+
loop.loop(Once);
4552
}
53+
54+
#if target.threaded
55+
semaphore.acquire();
56+
#end
4657
}
4758

4859
/**

src/hxcoro/run/Setup.hx

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package hxcoro.run;
22

3+
import haxe.coro.schedulers.IScheduler;
4+
import hxcoro.dispatchers.IShutDown;
35
import haxe.coro.BaseContinuation;
46
import haxe.coro.context.Context;
57
import haxe.coro.dispatchers.Dispatcher;
@@ -57,19 +59,25 @@ class Setup {
5759

5860
#if (cpp && hxcpp_luv_io)
5961

60-
static public function createLuv() {
62+
static public function createLuvGen(createDispatcher:(cpp.luv.Luv.LuvLoop, IScheduler) -> Dispatcher) {
6163
final loop = cpp.luv.Luv.allocLoop();
6264
final scheduler = new hxcoro.schedulers.LuvScheduler(loop);
63-
final dispatcher = new hxcoro.dispatchers.LuvDispatcher(loop, scheduler);
65+
final dispatcher = createDispatcher(loop, scheduler);
6466
function finalize() {
65-
dispatcher.shutDown();
67+
scheduler.shutDown();
68+
if (dispatcher is IShutDown) {
69+
(cast dispatcher : IShutDown).shutDown();
70+
}
6671
cpp.luv.Luv.stopLoop(loop);
67-
cpp.luv.Luv.shutdownLoop(loop);
6872
cpp.luv.Luv.freeLoop(loop);
6973
}
7074
return new LoopSetup(scheduler, dispatcher, finalize);
7175
}
7276

77+
static public function createLuv() {
78+
return createLuvGen((uvLoop, loop) -> new hxcoro.dispatchers.LuvDispatcher(uvLoop, loop));
79+
}
80+
7381
#elseif interp
7482

7583
static public function createLuv() {

src/hxcoro/schedulers/EventLoopScheduler.hx

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package hxcoro.schedulers;
22

3+
import hxcoro.concurrent.BackOff;
4+
import hxcoro.schedulers.ILoop;
35
import haxe.coro.IContinuation;
46
import haxe.Timer;
57
import haxe.Int64;
@@ -40,8 +42,9 @@ class HeapScheduler implements IScheduler {
4042
}
4143

4244
class EventLoopScheduler extends HeapScheduler implements ILoop {
43-
public function loop() {
45+
function loopOnce() {
4446
final currentTime = now();
47+
var didSomething = false;
4548
while (true) {
4649
futureMutex.acquire();
4750
var minimum = heap.minimum();
@@ -52,12 +55,27 @@ class EventLoopScheduler extends HeapScheduler implements ILoop {
5255
heap.extract();
5356
futureMutex.release();
5457

58+
didSomething = true;
5559
minimum.dispatch();
5660
}
5761

5862
futureMutex.release();
63+
return didSomething;
5964
}
6065

66+
public function loop(runMode:RunMode) {
67+
switch (runMode) {
68+
case NoWait:
69+
loopOnce();
70+
case Once:
71+
while (!loopOnce()) {
72+
BackOff.backOff();
73+
}
74+
}
75+
}
76+
77+
public function wakeUp() {}
78+
6179
public function toString() {
6280
return '[EventLoopScheduler]';
6381
}

src/hxcoro/schedulers/ILoop.hx

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package hxcoro.schedulers;
22

3+
enum abstract RunMode(Int) {
4+
final Once = 1;
5+
final NoWait = 2;
6+
}
7+
38
interface ILoop {
4-
function loop():Void;
9+
function loop(runMode:RunMode):Void;
10+
function wakeUp():Void;
511
}

src/hxcoro/schedulers/LuvScheduler.cpp.hx

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,12 @@ class LuvScheduler implements IScheduler implements ILoop {
162162
consumeDeque(closeQueue, event -> event.stop());
163163
}
164164

165-
public function loop() {
166-
cpp.luv.Luv.runLoop(uvLoop, NoWait);
165+
public function loop(runMode:RunMode) {
166+
cpp.luv.Luv.runLoop(uvLoop, cast runMode);
167+
}
168+
169+
public function wakeUp() {
170+
@:privateAccess eventQueue.async.send();
167171
}
168172

169173
public function shutDown() {

src/hxcoro/schedulers/LuvScheduler.eval.hx

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,12 @@ class LuvScheduler implements IScheduler implements ILoop {
143143
return uvLoop.now().toInt64();
144144
}
145145

146-
public function loop() {
147-
uvLoop.run(NOWAIT);
146+
public function loop(runMode:ILoop.RunMode) {
147+
uvLoop.run(cast runMode);
148+
}
149+
150+
public function wakeUp() {
151+
@:privateAccess eventQueue.async.send();
148152
}
149153

150154
inline function consumeDeque<T>(deque:AsyncDeque<T>, f:T->Void) {

src/hxcoro/schedulers/ThreadAwareScheduler.hx

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
package hxcoro.schedulers;
22

3-
import haxe.coro.IContinuation;
4-
import haxe.Exception;
5-
import sys.thread.Thread;
6-
import hxcoro.ds.CircularVector;
7-
import haxe.Timer;
83
import haxe.Int64;
9-
import sys.thread.Tls;
10-
import sys.thread.Deque;
11-
import haxe.exceptions.ArgumentException;
4+
import haxe.Timer;
5+
import haxe.coro.IContinuation;
126
import haxe.coro.schedulers.IScheduler;
137
import haxe.coro.schedulers.ISchedulerHandle;
8+
import haxe.exceptions.ArgumentException;
9+
import hxcoro.concurrent.AtomicInt;
10+
import hxcoro.ds.CircularVector;
11+
import hxcoro.schedulers.ILoop;
12+
import sys.thread.Deque;
13+
import sys.thread.Semaphore;
14+
import sys.thread.Thread;
15+
import sys.thread.Tls;
1416

1517
private class CircularQueueData {
1618
public var read:Int;
@@ -82,13 +84,15 @@ class ThreadAwareScheduler implements IScheduler implements ILoop {
8284
final queueTls:Tls<TlsQueue>;
8385
final queueDeque:Deque<TlsQueueEvent>;
8486
final rootEvent:ScheduledEvent;
87+
final semaphore:Semaphore;
8588
var firstQueue:Null<TlsQueue>;
8689

8790
public function new() {
8891
heap = new MinimumHeap();
8992
queueTls = new Tls();
9093
queueDeque = new Deque();
9194
rootEvent = new ScheduledEvent(null, 0);
95+
semaphore = new Semaphore(0);
9296
}
9397

9498
function getTlsQueue():TlsQueue {
@@ -114,9 +118,8 @@ class ThreadAwareScheduler implements IScheduler implements ILoop {
114118
}
115119

116120
final event = new ScheduledEvent(cont, now() + ms);
117-
118121
getTlsQueue().add(event);
119-
122+
semaphore.release();
120123
return event;
121124
}
122125

@@ -151,7 +154,7 @@ class ThreadAwareScheduler implements IScheduler implements ILoop {
151154
}
152155
}
153156

154-
public function loop() {
157+
function loopNoWait() {
155158
final currentTime = now();
156159

157160
// First we consume the coordination deque so we know all queues.
@@ -193,15 +196,41 @@ class ThreadAwareScheduler implements IScheduler implements ILoop {
193196
current = current.next;
194197
}
195198
var event:Null<ScheduledEvent> = rootEvent;
199+
var didSomething = false;
196200
while (true) {
197201
event = event.next;
198202
if (event == null) {
199-
break;
203+
return didSomething;
200204
}
205+
didSomething = true;
201206
event.dispatch();
202207
}
203208
}
204209

210+
public function loop(runMode:RunMode) {
211+
var wasWokenUp = false;
212+
while(true) {
213+
if (loopNoWait() || wasWokenUp || runMode == NoWait) {
214+
// If we did something we're fine.
215+
return;
216+
}
217+
final minimum = heap.minimum();
218+
if (minimum != null) {
219+
// If there's a scheduled event, its due time is our max timeout time.
220+
final timeout:Float = (Int64.toInt(minimum.runTime - now())) / 1000;
221+
semaphore.tryAcquire(timeout);
222+
} else {
223+
// Otherwise we wait until something happens.
224+
semaphore.acquire();
225+
}
226+
wasWokenUp = true;
227+
}
228+
}
229+
230+
public function wakeUp() {
231+
semaphore.release();
232+
}
233+
205234
public function dump() {
206235
Sys.println("ThreadAwareScheduler");
207236
Sys.println('\theap minimum: ${heap.minimum()}');

src/hxcoro/schedulers/VirtualTimeScheduler.hx

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package hxcoro.schedulers;
22

3+
import hxcoro.schedulers.ILoop;
34
import haxe.Int64;
45
import haxe.exceptions.ArgumentException;
56

@@ -55,7 +56,9 @@ class VirtualTimeScheduler extends EventLoopScheduler.HeapScheduler implements I
5556
return hasMoreEvents;
5657
}
5758

58-
public function loop() {
59-
while (advanceBy(1)) {}
59+
public function loop(runMode:RunMode) {
60+
advanceBy(1);
6061
}
62+
63+
public function wakeUp() {}
6164
}

0 commit comments

Comments
 (0)