Skip to content

Commit 60fc908

Browse files
committed
Rework ParAff
1 parent 6e23d0a commit 60fc908

File tree

4 files changed

+147
-51
lines changed

4 files changed

+147
-51
lines changed

src/Control/Monad/Aff.purs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,11 @@ module Control.Monad.Aff
44
, liftEff'
55
, forkAff
66
, runAff
7-
, killThread
8-
, joinThread
97
) where
108

119
import Prelude
12-
import Control.Monad.Aff.Internal (ASYNC, Aff, Thread(..), launchAff, unsafeLaunchAff)
13-
import Control.Monad.Aff.Internal (Aff, ParAff, Thread, Canceler(..), ASYNC, bracket, delay, launchAff, makeAff, nonCanceler) as Internal
10+
import Control.Monad.Aff.Internal (ASYNC, Aff, Thread, launchAff, unsafeLaunchAff)
11+
import Control.Monad.Aff.Internal (Aff, ParAff, Thread, Canceler(..), ASYNC, bracket, delay, launchAff, makeAff, nonCanceler, joinThread, killThread) as Internal
1412
import Control.Monad.Eff (Eff)
1513
import Control.Monad.Eff.Class (liftEff)
1614
import Control.Monad.Eff.Exception (Error, EXCEPTION)
@@ -26,9 +24,3 @@ runAff k aff = void $ launchAff $ liftEff <<< k =<< try aff
2624

2725
forkAff eff a. Aff eff a Aff eff (Thread eff a)
2826
forkAff = liftEff <<< unsafeLaunchAff
29-
30-
killThread eff a. Error Thread eff a Aff eff Unit
31-
killThread e (Thread t) = t.kill e
32-
33-
joinThread eff a. Thread eff a Aff eff a
34-
joinThread (Thread t) = t.join

src/Control/Monad/Aff/Internal.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,6 @@ exports._launchAff = function (isLeft, fromLeft, fromRight, left, right, aff) {
396396
return function (error) {
397397
return new Aff(SYNC, function () {
398398
delete joins[jid];
399-
return right();
400399
});
401400
};
402401
}
@@ -429,7 +428,7 @@ exports._launchAff = function (isLeft, fromLeft, fromRight, left, right, aff) {
429428
status = RETURN;
430429
step = null;
431430
fail = null;
432-
run(runTick++);
431+
run(++runTick);
433432
}
434433
break;
435434
default:

src/Control/Monad/Aff/Internal.purs

Lines changed: 63 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ module Control.Monad.Aff.Internal
1010
, bracket
1111
, delay
1212
, unsafeLaunchAff
13+
, killThread
14+
, joinThread
1315
) where
1416

1517
import Prelude
@@ -89,21 +91,31 @@ derive instance newtypeParAff ∷ Newtype (ParAff eff a) _
8991
derive newtype instance functorParAffFunctor (ParAff eff)
9092

9193
instance applyParAffApply (ParAff eff) where
92-
apply (ParAff ff) (ParAff fa) = ParAff (makeAff go)
93-
where
94-
go k = do
95-
Thread t1 ← unsafeLaunchAff ff
96-
Thread t2 ← unsafeLaunchAff fa
97-
Thread t3 ← unsafeLaunchAff do
98-
f ← try t1.join
99-
a ← try t2.join
100-
liftEff (k (f <*> a))
101-
pure $ Canceler \err →
102-
parSequence_
103-
[ t3.kill err
104-
, t1.kill err
105-
, t2.kill err
106-
]
94+
apply (ParAff ff) (ParAff fa) = ParAff $ makeAff \k → do
95+
ref1 ← unsafeRunRef $ newRef Nothing
96+
ref2 ← unsafeRunRef $ newRef Nothing
97+
98+
t1 ← unsafeLaunchAff do
99+
f ← try ff
100+
liftEff do
101+
ma ← unsafeRunRef $ readRef ref2
102+
case ma of
103+
Nothing → unsafeRunRef $ writeRef ref1 (Just f)
104+
Just a → k (f <*> a)
105+
106+
t2 ← unsafeLaunchAff do
107+
a ← try fa
108+
liftEff do
109+
mf ← unsafeRunRef $ readRef ref1
110+
case mf of
111+
Nothing → unsafeRunRef $ writeRef ref2 (Just a)
112+
Just f → k (f <*> a)
113+
114+
pure $ Canceler \err →
115+
parSequence_
116+
[ killThread err t1
117+
, killThread err t2
118+
]
107119

