Skip to content

Commit 646dd95

Browse files
committed
Comments, compat, thread -> fiber, generalBracket
1 parent edce196 commit 646dd95

File tree

6 files changed

+311
-149
lines changed

6 files changed

+311
-149
lines changed

src/Control/Monad/Aff.js

Lines changed: 56 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ data Aff eff a
1616
| Sync (Eff eff a)
1717
| Async ((Either Error a -> Eff eff Unit) -> Eff eff (Canceler eff))
1818
| forall b. Catch (Error -> a) (Aff eff b) ?(b -> a)
19-
| forall b. Bracket (Aff eff b) (b -> Aff eff Unit) (b -> Aff eff a)
19+
| forall b. Bracket (Aff eff b) (BracketConditions eff b) (b -> Aff eff a)
2020
2121
*/
2222
var PURE = "Pure";
@@ -44,10 +44,11 @@ var ALT = "Alt";
4444
var CONS = "Cons"; // Cons-list, for stacks
4545
var RECOVER = "Recover"; // Continue with error handler
4646
var RESUME = "Resume"; // Continue indiscriminately
47+
var BRACKETED = "Bracketed"; // Continue with bracket finalizers
4748
var FINALIZED = "Finalized"; // Marker for finalization
4849

49-
var FORKED = "Forked"; // Reference to a forked thread, with resumption stack
50-
var THREAD = "Thread"; // Actual thread reference
50+
var FORKED = "Forked"; // Reference to a forked fiber, with resumption stack
51+
var FIBER = "Fiber"; // Actual fiber reference
5152
var THUNK = "Thunk"; // Primed effect, ready to invoke
5253

