Skip to content

Commit 3236449

Browse files
author
José Valim
committed
Ensure producer_consumer stops asking if demand is 0
1 parent 843b7d1 commit 3236449

File tree

2 files changed

+147
-70
lines changed

2 files changed

+147
-70
lines changed

lib/gen_stage.ex

Lines changed: 69 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1986,7 +1986,7 @@ defmodule GenStage do
19861986
type: :producer_consumer,
19871987
buffer: {:queue.new(), 0, init_wheel(buffer_size)},
19881988
buffer_config: {buffer_size, buffer_keep},
1989-
events: 0,
1989+
events: {:queue.new(), 0},
19901990
dispatcher_mod: dispatcher_mod,
19911991
dispatcher_state: dispatcher_state
19921992
}
@@ -2200,17 +2200,11 @@ defmodule GenStage do
22002200

22012201
def handle_info(
22022202
{:"$gen_consumer", {producer_pid, ref}, events},
2203-
%{type: :producer_consumer, events: demand_or_queue, producers: producers} = stage
2203+
%{type: :producer_consumer, events: {queue, counter}, producers: producers} = stage
22042204
)
22052205
when is_list(events) do
22062206
case producers do
22072207
%{^ref => _entry} ->
2208-
{counter, queue} =
2209-
case demand_or_queue do
2210-
demand when is_integer(demand) -> {demand, :queue.new()}
2211-
queue -> {0, queue}
2212-
end
2213-
22142208
queue = put_pc_events(events, ref, queue)
22152209
take_pc_events(queue, counter, stage)
22162210

@@ -2229,8 +2223,7 @@ defmodule GenStage do
22292223
case producers do
22302224
%{^ref => entry} ->
22312225
{batches, stage} = consumer_receive(from, entry, events, stage)
2232-
{_, reply} = consumer_dispatch(batches, from, mod, state, stage, 0, false)
2233-
reply
2226+
consumer_dispatch(batches, from, mod, state, stage, false)
22342227

22352228
_ ->
22362229
msg = {:"$gen_producer", {self(), ref}, {:cancel, :unknown_subscription}}
@@ -2489,27 +2482,25 @@ defmodule GenStage do
24892482

24902483
defp dispatcher_callback(callback, args, %{dispatcher_mod: dispatcher_mod} = stage) do
24912484
{:ok, counter, dispatcher_state} = apply(dispatcher_mod, callback, args)
2492-
stage = %{stage | dispatcher_state: dispatcher_state}
24932485

2494-
case take_from_buffer(counter, stage) do
2495-
{:ok, 0, stage} ->
2496-
{:noreply, stage}
2486+
case stage do
2487+
%{type: :producer_consumer, events: {queue, demand}} ->
2488+
counter = demand + counter
2489+
stage = %{stage | dispatcher_state: dispatcher_state, events: {queue, counter}}
2490+
{:ok, _, stage} = take_from_buffer(counter, stage)
2491+
%{events: {queue, counter}} = stage
2492+
take_pc_events(queue, counter, stage)
24972493

2498-
{:ok, counter, stage} when is_integer(counter) and counter > 0 ->
2499-
case stage do
2500-
# producer
2501-
%{events: :forward, state: state} ->
2494+
%{} ->
2495+
case take_from_buffer(counter, %{stage | dispatcher_state: dispatcher_state}) do
2496+
{:ok, 0, stage} ->
2497+
{:noreply, stage}
2498+
2499+
{:ok, counter, %{events: :forward, state: state} = stage} ->
25022500
noreply_callback(:handle_demand, [counter, state], stage)
25032501

