Skip to content

Commit 84615ce

Browse files
author
Aidan Lee
committed
Add libuv work pool dispatcher
1 parent 5ac44fc commit 84615ce

File tree

2 files changed

+42
-9
lines changed

2 files changed

+42
-9
lines changed

src/hxcoro/CoroRun.hx

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,19 +126,17 @@ class CoroRun {
126126

127127
static public function runWith<T>(context:Context, lambda:NodeLambda<T>):T {
128128
final loop = cpp.luv.Luv.allocLoop();
129-
final scheduler = new hxcoro.schedulers.LuvScheduler(loop);
130-
final pool = new hxcoro.thread.FixedThreadPool(2);
131-
final dispatcher = new hxcoro.dispatchers.ThreadPoolDispatcher(scheduler, pool);
129+
final dispatcher = new hxcoro.schedulers.LuvScheduler.LuvDispatcher(loop);
132130

133131
final scope = new CoroTask(context.clone().with(dispatcher), CoroTask.CoroScopeStrategy);
134132
scope.onCompletion((_, _) -> {
135-
scheduler.shutdown();
133+
dispatcher.shutdown();
136134
});
137135
scope.runNodeLambda(lambda);
138136

139137
cpp.luv.Luv.runLoop(loop, Default);
140138
cpp.luv.Luv.freeLoop(loop);
141-
pool.shutdown();
139+
// dispatcher.shutdown();
142140

143141
switch (scope.getError()) {
144142
case null:

src/hxcoro/schedulers/LuvScheduler.cpp.hx

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package hxcoro.schedulers;
22

3+
import cpp.luv.Work;
4+
import haxe.coro.dispatchers.Dispatcher;
35
import haxe.atomic.AtomicInt;
46
import sys.thread.Deque;
57
import haxe.Int64;
@@ -131,10 +133,10 @@ class LuvScheduler implements IScheduler {
131133
/**
132134
Creates a new `LuvScheduler` instance.
133135
**/
134-
public function new(loop:LuvLoop) {
135-
this.loop = loop;
136-
eventQueue = new AsyncDeque(loop, loopEvents);
137-
closeQueue = new AsyncDeque(loop, loopCloses);
136+
public function new(loop:LuvLoop, eventQueue:AsyncDeque<LuvTimerEvent>, closeQueue:AsyncDeque<LuvTimerEvent>) {
137+
this.loop = loop;
138+
this.eventQueue = eventQueue;
139+
this.closeQueue = closeQueue;
138140
}
139141

140142
@:inheritDoc
@@ -148,6 +150,32 @@ class LuvScheduler implements IScheduler {
148150
public function now() {
149151
return haxe.Timer.milliseconds(); // TODO: where?
150152
}
153+
}
154+
155+
class LuvDispatcher extends Dispatcher
156+
{
157+
final loop:LuvLoop;
158+
final s : LuvScheduler;
159+
final workQueue:AsyncDeque<()->Void>;
160+
final eventQueue:AsyncDeque<LuvTimerEvent>;
161+
final closeQueue:AsyncDeque<LuvTimerEvent>;
162+
163+
function get_scheduler():IScheduler {
164+
return s;
165+
}
166+
167+
public function new(loop) {
168+
this.loop = loop;
169+
170+
workQueue = new AsyncDeque(loop, loopWork);
171+
eventQueue = new AsyncDeque(loop, loopEvents);
172+
closeQueue = new AsyncDeque(loop, loopCloses);
173+
s = new LuvScheduler(loop, eventQueue, closeQueue);
174+
}
175+
176+
public function dispatch(obj:IDispatchObject) {
177+
workQueue.add(obj.onDispatch);
178+
}
151179

152180
inline function consumeDeque<T>(deque:AsyncDeque<T>, f:T->Void) {
153181
do {
@@ -167,7 +195,14 @@ class LuvScheduler implements IScheduler {
167195
consumeDeque(closeQueue, event -> event.stop());
168196
}
169197

198+
function loopWork() {
199+
consumeDeque(workQueue, event -> {
200+
Work.queue(loop, event);
201+
});
202+
}
203+
170204
public function shutdown() {
205+
workQueue.close();
171206
eventQueue.close();
172207
closeQueue.close();
173208
loopCloses();

0 commit comments

Comments
 (0)