5354
function Aff(tag, _1, _2, _3) {
@@ -119,10 +120,10 @@ exports.makeAff = function (k) {
119120
return new Aff(ASYNC, k);
120121
};
121122

122-
exports.bracket = function (acquire) {
123-
return function (release) {
123+
exports.generalBracket = function (acquire) {
124+
return function (options) {
124125
return function (k) {
125-
return new Aff(BRACKET, acquire, release, k);
126+
return new Aff(BRACKET, acquire, options, k);
126127
};
127128
};
128129
};
@@ -199,13 +200,13 @@ function runAsync(left, eff, k) {
199200
}
200201
}
201202

202-
// Thread state machine
203+
// Fiber state machine
203204
var BLOCKED = 0; // No effect is running.
204205
var PENDING = 1; // An async effect is running.
205206
var RETURN = 2; // The current stack has returned.
206207
var CONTINUE = 3; // Run the next effect.
207208
var BINDSTEP = 4; // Apply the next bind.
208-
var COMPLETED = 5; // The entire thread has completed.
209+
var COMPLETED = 5; // The entire fiber has completed.
209210

210211
exports._launchAff = function (isLeft, fromLeft, fromRight, left, right, aff) {
211212
return function () {
@@ -220,7 +221,7 @@ exports._launchAff = function (isLeft, fromLeft, fromRight, left, right, aff) {
220221
var fail = null; // Failure step
221222
var interrupt = null; // Asynchronous interrupt
222223

223-
// Stack of continuations for the current thread.
224+
// Stack of continuations for the current fiber.
224225
var bhead = null;
225226
var btail = null;
226227

@@ -241,11 +242,11 @@ exports._launchAff = function (isLeft, fromLeft, fromRight, left, right, aff) {
241242
var tmp, result, attempt, canceler;
242243

243244
// Each invocation of `run` requires a tick. When an asynchronous effect is
244-
// resolved, we must check that the local tick coincides with the thread
245+
// resolved, we must check that the local tick coincides with the fiber
245246
// tick before resuming. This prevents multiple async continuations from
246-
// accidentally resuming the same thread. A common example may be invoking
247+
// accidentally resuming the same fiber. A common example may be invoking
247248
// the provided callback in `makeAff` more than once, but it may also be an
248-
// async effect resuming after the thread was already cancelled.
249+
// async effect resuming after the fiber was already cancelled.
249250
function run(localRunTick) {
250251
while (1) {
251252
tmp = null;
@@ -372,7 +373,7 @@ exports._launchAff = function (isLeft, fromLeft, fromRight, left, right, aff) {
372373

373374
case RETURN:
374375
// If the current stack has returned, and we have no other stacks to
375-
// resume or finalizers to run, the thread has halted and we can
376+
// resume or finalizers to run, the fiber has halted and we can
376377
// invoke all join callbacks. Otherwise we need to resume.
377378
if (attempts === null) {
378379
runTick++; // Increment the counter to prevent reentry after completion.
@@ -412,14 +413,14 @@ exports._launchAff = function (isLeft, fromLeft, fromRight, left, right, aff) {
412413
break;
413414

414415
// If we have a bracket, we should enqueue the finalizer branch,
415-
// and continue with the success branch only if the thread has
416+
// and continue with the success branch only if the fiber has
416417
// not been interrupted. If the bracket acquisition failed, we
417418
// should not run either.
418419
case BRACKET:
419420
bracket--;
420421
if (fail === null) {
421422
result = fromRight(step);
422-
attempts = new Aff(CONS, attempt._2(result), attempts._2);
423+
attempts = new Aff(CONS, new Aff(BRACKETED, attempt._2, result), attempts._2);
423424
if (interrupt === null || bracket > 0) {
424425
status = CONTINUE;
425426
step = attempt._3(result);
@@ -429,6 +430,19 @@ exports._launchAff = function (isLeft, fromLeft, fromRight, left, right, aff) {
429430
}
430431
break;
431432

433+
case BRACKETED:
434+
bracket++;
435+
attempts = new Aff(CONS, new Aff(FINALIZED, step), attempts._2);
436+
status = CONTINUE;
437+
if (interrupt !== null) {
438+
step = attempt._1.kill(fromLeft(interrupt))(attempt._2);
439+
} else if (fail !== null) {
440+
step = attempt._1.throw(fromLeft(fail))(attempt._2);
441+
} else {
442+
step = attempt._1.release(attempt._2);
443+
}
444+
break;
445+
432446
case FINALIZED:
433447
bracket--;
434448
attempts = attempts._2;
@@ -455,11 +469,11 @@ exports._launchAff = function (isLeft, fromLeft, fromRight, left, right, aff) {
455469
}
456470
}
457471
joins = tmp;
458-
// If we have an unhandled exception, and no other thread has joined
472+
// If we have an unhandled exception, and no other fiber has joined
459473
// then we need to throw the exception in a fresh stack.
460474
if (isLeft(step) && !joins) {
461475
setTimeout(function () {
462-
// Guard on joins because a completely synchronous thread can
476+
// Guard on joins because a completely synchronous fiber can
463477
// still have an observer.
464478
if (!joins) {
465479
throw fromLeft(step);
@@ -554,9 +568,9 @@ exports._launchAff = function (isLeft, fromLeft, fromRight, left, right, aff) {
554568

555569
exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff, par) {
556570
function runParAff(cb) {
557-
// Table of all forked threads.
558-
var threadId = 0;
559-
var threads = {};
571+
// Table of all forked fibers.
572+
var fiberId = 0;
573+
var fibers = {};
560574

561575
// Table of currently running cancelers, as a product of `Alt` behavior.
562576
var killId = 0;
@@ -572,7 +586,7 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
572586
var root = EMPTY;
573587

574588
// Walks a tree, invoking all the cancelers. Returns the table of pending
575-
// cancellation threads.
589+
// cancellation fibers.
576590
function kill(error, par, cb) {
577591
var step = par;
578592
var fail = null;
@@ -587,22 +601,22 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
587601

588602
switch (step.tag) {
589603
case FORKED:
590-
tmp = threads[step._1];
591-
// If we haven't forked the thread yet (such as with a sync Alt),
604+
tmp = fibers[step._1];
605+
// If we haven't forked the fiber yet (such as with a sync Alt),
592606
// then we should just remove it from the queue and continue.
593607
if (tmp.tag === THUNK) {
594-
delete threads[step._1];
608+
delete fibers[step._1];
595609
cb(right(void 0))();
596610
} else {
597611
// Again, we prime the effect but don't run it yet, so that we can
598-
// collect all the threads first.
612+
// collect all the fibers first.
599613
kills[count++] = runAff(function (result) {
600614
return function () {
601615
count--;
602616
if (fail === null && isLeft(result)) {
603617
fail = result;
604618
}
605-
// We can resolve the callback when all threads have died.
619+
// We can resolve the callback when all fibers have died.
606620
if (count === 0) {
607621
cb(fail || right(void 0))();
608622
}
@@ -646,7 +660,7 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
646660
return kills;
647661
}
648662

649-
// When a thread resolves, we need to bubble back up the tree with the
663+
// When a fiber resolves, we need to bubble back up the tree with the
650664
// result, computing the applicative nodes.
651665
function join(result, head, tail) {
652666
var fail, step, lhs, rhs, tmp, kid;
@@ -758,12 +772,12 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
758772
}
759773
}
760774

761-
function resolve(thread) {
775+
function resolve(fiber) {
762776
return function (result) {
763777
return function () {
764-
delete threads[thread._1];
765-
thread._3 = result;
766-
join(result, thread._2._1, thread._2._2);
778+
delete fibers[fiber._1];
779+
fiber._3 = result;
780+
join(result, fiber._2._1, fiber._2._2);
767781
};
768782
};
769783
}
@@ -779,11 +793,11 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
779793
var step = par;
780794
var head = null;
781795
var tail = null;
782-
var tmp, tid;
796+
var tmp, fid;
783797

784798
loop: while (1) {
785799
tmp = null;
786-
tid = null;
800+
fid = null;
787801

788802
switch (status) {
789803
case CONTINUE:
@@ -811,17 +825,17 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
811825
break;
812826
default:
813827
// When we hit a leaf value, we suspend the stack in the `FORKED`.
814-
// When the thread resolves, it can bubble back up the tree.
815-
tid = threadId++;
828+
// When the fiber resolves, it can bubble back up the tree.
829+
fid = fiberId++;
816830
status = RETURN;
817831
tmp = step;
818-
step = new Aff(FORKED, tid, new Aff(CONS, head, tail), EMPTY);
832+
step = new Aff(FORKED, fid, new Aff(CONS, head, tail), EMPTY);
819833
// We prime the effect, but don't immediately run it. We need to
820834
// walk the entire tree first before actually running effects
821835
// because they may all be synchronous and resolve immediately, at
822836
// which point it would attempt to resolve against an incomplete
823837
// tree.
824-
threads[tid] = new Aff(THUNK, runAff(resolve(step))(tmp));
838+
fibers[fid] = new Aff(THUNK, runAff(resolve(step))(tmp));
825839
}
826840
break;
827841
case RETURN:
@@ -852,12 +866,12 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
852866
// Keep a reference to the tree root so it can be cancelled.
853867
root = step;
854868

855-
// Walk the primed threads and fork them. We store the actual `Thread`
869+
// Walk the primed fibers and fork them. We store the actual `Fiber`
856870
// reference so we can cancel them when needed.
857-
for (tid = 0; tid < threadId; tid++) {
858-
tmp = threads[tid];
871+
for (fid = 0; fid < fiberId; fid++) {
872+
tmp = fibers[fid];
859873
if (tmp && tmp.tag === THUNK) {
860-
threads[tid] = new Aff(THREAD, tmp._1());
874+
fibers[fid] = new Aff(FIBER, tmp._1());
861875
}
862876
}
863877
}
@@ -872,7 +886,7 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
872886
function cancel(error, cb) {
873887
interrupt = left(error);
874888

875-
// We can drop the threads here because we are only canceling join
889+
// We can drop the fibers here because we are only canceling join
876890
// attempts, which are synchronous anyway.
877891
for (var kid = 0, n = killId; kid < n; kid++) {
878892
runAff(ignore, kills[kid].kill(error))();

0 commit comments

Comments
 (0)