2504-
%{events: events} when is_list(events) ->
2502+
{:ok, counter, %{events: events} = stage} when is_list(events) ->
25052503
{:noreply, %{stage | events: [counter | events]}}
2506-
2507-
# producer_consumer
2508-
%{events: events} when is_integer(events) ->
2509-
{:noreply, %{stage | events: counter + events}}
2510-
2511-
%{events: queue} ->
2512-
take_pc_events(queue, counter, stage)
25132504
end
25142505
end
25152506
end
@@ -2534,7 +2525,24 @@ defmodule GenStage do
25342525
defp dispatch_events(events, length, stage) do
25352526
%{dispatcher_mod: dispatcher_mod, dispatcher_state: dispatcher_state} = stage
25362527
{:ok, events, dispatcher_state} = dispatcher_mod.dispatch(events, length, dispatcher_state)
2537-
buffer_events(events, %{stage | dispatcher_state: dispatcher_state})
2528+
2529+
stage =
2530+
case stage do
2531+
%{type: :producer_consumer, events: {queue, demand}} ->
2532+
if demand < length - length(events) do
2533+
IO.puts(Exception.format_stacktrace())
2534+
IO.inspect({demand, length, length(events)})
2535+
end
2536+
2537+
demand = demand - (length - length(events))
2538+
2539+
%{stage | dispatcher_state: dispatcher_state, events: {queue, max(demand, 0)}}
2540+
2541+
%{} ->
2542+
%{stage | dispatcher_state: dispatcher_state}
2543+
end
2544+
2545+
buffer_events(events, stage)
25382546
end
25392547

25402548
defp take_from_buffer(counter, %{buffer: {_, buffer, _}} = stage)
@@ -2669,14 +2677,12 @@ defmodule GenStage do
26692677
{:reply, :ok, stage}
26702678
end
26712679

2672-
defp producer_info(msg, %{type: :producer_consumer, events: demand_or_queue} = stage) do
2680+
defp producer_info(msg, %{type: :producer_consumer, events: {queue, demand}} = stage) do
26732681
stage =
2674-
case demand_or_queue do
2675-
demand when is_integer(demand) ->
2676-
buffer_or_dispatch_info(msg, stage)
2677-
2678-
queue ->
2679-
%{stage | events: :queue.in({:info, msg}, queue)}
2682+
if :queue.is_empty(queue) do
2683+
buffer_or_dispatch_info(msg, stage)
2684+
else
2685+
%{stage | events: {:queue.in({:info, msg}, queue), demand}}
26802686
end
26812687

26822688
{:reply, :ok, stage}
@@ -2798,32 +2804,32 @@ defmodule GenStage do
27982804
defp split_events([event | events], limit, counter, acc),
27992805
do: split_events(events, limit, counter + 1, [event | acc])
28002806

2801-
defp consumer_dispatch([{batch, ask} | batches], from, mod, state, stage, count, _hibernate?) do
2807+
defp consumer_dispatch([{batch, ask} | batches], from, mod, state, stage, _hibernate?) do
28022808
case mod.handle_events(batch, from, state) do
28032809
{:noreply, events, state} when is_list(events) ->
28042810
stage = dispatch_events(events, length(events), stage)
28052811
ask(from, ask, [:noconnect])
2806-
consumer_dispatch(batches, from, mod, state, stage, count + length(events), false)
2812+
consumer_dispatch(batches, from, mod, state, stage, false)
28072813

28082814
{:noreply, events, state, :hibernate} when is_list(events) ->
28092815
stage = dispatch_events(events, length(events), stage)
28102816
ask(from, ask, [:noconnect])
2811-
consumer_dispatch(batches, from, mod, state, stage, count + length(events), true)
2817+
consumer_dispatch(batches, from, mod, state, stage, true)
28122818

28132819
{:stop, reason, state} ->
2814-
{count, {:stop, reason, %{stage | state: state}}}
2820+
{:stop, reason, %{stage | state: state}}
28152821

28162822
other ->
2817-
{count, {:stop, {:bad_return_value, other}, %{stage | state: state}}}
2823+
{:stop, {:bad_return_value, other}, %{stage | state: state}}
28182824
end
28192825
end
28202826

2821-
defp consumer_dispatch([], _from, _mod, state, stage, count, false) do
2822-
{count, {:noreply, %{stage | state: state}}}
2827+
defp consumer_dispatch([], _from, _mod, state, stage, false) do
2828+
{:noreply, %{stage | state: state}}
28232829
end
28242830

