@@ -3,35 +3,12 @@ defmodule GenStageTest do
3
3
4
4
import ExUnit.CaptureLog
5
5
6
- defmodule EventProducer do
7
- @ moduledoc """
8
- Produce events when receives a cast
9
- """
10
- use GenStage
11
-
12
- def start_link ( init ) do
13
- GenStage . start_link ( __MODULE__ , init )
14
- end
15
-
16
- @ impl GenStage
17
- @ doc false
18
- def init ( init ) , do: init
19
-
20
- @ impl GenStage
21
- @ doc false
22
- def handle_cast ( event , state ) , do: { :noreply , [ event ] , state }
23
-
24
- @ impl GenStage
25
- @ doc false
26
- def handle_demand ( _demand , state ) , do: { :noreply , [ ] , state }
27
- end
28
-
29
6
defmodule Counter do
30
7
@ moduledoc """
31
8
A producer that works as a counter in batches.
32
9
It also supports events to be queued via sync
33
- and async calls. A negative counter disables
34
- the counting behaviour.
10
+ and async calls. A pid disables the counting
11
+ behaviour.
35
12
"""
36
13
37
14
use GenStage
@@ -698,7 +675,7 @@ defmodule GenStageTest do
698
675
699
676
test "can be set to :accumulate via API using broadcast" do
700
677
{ :ok , producer } =
701
- EventProducer . start_link ( { :producer , [ ] , dispatcher: GenStage.BroadcastDispatcher } )
678
+ Counter . start_link ( { :producer , self ( ) , dispatcher: GenStage.BroadcastDispatcher } )
702
679
703
680
assert GenStage . demand ( producer ) == :forward
704
681
{ :ok , consumer1 } = Forwarder . start_link ( { :consumer , self ( ) , subscribe_to: [ producer ] } )
@@ -709,10 +686,10 @@ defmodule GenStageTest do
709
686
GenStage . stop ( consumer1 )
710
687
GenStage . stop ( consumer2 )
711
688
712
- assert :ok = GenStage . cast ( producer , 1 )
713
- assert :ok = GenStage . cast ( producer , 2 )
714
- assert :ok = GenStage . cast ( producer , 3 )
715
- assert :ok = GenStage . cast ( producer , 4 )
689
+ assert :ok = Counter . async_queue ( producer , [ 1 ] )
690
+ assert :ok = Counter . async_queue ( producer , [ 2 ] )
691
+ assert :ok = Counter . async_queue ( producer , [ 3 ] )
692
+ assert :ok = Counter . async_queue ( producer , [ 4 ] )
716
693
717
694
Process . sleep ( 200 )
718
695
{ :ok , _consumer1 } = Forwarder . start_link ( { :consumer , self ( ) , subscribe_to: [ producer ] } )
@@ -1384,7 +1361,7 @@ defmodule GenStageTest do
1384
1361
1385
1362
test "handle_call/3 may shut stage down" do
1386
1363
Process . flag ( :trap_exit , true )
1387
- { :ok , producer } = Counter . start_link ( { :producer , - 1 } )
1364
+ { :ok , producer } = Counter . start_link ( { :producer , self ( ) } )
1388
1365
assert Counter . stop ( producer ) == :ok
1389
1366
assert_receive { :EXIT , ^ producer , :shutdown }
1390
1367
end
0 commit comments