Skip to content

Commit 8b0ce0b

Browse files
committed
Scheduler for async resumption
1 parent afd150f commit 8b0ce0b

File tree

3 files changed

+99
-72
lines changed

3 files changed

+99
-72
lines changed

src/Control/Monad/Aff.js

Lines changed: 88 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -100,21 +100,50 @@ var Aff = function () {
100100
}
101101
}
102102

103+
var schedule = function () {
104+
var limit = 1024;
105+
var size = 0;
106+
var ix = 0;
107+
var queue = new Array(limit);
108+
var draining = false;
109+
110+
return function (cb) {
111+
var i, thunk;
112+
if (size === limit) {
113+
throw new Error("[Aff] Scheduler full");
114+
}
115+
queue[(ix + size) % limit] = cb;
116+
size++;
117+
118+
if (!draining) {
119+
draining = true;
120+
while (size) {
121+
size--;
122+
thunk = queue[ix];
123+
queue[ix] = void 0;
124+
ix = (ix + 1) % limit;
125+
thunk();
126+
}
127+
draining = false;
128+
}
129+
};
130+
}();
131+
103132
// Fiber state machine
104-
var BLOCKED = 0; // No effect is running.
105-
var PENDING = 1; // An async effect is running.
106-
var RETURN = 2; // The current stack has returned.
107-
var CONTINUE = 3; // Run the next effect.
108-
var BINDSTEP = 4; // Apply the next bind.
133+
var SUSPENDED = 0; // Suspended, pending a join.
134+
var CONTINUE = 1; // Interpret the next instruction.
135+
var BINDSTEP = 2; // Apply the next bind.
136+
var PENDING = 3; // An async effect is running.
137+
var RETURN = 4; // The current stack has returned.
109138
var KILLFORKS = 5; // Killing supervised forks.
110139
var COMPLETED = 6; // The entire fiber has completed.
111140