2825-
defp consumer_dispatch([], _from, _mod, state, stage, count, true) do
2826-
{count, {:noreply, %{stage | state: state}, :hibernate}}
2831+
defp consumer_dispatch([], _from, _mod, state, stage, true) do
2832+
{:noreply, %{stage | state: state}, :hibernate}
28272833
end
28282834

28292835
defp consumer_subscribe({to, opts}, stage) when is_list(opts),
@@ -2903,14 +2909,13 @@ defmodule GenStage do
29032909
mode,
29042910
kind_reason,
29052911
pid_ref,
2906-
%{type: :producer_consumer, events: demand_or_queue} = stage
2912+
%{type: :producer_consumer, events: {queue, demand}} = stage
29072913
) do
2908-
case demand_or_queue do
2909-
demand when is_integer(demand) ->
2910-
invoke_cancel(mode, kind_reason, pid_ref, stage)
2911-
2912-
queue ->
2913-
{:noreply, %{stage | events: :queue.in({:cancel, mode, kind_reason, pid_ref}, queue)}}
2914+
if :queue.is_empty(queue) do
2915+
invoke_cancel(mode, kind_reason, pid_ref, stage)
2916+
else
2917+
queue = :queue.in({:cancel, mode, kind_reason, pid_ref}, queue)
2918+
{:noreply, %{stage | events: {queue, demand}}}
29142919
end
29152920
end
29162921

@@ -2946,11 +2951,11 @@ defmodule GenStage do
29462951
{producer_id, _, _} = entry
29472952
from = {producer_id, ref}
29482953
{batches, stage} = consumer_receive(from, entry, events, stage)
2949-
consumer_dispatch(batches, from, mod, state, stage, 0, false)
2954+
consumer_dispatch(batches, from, mod, state, stage, false)
29502955

29512956
%{} ->
29522957
# We queued but producer was removed
2953-
consumer_dispatch([{events, 0}], {:pid, ref}, mod, state, stage, 0, false)
2958+
consumer_dispatch([{events, 0}], {:pid, ref}, mod, state, stage, false)
29542959
end
29552960
end
29562961

@@ -2972,27 +2977,27 @@ defmodule GenStage do
29722977
end
29732978

29742979
{{:value, {events, ref}}, queue} ->
2975-
case send_pc_events(events, ref, stage) do
2976-
{sent, {:noreply, stage}} ->
2977-
take_pc_events(queue, counter - sent, stage)
2980+
case send_pc_events(events, ref, %{stage | events: {queue, counter}}) do
2981+
{:noreply, %{events: {queue, counter}} = stage} ->
2982+
take_pc_events(queue, counter, stage)
29782983

2979-
{sent, {:noreply, stage, :hibernate}} ->
2980-
take_pc_events(queue, counter - sent, stage)
2984+
{:noreply, %{events: {queue, counter}} = stage, :hibernate} ->
2985+
take_pc_events(queue, counter, stage)
29812986

2982-
{_, {:stop, _, _} = stop} ->
2987+
{:stop, _, _} = stop ->
29832988
stop
29842989
end
29852990

2986-
{:empty, _queue} ->
2987-
{:noreply, %{stage | events: counter}}
2991+
{:empty, queue} ->
2992+
{:noreply, %{stage | events: {queue, counter}}}
29882993
end
29892994
end
29902995

29912996
# It is OK to send more events than the consumer has
2992-
# asked because those will always be buffered. Once
2993-
# we have taken from the buffer, the event queue will
2997+
# asked (counter < 0) because those will always be buffered.
2998+
# Once we have taken from the buffer, the event queue will
29942999
# be adjusted again.
2995-
defp take_pc_events(queue, _counter, stage) do
2996-
{:noreply, %{stage | events: queue}}
3000+
defp take_pc_events(queue, counter, stage) do
3001+
{:noreply, %{stage | events: {queue, counter}}}
29973002
end
29983003
end

