@@ -59,22 +59,15 @@ defmodule Astarte.Flow.Blocks.RandomProducer do
5959 defmodule Config do
6060 @ moduledoc false
6161
62- @ type t ( ) :: % __MODULE__ {
63- key: String . t ( ) ,
64- type: Astarte.Flow.Blocks.RandomProducer . supported_types ( ) ,
65- min: number ( ) | nil ,
66- max: number ( ) | nil ,
67- p: float ( ) | nil ,
68- delay_ms: integer ( ) | nil
69- }
70-
7162 defstruct [
7263 :key ,
7364 :type ,
7465 :min ,
7566 :max ,
7667 :p ,
77- :delay_ms
68+ :delay_ms ,
69+ :pending_demand ,
70+ :queue
7871 ]
7972 end
8073
@@ -104,19 +97,57 @@ defmodule Astarte.Flow.Blocks.RandomProducer do
10497
10598 with { :ok , type } <- validate_type ( type ) ,
10699 { :ok , state } <- init_state ( key , type , opts ) do
107- delay_ms = Keyword . get ( opts , :delay_ms )
108- { :producer , % Config { state | delay_ms: delay_ms } }
100+ delay_ms = Keyword . get ( opts , :delay_ms ) || 0
101+ send ( self ( ) , :poll )
102+
103+ { :producer , % Config { state | delay_ms: delay_ms , pending_demand: 0 , queue: :queue . new ( ) } ,
104+ dispatcher: GenStage.BroadcastDispatcher }
109105 else
110106 { :error , reason } ->
111107 { :stop , reason }
112108 end
113109 end
114110
115111 @ impl true
116- def handle_demand ( demand , config ) when demand > 0 do
117- messages = for _ <- 1 .. demand , do: generate_message ( config )
112+ def handle_demand ( incoming_demand , % Config { pending_demand: demand } = config ) do
113+ dispatch_messages ( % { config | pending_demand: demand + incoming_demand } , [
114+ generate_message ( config )
115+ ] )
116+ end
117+
118+ @ impl true
119+ def handle_info ( :poll , config ) do
120+ % Config {
121+ delay_ms: delay_ms ,
122+ queue: queue
123+ } = config
124+
125+ # Schedule next polling
126+ _ = Process . send_after ( self ( ) , :poll , delay_ms )
127+
128+ new_queue =
129+ generate_message ( config )
130+ |> :queue . in ( queue )
131+
132+ new_state = % { config | queue: new_queue }
133+ dispatch_messages ( new_state , [ ] )
134+ end
135+
136+ defp dispatch_messages ( % Config { pending_demand: 0 } = state , messages ) do
137+ { :noreply , Enum . reverse ( messages ) , state }
138+ end
139+
140+ defp dispatch_messages ( % Config { pending_demand: demand , queue: queue } = state , messages ) do
141+ case :queue . out ( queue ) do
142+ { { :value , message } , updated_queue } ->
143+ updated_state = % { state | pending_demand: demand - 1 , queue: updated_queue }
144+ updated_messages = [ message | messages ]
118145
119- { :noreply , messages , config }
146+ dispatch_messages ( updated_state , updated_messages )
147+
148+ { :empty , _queue } ->
149+ { :noreply , Enum . reverse ( messages ) , state }
150+ end
120151 end
121152
122153 defp validate_type ( type ) when type in [ :integer , :real , :boolean ] do
@@ -166,8 +197,6 @@ defmodule Astarte.Flow.Blocks.RandomProducer do
166197 defp generate_message ( % Config { key: key , type: type , delay_ms: delay_ms } = state ) do
167198 data = generate_data ( state )
168199
169- maybe_sleep ( delay_ms )
170-
171200 % Message {
172201 key: key ,
173202 type: type ,
@@ -176,9 +205,6 @@ defmodule Astarte.Flow.Blocks.RandomProducer do
176205 }
177206 end
178207
179- defp maybe_sleep ( nil ) , do: :ok
180- defp maybe_sleep ( delay_ms ) when is_integer ( delay_ms ) , do: :timer . sleep ( delay_ms )
181-
182208 defp generate_data ( % Config { type: :integer , min: min , max: max } ) do
183209 Enum . random ( min .. max )
184210 end
0 commit comments