@@ -1637,7 +1637,7 @@ defmodule GenStage do
1637
1637
1638
1638
## Callbacks
1639
1639
1640
- import GenStage.Utils
1640
+ require GenStage.Utils , as: Utils
1641
1641
@ compile :inline_list_funcs
1642
1642
1643
1643
@ doc false
@@ -1673,12 +1673,14 @@ defmodule GenStage do
1673
1673
end
1674
1674
1675
1675
defp init_producer ( mod , opts , state ) do
1676
- with { :ok , dispatcher_mod , dispatcher_state , opts } <- validate_dispatcher ( opts ) ,
1676
+ with { :ok , dispatcher_mod , dispatcher_state , opts } <- init_dispatcher ( opts ) ,
1677
1677
{ :ok , buffer_size , opts } <-
1678
- validate_integer ( opts , :buffer_size , 10000 , 0 , :infinity , true ) ,
1679
- { :ok , buffer_keep , opts } <- validate_in ( opts , :buffer_keep , :last , [ :first , :last ] ) ,
1680
- { :ok , demand , opts } <- validate_in ( opts , :demand , :forward , [ :accumulate , :forward ] ) ,
1681
- :ok <- validate_no_opts ( opts ) do
1678
+ Utils . validate_integer ( opts , :buffer_size , 10000 , 0 , :infinity , true ) ,
1679
+ { :ok , buffer_keep , opts } <-
1680
+ Utils . validate_in ( opts , :buffer_keep , :last , [ :first , :last ] ) ,
1681
+ { :ok , demand , opts } <-
1682
+ Utils . validate_in ( opts , :demand , :forward , [ :accumulate , :forward ] ) ,
1683
+ :ok <- Utils . validate_no_opts ( opts ) do
1682
1684
stage = % GenStage {
1683
1685
mod: mod ,
1684
1686
state: state ,
@@ -1696,7 +1698,7 @@ defmodule GenStage do
1696
1698
end
1697
1699
end
1698
1700
1699
- defp validate_dispatcher ( opts ) do
1701
+ defp init_dispatcher ( opts ) do
1700
1702
case Keyword . pop ( opts , :dispatcher , GenStage.DemandDispatcher ) do
1701
1703
{ dispatcher , opts } when is_atom ( dispatcher ) ->
1702
1704
{ :ok , dispatcher_state } = dispatcher . init ( [ ] )
@@ -1713,12 +1715,13 @@ defmodule GenStage do
1713
1715
end
1714
1716
1715
1717
defp init_producer_consumer ( mod , opts , state ) do
1716
- with { :ok , dispatcher_mod , dispatcher_state , opts } <- validate_dispatcher ( opts ) ,
1717
- { :ok , subscribe_to , opts } <- validate_list ( opts , :subscribe_to , [ ] ) ,
1718
+ with { :ok , dispatcher_mod , dispatcher_state , opts } <- init_dispatcher ( opts ) ,
1719
+ { :ok , subscribe_to , opts } <- Utils . validate_list ( opts , :subscribe_to , [ ] ) ,
1718
1720
{ :ok , buffer_size , opts } <-
1719
- validate_integer ( opts , :buffer_size , :infinity , 0 , :infinity , true ) ,
1720
- { :ok , buffer_keep , opts } <- validate_in ( opts , :buffer_keep , :last , [ :first , :last ] ) ,
1721
- :ok <- validate_no_opts ( opts ) do
1721
+ Utils . validate_integer ( opts , :buffer_size , :infinity , 0 , :infinity , true ) ,
1722
+ { :ok , buffer_keep , opts } <-
1723
+ Utils . validate_in ( opts , :buffer_keep , :last , [ :first , :last ] ) ,
1724
+ :ok <- Utils . validate_no_opts ( opts ) do
1722
1725
stage = % GenStage {
1723
1726
mod: mod ,
1724
1727
state: state ,
@@ -1737,8 +1740,8 @@ defmodule GenStage do
1737
1740
end
1738
1741
1739
1742
defp init_consumer ( mod , opts , state ) do
1740
- with { :ok , subscribe_to , opts } <- validate_list ( opts , :subscribe_to , [ ] ) ,
1741
- :ok <- validate_no_opts ( opts ) do
1743
+ with { :ok , subscribe_to , opts } <- Utils . validate_list ( opts , :subscribe_to , [ ] ) ,
1744
+ :ok <- Utils . validate_no_opts ( opts ) do
1742
1745
stage = % GenStage { mod: mod , state: state , type: :consumer }
1743
1746
consumer_init_subscribe ( subscribe_to , stage )
1744
1747
else
@@ -1819,7 +1822,7 @@ defmodule GenStage do
1819
1822
1820
1823
def handle_info ( { :"$gen_producer" , _ , _ } = msg , % { type: :consumer } = stage ) do
1821
1824
error_msg = 'GenStage consumer ~tp received $gen_producer message: ~tp~n'
1822
- :error_logger . error_msg ( error_msg , [ self_name ( ) , msg ] )
1825
+ :error_logger . error_msg ( error_msg , [ Utils . self_name ( ) , msg ] )
1823
1826
{ :noreply , stage }
1824
1827
end
1825
1828
@@ -1830,7 +1833,7 @@ defmodule GenStage do
1830
1833
case consumers do
1831
1834
% { ^ ref => _ } ->
1832
1835
error_msg = 'GenStage producer ~tp received duplicated subscription from: ~tp~n'
1833
- :error_logger . error_msg ( error_msg , [ self_name ( ) , from ] )
1836
+ :error_logger . error_msg ( error_msg , [ Utils . self_name ( ) , from ] )
1834
1837
1835
1838
msg = { :"$gen_consumer" , { self ( ) , ref } , { :cancel , :duplicated_subscription } }
1836
1839
send_noconnect ( consumer_pid , msg )
@@ -1877,7 +1880,7 @@ defmodule GenStage do
1877
1880
1878
1881
def handle_info ( { :"$gen_consumer" , _ , _ } = msg , % { type: :producer } = stage ) do
1879
1882
error_msg = 'GenStage producer ~tp received $gen_consumer message: ~tp~n'
1880
- :error_logger . error_msg ( error_msg , [ self_name ( ) , msg ] )
1883
+ :error_logger . error_msg ( error_msg , [ Utils . self_name ( ) , msg ] )
1881
1884
{ :noreply , stage }
1882
1885
end
1883
1886
@@ -2066,7 +2069,7 @@ defmodule GenStage do
2066
2069
2067
2070
defp producer_demand ( _mode , % { type: type } = stage ) when type != :producer do
2068
2071
error_msg = 'Demand mode can only be set for producers, GenStage ~tp is a ~ts'
2069
- :error_logger . error_msg ( error_msg , [ self_name ( ) , type ] )
2072
+ :error_logger . error_msg ( error_msg , [ Utils . self_name ( ) , type ] )
2070
2073
{ :noreply , stage }
2071
2074
end
2072
2075
@@ -2189,7 +2192,7 @@ defmodule GenStage do
2189
2192
error_msg =
2190
2193
'GenStage consumer ~tp cannot dispatch events (an empty list must be returned): ~tp~n'
2191
2194
2192
- :error_logger . error_msg ( error_msg , [ self_name ( ) , events ] )
2195
+ :error_logger . error_msg ( error_msg , [ Utils . self_name ( ) , events ] )
2193
2196
stage
2194
2197
end
2195
2198
@@ -2288,7 +2291,7 @@ defmodule GenStage do
2288
2291
2289
2292
excess ->
2290
2293
error_msg = 'GenStage producer ~tp has discarded ~tp events from buffer'
2291
- :error_logger . warning_msg ( error_msg , [ self_name ( ) , excess ] )
2294
+ :error_logger . warning_msg ( error_msg , [ Utils . self_name ( ) , excess ] )
2292
2295
end
2293
2296
2294
2297
stage = % { stage | buffer: { queue , counter , infos } }
@@ -2427,7 +2430,7 @@ defmodule GenStage do
2427
2430
end
2428
2431
2429
2432
defp consumer_receive ( { _ , ref } = from , { producer_id , cancel , { demand , min , max } } , events , stage ) do
2430
- { demand , batches } = split_batches ( events , from , min , max , demand )
2433
+ { demand , batches } = Utils . split_batches ( events , from , min , max , demand )
2431
2434
stage = put_in ( stage . producers [ ref ] , { producer_id , cancel , { demand , min , max } } )
2432
2435
{ batches , stage }
2433
2436
end
@@ -2471,15 +2474,16 @@ defmodule GenStage do
2471
2474
2472
2475
defp consumer_subscribe ( _cancel , to , _opts , % { type: :producer } = stage ) do
2473
2476
error_msg = 'GenStage producer ~tp cannot be subscribed to another stage: ~tp~n'
2474
- :error_logger . error_msg ( error_msg , [ self_name ( ) , to ] )
2477
+ :error_logger . error_msg ( error_msg , [ Utils . self_name ( ) , to ] )
2475
2478
{ :reply , { :error , :not_a_consumer } , stage }
2476
2479
end
2477
2480
2478
2481
defp consumer_subscribe ( current , to , opts , stage ) do
2479
- with { :ok , cancel , _ } <-
2480
- validate_in ( opts , :cancel , :permanent , [ :temporary , :transient , :permanent ] ) ,
2481
- { :ok , max , _ } <- validate_integer ( opts , :max_demand , 1000 , 1 , :infinity , false ) ,
2482
- { :ok , min , _ } <- validate_integer ( opts , :min_demand , div ( max , 2 ) , 0 , max - 1 , false ) do
2482
+ with { :ok , max , _ } <- Utils . validate_integer ( opts , :max_demand , 1000 , 1 , :infinity , false ) ,
2483
+ { :ok , min , _ } <-
2484
+ Utils . validate_integer ( opts , :min_demand , div ( max , 2 ) , 0 , max - 1 , false ) ,
2485
+ { :ok , cancel , _ } <-
2486
+ Utils . validate_in ( opts , :cancel , :permanent , [ :temporary , :transient , :permanent ] ) do
2483
2487
producer_pid = GenServer . whereis ( to )
2484
2488
2485
2489
cond do
@@ -2498,7 +2502,7 @@ defmodule GenStage do
2498
2502
else
2499
2503
{ :error , message } ->
2500
2504
error_msg = 'GenStage consumer ~tp subscribe received invalid option: ~ts~n'
2501
- :error_logger . error_msg ( error_msg , [ self_name ( ) , message ] )
2505
+ :error_logger . error_msg ( error_msg , [ Utils . self_name ( ) , message ] )
2502
2506
{ :reply , { :error , { :bad_opts , message } } , stage }
2503
2507
end
2504
2508
end
@@ -2559,11 +2563,11 @@ defmodule GenStage do
2559
2563
case noreply_callback ( :handle_cancel , [ kind_reason , pid_ref , state ] , stage ) do
2560
2564
{ :noreply , stage }
2561
2565
when mode == :permanent
2562
- when mode == :transient and not is_transient_shutdown ( reason ) ->
2566
+ when mode == :transient and not Utils . is_transient_shutdown ( reason ) ->
2563
2567
error_msg =
2564
2568
'GenStage consumer ~tp is stopping after receiving cancel from producer ~tp with reason: ~tp~n'
2565
2569
2566
- :error_logger . info_msg ( error_msg , [ self_name ( ) , pid , reason ] )
2570
+ :error_logger . info_msg ( error_msg , [ Utils . self_name ( ) , pid , reason ] )
2567
2571
{ :stop , reason , stage }
2568
2572
2569
2573
other ->
0 commit comments