test/gen_stage_test.exs

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,35 @@ defmodule GenStageTest do
141141
end
142142
end
143143

144+
defmodule Postponer do
145+
@moduledoc """
146+
Discards all events.
147+
"""
148+
149+
use GenStage
150+
151+
def start_link(init, opts \\ []) do
152+
GenStage.start_link(__MODULE__, init, opts)
153+
end
154+
155+
def init(init) do
156+
init
157+
end
158+
159+
def handle_events(events, _from, recipient) do
160+
send(self(), {:postponed, events})
161+
{:noreply, [], recipient}
162+
end
163+
164+
def handle_info({:postponed, events}, recipient) do
165+
send(recipient, {:postponed, events})
166+
{:noreply, events, recipient}
167+
end
168+
end
169+
144170
defmodule Discarder do
145171
@moduledoc """
146-
Multiples every event by two.
172+
Discards all events.
147173
"""
148174

149175
use GenStage
@@ -157,14 +183,36 @@ defmodule GenStageTest do
157183
end
158184

159185
def handle_events(events, _from, recipient) do
160-
send(recipient, {:producer_consumed, events})
186+
send(recipient, {:discarded, events})
187+
{:noreply, [], recipient}
188+
end
189+
end
190+
191+
defmodule Sleeper do
192+
@moduledoc """
193+
Sleeps after the first batch.
194+
"""
195+
196+
use GenStage
197+
198+
def start_link(init, opts \\ []) do
199+
GenStage.start_link(__MODULE__, init, opts)
200+
end
201+
202+
def init(init) do
203+
init
204+
end
205+
206+
def handle_events(events, _from, recipient) do
207+
send(recipient, {:sleep, events})
208+
Process.sleep(:infinity)
161209
{:noreply, [], recipient}
162210
end
163211
end
164212

165213
defmodule Forwarder do
166214
@moduledoc """
167-
A consumer that forwards messages to the given process.
215+
Forwards messages to the given process.
168216
"""
169217

170218
use GenStage
@@ -440,6 +488,30 @@ defmodule GenStageTest do
440488
assert_receive {:producer_consumed, ^batch}
441489
end
442490

491+
test "stops asking when consumer stops asking" do
492+
{:ok, producer} = Counter.start_link({:producer, 0})
493+
494+
{:ok, doubler} =
495+
Postponer.start_link(
496+
{:producer_consumer, self(),
497+
subscribe_to: [{producer, max_demand: 10, min_demand: 8}]}
498+
)
499+
500+
{:ok, _} =
501+
Sleeper.start_link(
502+
{:consumer, self(), subscribe_to: [{doubler, max_demand: 10, min_demand: 5}]}
503+
)
504+
505+
assert_receive {:postponed, [0, 1]}
506+
assert_receive {:sleep, [0, 1]}
507+
assert_receive {:postponed, [2, 3]}
508+
assert_receive {:postponed, [4, 5]}
509+
assert_receive {:postponed, [6, 7]}
510+
assert_receive {:postponed, [8, 9]}
511+
refute_receive {:sleep, [2, 3]}
512+
refute_receive {:postponed, [10, 11]}
513+
end
514+
443515
test "keeps emitting events even when discarded" do
444516
{:ok, producer} = Counter.start_link({:producer, 0})
445517

@@ -455,11 +527,11 @@ defmodule GenStageTest do
455527
)
456528

457529
batch = Enum.to_list(0..19)
458-
assert_receive {:producer_consumed, ^batch}
530+
assert_receive {:discarded, ^batch}
459531
batch = Enum.to_list(100..119)
460-
assert_receive {:producer_consumed, ^batch}
532+
assert_receive {:discarded, ^batch}
461533
batch = Enum.to_list(1000..1019)
462-
assert_receive {:producer_consumed, ^batch}
534+
assert_receive {:discarded, ^batch}
463535
end
464536

465537
test "with shared (broadcast) demand" do

0 commit comments

Comments
 (0)