Skip to content

Commit 70b863e

Browse files
authored
Rewrite to use async queue consumers (#2)
* Moved from Effect to an internal Aff queue
1 parent 38665f3 commit 70b863e

File tree

7 files changed

+181
-67
lines changed

7 files changed

+181
-67
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
"description": "",
55
"main": "index.js",
66
"scripts": {
7-
"build": "spago build"
7+
"build": "spago build",
8+
"test": "spago test"
89
},
910
"author": "",
1011
"license": "ISC",

spago.dhall

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ You can edit this file as you like.
55
{ name = "wire"
66
, dependencies =
77
[ "arrays"
8-
, "effect"
8+
, "avar"
9+
, "console"
910
, "filterable"
10-
, "js-timers"
1111
, "profunctor"
1212
, "refs"
13+
, "strings"
1314
, "unsafe-reference"
1415
]
1516
, packages = ./packages.dhall

src/Wire/Event.purs

Lines changed: 89 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,37 @@
11
module Wire.Event where
22

33
import Prelude
4-
import Control.Alt (class Alt)
5-
import Control.Alternative (class Alternative, class Plus)
4+
import Control.Alt (class Alt, alt)
5+
import Control.Alternative (class Alternative, class Plus, empty)
66
import Control.Apply (lift2)
7+
import Control.Monad.Rec.Class (Step(..), tailRecM)
78
import Data.Array as Array
8-
import Data.Either (either, hush)
9+
import Data.Either (Either(..), either, hush)
910
import Data.Filterable (class Compactable, class Filterable, filterMap, partitionMap)
10-
import Data.Foldable (class Foldable, sequence_, traverse_)
11+
import Data.Foldable (class Foldable, for_, sequence_, traverse_)
1112
import Data.Maybe (Maybe(..), fromJust, isJust)
1213
import Effect (Effect)
14+
import Effect.Aff (Milliseconds(..))
15+
import Effect.Aff as Aff
16+
import Effect.Class (liftEffect)
1317
import Effect.Ref as Ref
1418
import Partial.Unsafe (unsafePartial)
1519
import Unsafe.Reference (unsafeRefEq)
20+
import Wire.Event.Queue as Queue
1621

1722
newtype Event a
18-
= Event (Subscribe a)
23+
= Event (Subscriber a -> Effect Canceller)
1924

20-
type Subscribe a
21-
= (a -> Effect Unit) -> Effect Canceler
25+
type Subscriber a
26+
= a -> Effect Unit
2227

23-
type Canceler
28+
type Canceller
2429
= Effect Unit
2530

26-
create :: forall a. Effect { event :: Event a, push :: a -> Effect Unit }
31+
create :: forall a. Effect { event :: Event a, push :: a -> Effect Unit, cancel :: Effect Unit }
2732
create = do
2833
subscribers <- Ref.new []
34+
queue <- Queue.create \a -> Ref.read subscribers >>= traverse_ \k -> k a
2935
let
3036
event =
3137
Event \emit -> do
@@ -36,14 +42,12 @@ create = do
3642
pure do
3743
Ref.write true unsubscribing
3844
Ref.modify_ (Array.deleteBy unsafeRefEq subscriber) subscribers
45+
pure { event, push: (queue.push <<< pure), cancel: queue.kill }
3946

40-
push a = Ref.read subscribers >>= traverse_ \emit -> emit a
41-
pure { event, push }
42-
43-
makeEvent :: forall a. Subscribe a -> Event a
47+
makeEvent :: forall a. (Subscriber a -> Effect Canceller) -> Event a
4448
makeEvent = Event
4549

46-
subscribe :: forall a. Event a -> Subscribe a
50+
subscribe :: forall a. Event a -> Subscriber a -> Effect Canceller
4751
subscribe (Event event) = event
4852

4953
filter :: forall a. (a -> Boolean) -> Event a -> Event a
@@ -62,16 +66,16 @@ share source = do
6266
shared <- create
6367
let
6468
incrementCount = do
65-
count <- Ref.modify (_ + 1) subscriberCount
69+
count <- liftEffect do Ref.modify (_ + 1) subscriberCount
6670
when (count == 1) do
67-
cancel <- subscribe source shared.push
68-
Ref.write (Just cancel) cancelSource
71+
cancel <- subscribe source do liftEffect <<< shared.push
72+
liftEffect do Ref.write (Just cancel) cancelSource
6973

7074
decrementCount = do
71-
count <- Ref.modify (_ - 1) subscriberCount
75+
count <- liftEffect do Ref.modify (_ - 1) subscriberCount
7276
when (count == 0) do
73-
Ref.read cancelSource >>= sequence_
74-
Ref.write Nothing cancelSource
77+
liftEffect (Ref.read cancelSource) >>= sequence_
78+
liftEffect do Ref.write Nothing cancelSource
7579

7680
event =
7781
Event \emit -> do
@@ -90,34 +94,72 @@ distinct (Event event) =
9094
Ref.write (pure a) latest
9195
emit a
9296

93-
bufferUntil :: forall a b. Event b -> Event a -> Event (Array a)
94-
bufferUntil (Event flush) (Event event) =
95-
Event \emit -> do
96-
buffer <- Ref.new []
97-
cancelEvent <- event \a -> Ref.modify_ (flip Array.snoc a) buffer
98-
cancelFlush <- flush \_ -> Ref.modify' { state: [], value: _ } buffer >>= emit
99-
pure do cancelEvent *> cancelFlush
97+
bufferUntil :: forall b a. Event b -> Event a -> Event (Array a)
98+
bufferUntil flush source =
99+
alt (Nothing <$ flush) (Just <$> source)
100+
# fold
101+
( \{ buffer } -> case _ of
102+
Nothing -> { buffer: [], output: Just buffer }
103+
Just a -> { buffer: Array.snoc buffer a, output: Nothing }
104+
)
105+
{ buffer: [], output: Nothing }
106+
# filterMap _.output
100107

101108
fromFoldable :: forall a f. Foldable f => f a -> Event a
102-
fromFoldable xs = Event \emit -> traverse_ emit xs *> mempty
109+
fromFoldable xs =
110+
Event \emit -> do
111+
fiber <-
112+
Aff.launchAff do
113+
for_ xs \x -> do
114+
liftEffect do emit x
115+
Aff.delay (Milliseconds 0.0)
116+
pure do
117+
Aff.launchAff_ do Aff.killFiber (Aff.error "cancelled") fiber
118+
119+
range :: Int -> Int -> Event Int
120+
range start end =
121+
Event \emit -> do
122+
let
123+
go pos
124+
| pos /= end = do
125+
liftEffect do emit pos
126+
Aff.delay (Milliseconds 0.0)
127+
pure (Loop (pos + step))
128+
129+
go _ = do
130+
liftEffect do emit end
131+
pure (Done unit)
132+
fiber <- Aff.launchAff do tailRecM go start
133+
pure do
134+
Aff.launchAff_ do Aff.killFiber (Aff.error "cancelled") fiber
135+
where
136+
step = if start < end then 1 else -1
137+
138+
times :: Int -> Event Int
139+
times n
140+
| n > 0 = range 1 n
141+
142+
times _ = empty
103143

104144
instance functorEvent :: Functor Event where
105-
map f (Event event) = Event \emit -> event \a -> emit (f a)
145+
map f (Event event) =
146+
Event \emit -> do
147+
queue <- Queue.create (emit <<< f)
148+
cancel <- event (queue.push <<< pure)
149+
pure do
150+
cancel
151+
queue.kill
106152

107153
instance applyEvent :: Apply Event where
108-
apply (Event eventF) (Event eventA) =
109-
Event \emitB -> do
110-
latestF <- Ref.new Nothing
111-
latestA <- Ref.new Nothing
112-
cancelF <-
113-
eventF \f -> do
114-
Ref.write (Just f) latestF
115-
Ref.read latestA >>= traverse_ \a -> emitB (f a)
116-
cancelA <-
117-
eventA \a -> do
118-
Ref.write (Just a) latestA
119-
Ref.read latestF >>= traverse_ \f -> emitB (f a)
120-
pure do cancelF *> cancelA
154+
apply eventF eventA =
155+
alt (Left <$> eventF) (Right <$> eventA)
156+
# fold
157+
( \{ left, right } -> case _ of
158+
Left l -> { left: Just l, right }
159+
Right r -> { left, right: Just r }
160+
)
161+
{ left: Nothing, right: Nothing }
162+
# filterMap (\{ left, right } -> apply left right)
121163

122164
instance applicativeEvent :: Applicative Event where
123165
pure a = Event \emit -> emit a *> mempty
@@ -127,10 +169,11 @@ instance bindEvent :: Bind Event where
127169
Event \emit -> do
128170
cancelInner <- Ref.new Nothing
129171
cancelOuter <-
130-
outer \a -> do
131-
Ref.read cancelInner >>= sequence_
132-
cancel <- subscribe (f a) emit
133-
Ref.write (Just cancel) cancelInner
172+
outer \a ->
173+
liftEffect do
174+
Ref.read cancelInner >>= sequence_
175+
cancel <- subscribe (f a) emit
176+
Ref.write (Just cancel) cancelInner
134177
pure do
135178
Ref.read cancelInner >>= sequence_
136179
cancelOuter

src/Wire/Event/Queue.purs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
module Wire.Event.Queue where
2+
3+
import Prelude
4+
import Control.Monad.Rec.Class (forever)
5+
import Effect (Effect)
6+
import Effect.AVar as AVar
7+
import Effect.Aff (Aff)
8+
import Effect.Aff as Aff
9+
import Effect.Aff.AVar as AffVar
10+
import Effect.Class (liftEffect)
11+
12+
create :: forall a b. (a -> Effect b) -> Effect { push :: Aff a -> Effect Unit, kill :: Effect Unit }
13+
create consumer = do
14+
queue <- AVar.empty
15+
fiber <-
16+
(Aff.launchAff <<< forever) do
17+
aff <- AffVar.take queue
18+
a <- aff
19+
liftEffect do consumer a
20+
let
21+
killFiber = Aff.launchAff_ do Aff.killFiber (Aff.error "killing queue consumer") fiber
22+
23+
killQueue = AVar.kill (Aff.error "killing queue") queue
24+
pure
25+
{ push: \a -> Aff.launchAff_ do AffVar.put a queue
26+
, kill: killFiber *> killQueue
27+
}

src/Wire/Event/Time.purs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,36 @@ module Wire.Event.Time where
22

33
import Prelude
44
import Control.Alt ((<|>))
5-
import Effect.Ref as Ref
6-
import Effect.Timer as Timer
5+
import Control.Monad.Rec.Class (forever)
6+
import Data.Time.Duration (class Duration, fromDuration)
7+
import Effect.Aff as Aff
8+
import Effect.Class (liftEffect)
79
import Wire.Event (Event)
810
import Wire.Event as Event
11+
import Wire.Event.Queue as Queue
912

10-
delay :: forall a. Int -> Event a -> Event a
11-
delay ms event =
13+
delay :: forall offset a. Duration offset => offset -> Event a -> Event a
14+
delay offset event = do
15+
let
16+
ms = fromDuration offset
1217
Event.makeEvent \emit -> do
13-
canceled <- Ref.new false
14-
cancel <-
15-
Event.subscribe event \a -> do
16-
_ <- Timer.setTimeout ms do unlessM (Ref.read canceled) do emit a
17-
pure unit
18+
queue <- Queue.create emit
19+
cancel <- Event.subscribe event \a -> queue.push do Aff.delay ms *> pure a
1820
pure do
19-
Ref.write true canceled
2021
cancel
22+
queue.kill
2123

22-
interval :: Int -> Event Unit
23-
interval ms =
24+
interval :: forall spacing. Duration spacing => spacing -> Event Unit
25+
interval spacing = do
26+
let
27+
ms = fromDuration spacing
2428
Event.makeEvent \emit -> do
25-
intervalId <- Timer.setInterval ms do emit unit
26-
pure do Timer.clearInterval intervalId
29+
fiber <-
30+
(Aff.launchAff <<< forever) do
31+
Aff.delay ms
32+
liftEffect do emit unit
33+
pure do
34+
Aff.launchAff_ do Aff.killFiber (Aff.error "cancelling") fiber
2735

28-
timer :: Int -> Int -> Event Unit
29-
timer after ms = delay after do pure unit <|> interval ms
36+
timer :: forall offset spacing. Duration offset => Duration spacing => offset -> spacing -> Event Unit
37+
timer offset spacing = delay offset do pure unit <|> interval spacing

src/Wire/Signal.purs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ import Wire.Event as Event
88

99
type Signal a
1010
= { event :: Event a
11+
, modify :: (a -> a) -> Effect Unit
1112
, read :: Effect a
1213
, write :: a -> Effect Unit
13-
, modify :: (a -> a) -> Effect Unit
14+
, cancel :: Effect Unit
1415
}
1516

1617
create :: forall a. a -> Effect (Signal a)
@@ -28,4 +29,4 @@ create init = do
2829
Event.makeEvent \emit -> do
2930
Ref.read value >>= emit
3031
Event.subscribe inner.event emit
31-
pure { event, read, write, modify }
32+
pure { event, read, write, modify, cancel: inner.cancel }

test/Main.purs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
module Test.Main where
2+
3+
import Prelude
4+
import Control.Alt ((<|>))
5+
import Data.Array as Array
6+
import Data.FoldableWithIndex (foldlWithIndex)
7+
import Data.Int as Int
8+
import Data.List.Lazy (range)
9+
import Data.String.CodeUnits as CodeUnits
10+
import Effect (Effect)
11+
import Effect.Class.Console as Console
12+
import Wire.Event (Event)
13+
import Wire.Event as Event
14+
15+
main :: Effect Unit
16+
main = do
17+
void $ Event.subscribe (Event.distinct (sumFromOneToOneHundred <|> sumFromOneToOneHundred) >>= pure <<< formatNumber <<< show) do Console.log
18+
19+
sumFromOneToOneHundred :: Event Number
20+
sumFromOneToOneHundred =
21+
range 1 100
22+
# Event.fromFoldable
23+
# map Int.toNumber
24+
# Event.fold (+) 0.0
25+
26+
formatNumber :: String -> String
27+
formatNumber =
28+
CodeUnits.dropRight 2
29+
>>> CodeUnits.toCharArray
30+
>>> Array.reverse
31+
>>> foldlWithIndex (\i o c -> if i /= 0 && i `mod` 3 == 0 then o <> [ ',', c ] else o <> [ c ]) []
32+
>>> Array.reverse
33+
>>> CodeUnits.fromCharArray

0 commit comments

Comments
 (0)