108120
instance applicativeParAffApplicative (ParAff eff) where
109121
pure = ParAff <<< pure
@@ -114,34 +126,41 @@ instance semigroupParAff ∷ Semigroup a ⇒ Semigroup (ParAff eff a) where
114126
instance monoidParAffMonoid a Monoid (ParAff eff a) where
115127
mempty = pure mempty
116128

129+
data AltStatus a
130+
= Pending
131+
| Completed a
132+
117133
instance altParAffAlt (ParAff eff) where
118-
alt (ParAff a1) (ParAff a2) = ParAff (makeAff go)
134+
alt = runAlt
119135
where
120-
go k = do
136+
runAlt a. ParAff eff a ParAff eff a ParAff eff a
137+
runAlt (ParAff a1) (ParAff a2) = ParAff $ makeAff \k → do
121138
ref ← unsafeRunRef $ newRef Nothing
122-
Thread t1 ← unsafeLaunchAff a1
123-
Thread t2 ← unsafeLaunchAff a2
139+
t1 ← unsafeLaunchAff a1
140+
t2 ← unsafeLaunchAff a2
124141

125142
let
126-
earlyError =
127-
error "Alt ParAff: early exit"
128-
129-
runK t r = do
130-
res ← liftEff $ unsafeRunRef $ readRef ref
131-
case res, r of
132-
Nothing, Left _ → liftEff $ unsafeRunRef $ writeRef ref (Just r)
133-
Nothing, Right _ → t.kill earlyError *> liftEff (k r)
134-
Just r', _ → t.kill earlyError *> liftEff (k r')
135-
136-
Thread t3 ← unsafeLaunchAff $ runK t2 =<< try t1.join
137-
Thread t4 ← unsafeLaunchAff $ runK t1 =<< try t2.join
143+
completed Thread eff a Either Error a Aff eff Unit
144+
completed t res = do
145+
val ← liftEff $ unsafeRunRef $ readRef ref
146+
case val, res of
147+
_, Right _ → do
148+
killThread (error "Alt ParAff: early exit") t
149+
liftEff (k res)
150+
Nothing, _ →
151+
liftEff $ unsafeRunRef $ writeRef ref (Just res)
152+
Just res', _ →
153+
liftEff (k res')
154+
155+
t3 ← unsafeLaunchAff $ completed t2 =<< try (joinThread t1)
156+
t4 ← unsafeLaunchAff $ completed t1 =<< try (joinThread t2)
138157

139158
pure $ Canceler \err →
140159
parSequence_
141-
[ t3.kill earlyError
142-
, t4.kill earlyError
143-
, t1.kill earlyError
144-
, t2.kill earlyError
160+
[ killThread err t3
161+
, killThread err t4
162+
, killThread err t1
163+
, killThread err t2
145164
]
146165

147166
instance plusParAffPlus (ParAff e) where
@@ -161,6 +180,12 @@ newtype Thread eff a = Thread
161180
instance functorThreadFunctor (Thread eff) where
162181
map f (Thread { kill, join }) = Thread { kill, join: f <$> join }
163182

183+
killThread eff a. Error Thread eff a Aff eff Unit
184+
killThread e (Thread t) = t.kill e
185+
186+
joinThread eff a. Thread eff a Aff eff a
187+
joinThread (Thread t) = t.join
188+
164189
newtype Canceler eff = Canceler (Error Aff eff Unit)
165190

166191
derive instance newtypeCancelerNewtype (Canceler eff) _
@@ -173,7 +198,7 @@ instance monoidCanceler ∷ Monoid (Canceler eff) where
173198
mempty = nonCanceler
174199

175200
nonCanceler eff. Canceler eff
176-
nonCanceler = Canceler k where k _ = pure unit
201+
nonCanceler = Canceler (const (pure unit))
177202

178203
launchAff eff a. Aff eff a Eff (async ASYNC | eff) (Thread eff a)
179204
launchAff aff = Fn.runFn6 _launchAff isLeft unsafeFromLeft unsafeFromRight Left Right aff
@@ -207,8 +232,8 @@ foreign import _launchAff
207232

208233
unsafeFromLeft x y. Either x y x
209234
unsafeFromLeft = case _ of
210-
Left a → a
211-
Right _ → unsafeCrashWith "unsafeFromLeft: Right"
235+
Left a → a
236+
Right _ → unsafeCrashWith "unsafeFromLeft: Right"
212237

213238
unsafeFromRight x y. Either x y y
214239
unsafeFromRight = case _ of

test/Test/Main.purs

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
module Test.Main where
22

33
import Prelude
4+
import Control.Alt ((<|>))
45
import Control.Monad.Aff (Aff, Canceler(..), ASYNC, nonCanceler, runAff, launchAff, makeAff, try, bracket, delay, forkAff, joinThread, killThread)
56
import Control.Monad.Eff (Eff)
67
import Control.Monad.Eff.Class (class MonadEff, liftEff)
@@ -10,6 +11,7 @@ import Control.Monad.Eff.Exception (Error, EXCEPTION, throwException, error, mes
1011
import Control.Monad.Eff.Ref (REF, Ref)
1112
import Control.Monad.Eff.Ref as Ref
1213
import Control.Monad.Error.Class (throwError)
14+
import Control.Parallel (parallel, sequential)
1315
import Data.Bifunctor (lmap)
1416
import Data.Either (Either(..), isLeft)
1517
import Data.Foldable (sum)
@@ -40,7 +42,7 @@ assertEff s = case _ of
4042
Console.log ("[Error] " <> s)
4143
throwException err
4244
Right r → do
43-
assert' s r
45+
assert' ("Assertion failure " <> s) r
4446
Console.log ("[OK] " <> s)
4547

4648
runAssert eff. String TestAff eff Boolean TestEff eff Unit
@@ -270,6 +272,80 @@ test_kill_bracket_nested = assert "kill/bracket/nested" do
270272
, "foo/bar/run/release/bar/release"
271273
]
272274

275+
test_parallel eff. TestAff eff Unit
276+
test_parallel = assert "parallel" do
277+
ref ← newRef ""
278+
let
279+
action s = do
280+
delay (Milliseconds 10.0)
281+
modifyRef ref (_ <> s)
282+
pure s
283+
t1 ← forkAff $ sequential $
284+
{ a: _, b: _ }
285+
<$> parallel (action "foo")
286+
<*> parallel (action "bar")
287+
delay (Milliseconds 10.0)
288+
r1 ← readRef ref
289+
r2 ← joinThread t1
290+
pure (r1 == "foobar" && r2.a == "foo" && r2.b == "bar")
291+
292+
test_kill_parallel eff. TestAff eff Unit
293+
test_kill_parallel = assert "kill/parallel" do
294+
ref ← newRef ""
295+
let
296+
action s = do
297+
bracket
298+
(pure unit)
299+
(\_ → modifyRef ref (_ <> "killed" <> s))
300+
(\_ → do
301+
delay (Milliseconds 10.0)
302+
modifyRef ref (_ <> s))
303+
t1 ← forkAff $ sequential $
304+
parallel (action "foo") *> parallel (action "bar")
305+
t2 ← forkAff do
306+
delay (Milliseconds 5.0)
307+
killThread (error "Nope") t1
308+
modifyRef ref (_ <> "done")
309+
_ ← try $ joinThread t1
310+
_ ← try $ joinThread t2
311+
eq "killedfookilledbardone" <$> readRef ref
312+
313+
test_parallel_alt eff. TestAff eff Unit
314+
test_parallel_alt = assert "parallel/alt" do
315+
ref ← newRef ""
316+
let
317+
action n s = do
318+
delay (Milliseconds n)
319+
modifyRef ref (_ <> s)
320+
pure s
321+
t1 ← forkAff $ sequential $
322+
parallel (action 10.0 "foo") <|> parallel (action 5.0 "bar")
323+
delay (Milliseconds 10.0)
324+
r1 ← readRef ref
325+
r2 ← joinThread t1
326+
pure (r1 == "bar" && r2 == "bar")
327+
328+
test_kill_parallel_alt eff. TestAff eff Unit
329+
test_kill_parallel_alt = assert "kill/parallel/alt" do
330+
ref ← newRef ""
331+
let
332+
action n s = do
333+
bracket
334+
(pure unit)
335+
(\_ → modifyRef ref (_ <> "killed" <> s))
336+
(\_ → do
337+
delay (Milliseconds n)
338+
modifyRef ref (_ <> s))
339+
t1 ← forkAff $ sequential $
340+
parallel (action 10.0 "foo") <|> parallel (action 20.0 "bar")
341+
t2 ← forkAff do
342+
delay (Milliseconds 5.0)
343+
killThread (error "Nope") t1
344+
modifyRef ref (_ <> "done")
345+
_ ← try $ joinThread t1
346+
_ ← try $ joinThread t2
347+
eq "killedfookilledbardone" <$> readRef ref
348+
273349
main TestEff () Unit
274350
main = do
275351
test_pure
@@ -292,3 +368,7 @@ main = do
292368
test_kill_canceler
293369
test_kill_bracket
294370
test_kill_bracket_nested
371+
test_parallel
372+
test_kill_parallel
373+
test_parallel_alt
374+
test_kill_parallel_alt

0 commit comments

Comments
 (0)