Skip to content

Commit e0caaa4

Browse files
committed
Synchronous ParAff Alt handling
1 parent 2c6284e commit e0caaa4

File tree

2 files changed

+42
-18
lines changed

2 files changed

+42
-18
lines changed

src/Control/Monad/Aff.js

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,22 @@ data ParAff eff a
3333
= forall b. Map (b -> a) (ParAff eff b)
3434
| forall b. Apply (ParAff eff (b -> a)) (ParAff eff b)
3535
| Alt (ParAff eff a) (ParAff eff a)
36-
| Par (Aff eff a)
36+
| ?Par (Aff eff a)
3737
3838
*/
3939
var MAP = "Map"
4040
var APPLY = "Apply"
4141
var ALT = "Alt"
4242

43-
// These are constructors used to implement the recover stack. We still use the
44-
// Aff constructor so that property offsets can always inline.
45-
var CONS = "Cons"; // Cons-list
46-
var RECOVER = "Recover"; // Continue with `Either Error a` (via attempt)
43+
// Various constructors used in interpretation
44+
var CONS = "Cons"; // Cons-list, for stacks
45+
var RECOVER = "Recover"; // Continue with error handler
4746
var RESUME = "Resume"; // Continue indiscriminately
4847
var FINALIZED = "Finalized"; // Marker for finalization
49-
var THREAD = "Thread";
48+
49+
var FORKED = "Forked"; // Reference to a forked thread, with resumption stack
50+
var THREAD = "Thread"; // Actual thread reference
51+
var THUNK = "Thunk"; // Primed effect, ready to invoke
5052

5153
function Aff(tag, _1, _2, _3) {
5254
this.tag = tag;
@@ -570,7 +572,7 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
570572
var root = EMPTY;
571573

572574
// Walks the applicative tree, substituting non-applicative nodes with
573-
// `THREAD` nodes. In this tree, all applicative nodes use the `_3` slot
575+
// `FORKED` nodes. In this tree, all applicative nodes use the `_3` slot
574576
// as a mutable slot for memoization. In an unresolved state, the `_3`
575577
// slot is `EMPTY`. In the cases of `ALT` and `APPLY`, we always walk
576578
// the left side first, because both operations are left-associative. As
@@ -611,18 +613,18 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
611613
step = step._1;
612614
break;
613615
default:
614-
// When we hit a leaf value, we suspend the stack in the `THREAD`.
616+
// When we hit a leaf value, we suspend the stack in the `FORKED`.
615617
// When the thread resolves, it can bubble back up the tree.
616618
tid = threadId++;
617619
status = RETURN;
618620
tmp = step;
619-
step = new Aff(THREAD, tid, new Aff(CONS, head, tail), EMPTY);
621+
step = new Aff(FORKED, tid, new Aff(CONS, head, tail), EMPTY);
620622
// We prime the effect, but don't immediately run it. We need to
621623
// walk the entire tree first before actually running effects
622624
// because they may all be synchronous and resolve immediately, at
623625
// which point it would attempt to resolve against an incomplete
624626
// tree.
625-
threads[tid] = runAff(resolve(step))(tmp);
627+
threads[tid] = new Aff(THUNK, runAff(resolve(step))(tmp));
626628
}
627629
break;
628630
case RETURN:
@@ -656,7 +658,10 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
656658
// Walk the primed threads and fork them. We store the actual `Thread`
657659
// reference so we can cancel them when needed.
658660
for (tid = 0; tid < threadId; tid++) {
659-
threads[tid] = threads[tid]();
661+
tmp = threads[tid];
662+
if (tmp && tmp.tag === THUNK) {
663+
threads[tid] = new Aff(THREAD, tmp._1());
664+
}
660665
}
661666
}
662667

@@ -747,7 +752,6 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
747752
head._3 = step;
748753
tmp = true;
749754
kid = killId++;
750-
751755
// Once a side has resolved, we need to cancel the side that is still
752756
// pending before we can continue.
753757
kills[kid] = kill(early, step === lhs ? head._2 : head._1, function (killResult) {
@@ -796,16 +800,19 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
796800

797801
loop: while (1) {
798802
tmp = null;
799-
kid = null;
800803

801804
switch (step.tag) {
802-
case THREAD:
805+
case FORKED:
803806
tmp = threads[step._1];
804-
kid = count++;
805-
if (tmp) {
807+
// If we haven't forked the thread yet (such as with a sync Alt),
808+
// then we should just remove it from the queue and continue.
809+
if (tmp.tag === THUNK) {
810+
delete threads[step._1];
811+
cb(right(void 0))();
812+
} else {
806813
// Again, we prime the effect but don't run it yet, so that we can
807814
// collect all the threads first.
808-
kills[kid] = runAff(function (result) {
815+
kills[count++] = runAff(function (result) {
809816
return function () {
810817
count--;
811818
if (fail === null && isLeft(result)) {
@@ -816,7 +823,7 @@ exports._sequential = function (isLeft, fromLeft, fromRight, left, right, runAff
816823
cb(fail || right(void 0))();
817824
}
818825
};
819-
})(tmp.kill(error));
826+
})(tmp._1.kill(error));
820827
}
821828
// Terminal case.
822829
if (head === null) {

test/Test/Main.purs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,22 @@ test_parallel_alt = assert "parallel/alt" do
327327
r2 ← joinThread t1
328328
pure (r1 == "bar" && r2 == "bar")
329329

330+
test_parallel_alt_sync eff. TestAff eff Unit
331+
test_parallel_alt_sync = assert "kill/parallel/alt/sync" do
332+
ref ← newRef ""
333+
let
334+
action s = do
335+
bracket
336+
(pure unit)
337+
(\_ → modifyRef ref (_ <> "killed" <> s))
338+
(\_ → modifyRef ref (_ <> s) $> s)
339+
r1 ← sequential $
340+
parallel (action "foo")
341+
<|> parallel (action "bar")
342+
<|> parallel (action "baz")
343+
r2 ← readRef ref
344+
pure (r1 == "foo" && r2 == "fookilledfoo")
345+
330346
test_kill_parallel_alt eff. TestAff eff Unit
331347
test_kill_parallel_alt = assert "kill/parallel/alt" do
332348
ref ← newRef ""
@@ -416,6 +432,7 @@ main = do
416432
test_parallel
417433
test_kill_parallel
418434
test_parallel_alt
435+
test_parallel_alt_sync
419436
test_kill_parallel_alt
420437
test_thread_map
421438
test_thread_apply

0 commit comments

Comments
 (0)