112-
function runFiber(util, suspended, aff, completeCb) {
141+
function runFiber(util, initStatus, aff, completeCb) {
113142
// Monotonically increasing tick, increased on each asynchronous turn.
114143
var runTick = 0;
115144

116145
// The current branch of the state machine.
117-
var status = CONTINUE;
146+
var status = initStatus;
118147

119148
// The current point of interest for the state machine branch.
120149
var step = aff; // Successful step
@@ -146,10 +175,10 @@ var Aff = function () {
146175
// Temporary bindings for the various branches.
147176
var tmp, result, attempt, canceler;
148177

149-
function launchChildFiber(fid, suspended, child) {
178+
function launchChildFiber(fid, childStatus, child) {
150179
forkCount++;
151180
var blocked = true;
152-
var fiber = runFiber(util, suspended, child, function () {
181+
var fiber = runFiber(util, childStatus, child, function () {
153182
forkCount--;
154183
if (blocked) {
155184
blocked = false;
@@ -178,7 +207,7 @@ var Aff = function () {
178207
forks = {};
179208
forkCount = 0;
180209
for (var i = 0, len = killId; i < len; i++) {
181-
kills[i] = runFiber(util, false, kills[i], function () {
210+
kills[i] = runFiber(util, CONTINUE, kills[i], function () {
182211
delete kills[i];
183212
killId--;
184213
if (killId === 0) {
@@ -190,7 +219,7 @@ var Aff = function () {
190219
return new Aff(SYNC, function () {
191220
for (var k in kills) {
192221
if (kills.hasOwnProperty(k)) {
193-
runFiber(util, false, kills[k].kill(error), function () {});
222+
runFiber(util, CONTINUE, kills[k].kill(error), function () {});
194223
}
195224
}
196225
});
@@ -252,7 +281,6 @@ var Aff = function () {
252281
break;
253282

254283
case SYNC:
255-
status = BLOCKED;
256284
result = runSync(util.left, util.right, step._1);
257285
if (util.isLeft(result)) {
258286
status = RETURN;
@@ -267,41 +295,30 @@ var Aff = function () {
267295
break;
268296

269297
case ASYNC:
270-
status = BLOCKED;
271-
canceler = runAsync(util.left, step._1, function (result) {
298+
status = PENDING;
299+
step = runAsync(util.left, step._1, function (result) {
272300
return function () {
273301
if (runTick !== localRunTick) {
274302
return;
275-
}
276-
tmp = status;
277-
if (util.isLeft(result)) {
278-
status = RETURN;
279-
fail = result;
280-
} else if (bhead === null) {
281-
status = RETURN;
282-
step = result;
283303
} else {
284-
status = BINDSTEP;
285-
step = util.fromRight(result);
286-
}
287-
// We only need to invoke `run` if the subsequent block has
288-
// switch the status to PENDING. Otherwise the callback was
289-
// resolved synchronously, and the current loop can continue
290-
// normally.
291-
if (tmp === PENDING) {
292-
run(++runTick);
293-
} else {
294-
localRunTick = ++runTick;
304+
runTick++;
295305
}
306+
schedule(function () {
307+
if (util.isLeft(result)) {
308+
status = RETURN;
309+
fail = result;
310+
} else if (bhead === null) {
311+
status = RETURN;
312+
step = result;
313+
} else {
314+
status = BINDSTEP;
315+
step = util.fromRight(result);
316+
}
317+
run(runTick);
318+
});
296319
};
297320
});
298-
// If the callback was resolved synchronously, the status will have
299-
// switched to CONTINUE, and we should not move on to PENDING.
300-
if (status === BLOCKED) {
301-
status = PENDING;
302-
step = canceler;
303-
}
304-
break;
321+
return;
305322

306323
// Enqueue the current stack of binds and continue
307324
case CATCH:
@@ -375,7 +392,7 @@ var Aff = function () {
375392
}
376393
break;
377394

378-
// If we have a bracket, we should enqueue the finalizer branch,
395+
// If we have a bracket, we should enqueue the handlers,
379396
// and continue with the success branch only if the fiber has
380397
// not been interrupted. If the bracket acquisition failed, we
381398
// should not run either.
@@ -393,6 +410,8 @@ var Aff = function () {
393410
}
394411
break;
395412

413+
// Enqueue the appropriate handler. We increase the bracket count
414+
// because it should be cancelled.
396415
case BRACKETED:
397416
bracket++;
398417
attempts = new Aff(CONS, new Aff(FINALIZED, step), attempts._2);
@@ -447,14 +466,16 @@ var Aff = function () {
447466
if (util.isLeft(step) && !joins) {
448467
setTimeout(function () {
449468
// Guard on joins because a completely synchronous fiber can
450-
// still have an observer.
469+
// still have an observer which was added after-the-fact.
451470
if (!joins) {
452471
throw util.fromLeft(step);
453472
}
454473
}, 0);
455474
}
456475
return;
457-
case BLOCKED: return;
476+
case SUSPENDED:
477+
status = CONTINUE;
478+
break;
458479
case PENDING: return;
459480
}
460481
}
@@ -479,12 +500,11 @@ var Aff = function () {
479500
var killCb = function () {
480501
return cb(util.right(void 0));
481502
};
482-
if (suspended) {
483-
suspended = false;
503+
switch (status) {
504+
case SUSPENDED:
484505
status = COMPLETED;
485506
interrupt = util.left(error);
486-
}
487-
switch (status) {
507+
/* fallthrough */
488508
case COMPLETED:
489509
canceler = nonCanceler;
490510
killCb()();
@@ -524,20 +544,25 @@ var Aff = function () {
524544

525545
var join = new Aff(ASYNC, function (cb) {
526546
return function () {
527-
if (suspended) {
528-
suspended = false;
547+
var canceler;
548+
switch (status) {
549+
case SUSPENDED:
550+
canceler = addJoinCallback(cb);
529551
run(runTick);
530-
}
531-
if (status === COMPLETED) {
532-
joins = true;
552+
break;
553+
case COMPLETED:
554+
canceler = nonCanceler;
555+
joins = true;
533556
cb(step)();
534-
return nonCanceler;
557+
break;
558+
default:
559+
canceler = addJoinCallback(cb);
535560
}
536-
return addJoinCallback(cb);
561+
return canceler;
537562
};
538563
});
539564

540-
if (suspended === false) {
565+
if (status === CONTINUE) {
541566
run(runTick);
542567
}
543568

@@ -592,7 +617,7 @@ var Aff = function () {
592617
// collect all the fibers first.
593618
kills[count++] = function (aff) {
594619
return function () {
595-
return runFiber(util, false, aff, function (result) {
620+
return runFiber(util, CONTINUE, aff, function (result) {
596621
count--;
597622
if (fail === null && util.isLeft(result)) {
598623
fail = result;
@@ -817,7 +842,7 @@ var Aff = function () {
817842
// tree.
818843
fibers[fid] = function (aff, completeCb) {
819844
return new Aff(THUNK, function () {
820-
return runFiber(util, false, aff, completeCb);
845+
return runFiber(util, CONTINUE, aff, completeCb);
821846
});
822847
}(tmp, resolve(step));
823848
}
@@ -869,7 +894,7 @@ var Aff = function () {
869894
// We can drop the fibers here because we are only canceling join
870895
// attempts, which are synchronous anyway.
871896
for (var kid = 0, n = killId; kid < n; kid++) {
872-
runFiber(util, false, kills[kid].kill(error), function () {});
897+
runFiber(util, CONTINUE, kills[kid].kill(error), function () {});
873898
}
874899

875900
var newKills = kill(error, root, cb);
@@ -879,7 +904,7 @@ var Aff = function () {
879904
return function () {
880905
for (var kid in newKills) {
881906
if (newKills.hasOwnProperty(kid)) {
882-
runFiber(util, false, newKills[kid].kill(killError), function () {});
907+
runFiber(util, CONTINUE, newKills[kid].kill(killError), function () {});
883908
}
884909
}
885910
return nonCanceler;
@@ -945,9 +970,9 @@ exports._bind = function (aff) {
945970
};
946971
};
947972

948-
exports._fork = function (suspended) {
973+
exports._fork = function (status) {
949974
return function (aff) {
950-
return Aff.Fork(suspended, aff);
975+
return Aff.Fork(status, aff);
951976
};
952977
};
953978

@@ -1026,9 +1051,9 @@ exports._delay = function () {
10261051
};
10271052
}();
10281053

1029-
exports._launchAff = function (util, suspended, aff) {
1054+
exports._launchAff = function (util, status, aff) {
10301055
return function () {
1031-
return Aff.runFiber(util, suspended, aff, function () {});
1056+
return Aff.runFiber(util, status, aff, function () {});
10321057
};
10331058
};
10341059

src/Control/Monad/Aff.purs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,11 @@ instance monoidCanceler ∷ Monoid (Canceler eff) where
187187

188188
-- | Forks an `Aff` from an `Eff` context, returning the `Fiber`.
189189
launchAff eff a. Aff eff a Eff eff (Fiber eff a)
190-
launchAff aff = Fn.runFn3 _launchAff ffiUtil false aff
190+
launchAff aff = Fn.runFn3 _launchAff ffiUtil 1 aff
191191

192192
-- | Suspends an `Aff` from an `Eff` context, returning the `Fiber`.
193193
launchSuspendedAff eff a. Aff eff a Eff eff (Fiber eff a)
194-
launchSuspendedAff aff = Fn.runFn3 _launchAff ffiUtil true aff
194+
launchSuspendedAff aff = Fn.runFn3 _launchAff ffiUtil 0 aff
195195

196196
-- | Forks an `Aff` from an `Eff` context and also takes a callback to run when
197197
-- | it completes. Returns the pending `Fiber`.
@@ -207,13 +207,13 @@ runAff_ k aff = void $ runAff k aff
207207
-- | `Fiber`. When the parent `Fiber` completes, the child will be killed if it
208208
-- | has not completed.
209209
forkAff eff a. Aff eff a Aff eff (Fiber eff a)
210-
forkAff = _fork false
210+
forkAff = _fork 1
211211

212212
-- | Suspends a supervised `Aff` from within a parent `Aff` context, returning
213213
-- | the `Fiber`. A suspended `Fiber` does not execute until requested, via
214214
-- | `joinFiber`.
215215
suspendAff eff a. Aff eff a Aff eff (Fiber eff a)
216-
suspendAff = _fork true
216+
suspendAff = _fork 0
217217

218218
-- | Forks an unsupervised `Aff`, returning the `Fiber`.
219219
spawnAff eff a. Aff eff a Aff eff (Fiber eff a)
@@ -261,7 +261,7 @@ bracket acquire completed =
261261
foreign import _pure eff a. a Aff eff a
262262
foreign import _throwError eff a. Error Aff eff a
263263
foreign import _catchError eff a. Aff eff a (Error Aff eff a) Aff eff a
264-
foreign import _fork eff a. Boolean Aff eff a Aff eff (Fiber eff a)
264+
foreign import _fork eff a. Int Aff eff a Aff eff (Fiber eff a)
265265
foreign import _map eff a b. (a b) Aff eff a Aff eff b
266266
foreign import _bind eff a b. Aff eff a (a Aff eff b) Aff eff b
267267
foreign import _delay a eff. Fn.Fn2 (Unit Either a Unit) Number (Aff eff Unit)
@@ -295,7 +295,7 @@ foreign import _launchAff
295295
eff a
296296
. Fn.Fn3
297297
FFIUtil
298-
Boolean
298+
Int
299299
(Aff eff a)
300300
(Eff eff (Fiber eff a))
301301

test/Test/Main.purs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,14 +138,14 @@ test_multi_join = assert "join/multi" do
138138
delay (Milliseconds 20.0)
139139
modifyRef ref (_ + 1)
140140
pure 20
141-
n1 ← sum <$> traverse joinFiber
141+
n1 ← traverse joinFiber
142142
[ f1
143143
, f1
144144
, f1
145145
, f2
146146
]
147147
n2 ← readRef ref
148-
pure (n1 == 50 && n2 == 3)
148+
pure (sum n1 == 50 && n2 == 3)
149149

150150
test_suspend eff. TestAff eff Unit
151151
test_suspend = assert "suspend" do
@@ -264,7 +264,9 @@ test_kill_canceler ∷ ∀ eff. TestAff eff Unit
264264
test_kill_canceler = assert "kill/canceler" do
265265
ref ← newRef ""
266266
fiber ← forkAff do
267-
n ← makeAff \_ → pure (Canceler \_ → liftEff (writeRef ref "cancel"))
267+
n ← makeAff \_ → pure $ Canceler \_ → do
268+
delay (Milliseconds 20.0)
269+
liftEff (writeRef ref "cancel")
268270
writeRef ref "done"
269271
killFiber (error "Nope") fiber
270272
res ← try (joinFiber fiber)

0 commit comments

Comments
 (0)