Skip to content

Commit a09aa15

Browse files
evnujosevalim
authored andcommitted
Make broadcast dispatcher subscribe idempotent (#225)
1 parent c1fab76 commit a09aa15

File tree

5 files changed

+224
-87
lines changed

5 files changed

+224
-87
lines changed

lib/gen_stage.ex

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1901,7 +1901,14 @@ defmodule GenStage do
19011901
mon_ref = Process.monitor(consumer_pid)
19021902
stage = put_in(stage.monitors[mon_ref], ref)
19031903
stage = put_in(stage.consumers[ref], {consumer_pid, mon_ref})
1904-
producer_subscribe(opts, from, stage)
1904+
1905+
case producer_subscribe(opts, from, stage) do
1906+
{:error, :already_subscribed} ->
1907+
producer_cancel(ref, :cancel, :already_subscribed, stage)
1908+
1909+
other ->
1910+
other
1911+
end
19051912

19061913
other ->
19071914
other
@@ -2221,8 +2228,14 @@ defmodule GenStage do
22212228
end
22222229

22232230
defp dispatcher_callback(callback, args, %{dispatcher_mod: dispatcher_mod} = stage) do
2224-
{:ok, counter, dispatcher_state} = apply(dispatcher_mod, callback, args)
2231+
dispatcher_mod |> apply(callback, args) |> handle_callback_result(stage)
2232+
end
2233+
2234+
defp handle_callback_result(e = {:error, _reason}, _stage) do
2235+
e
2236+
end
22252237

2238+
defp handle_callback_result({:ok, counter, dispatcher_state}, stage) do
22262239
case stage do
22272240
%{type: :producer_consumer, events: {queue, demand}} ->
22282241
counter = demand + counter
@@ -2500,11 +2513,17 @@ defmodule GenStage do
25002513
{:noreply, stage}
25012514
when mode == :permanent
25022515
when mode == :transient and not Utils.is_transient_shutdown(reason) ->
2503-
error_msg =
2504-
'GenStage consumer ~tp is stopping after receiving cancel from producer ~tp with reason: ~tp~n'
2516+
case reason do
2517+
:already_subscribed ->
2518+
{:noreply, stage}
25052519

2506-
:error_logger.info_msg(error_msg, [Utils.self_name(), pid, reason])
2507-
{:stop, reason, stage}
2520+
_other ->
2521+
error_msg =
2522+
'GenStage consumer ~tp is stopping after receiving cancel from producer ~tp with reason: ~tp~n'
2523+
2524+
:error_logger.info_msg(error_msg, [Utils.self_name(), pid, reason])
2525+
{:stop, reason, stage}
2526+
end
25082527

25092528
other ->
25102529
other

lib/gen_stage/dispatcher.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ defmodule GenStage.Dispatcher do
4141
Called every time the producer gets a new subscriber.
4242
"""
4343
@callback subscribe(opts :: keyword(), from :: {pid, reference}, state :: term) ::
44-
{:ok, demand :: non_neg_integer, new_state}
44+
{:ok, demand :: non_neg_integer, new_state} | {:error, term}
4545
when new_state: term
4646

4747
@doc """

lib/gen_stage/dispatchers/broadcast_dispatcher.ex

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@ defmodule GenStage.BroadcastDispatcher do
3333

3434
@behaviour GenStage.Dispatcher
3535

36+
require Logger
37+
3638
@doc false
3739
def init(_opts) do
38-
{:ok, {[], 0}}
40+
{:ok, {[], 0, MapSet.new()}}
3941
end
4042

4143
@doc false
@@ -45,36 +47,48 @@ defmodule GenStage.BroadcastDispatcher do
4547
end
4648

4749
@doc false
48-
def subscribe(opts, {pid, ref}, {demands, waiting}) do
50+
def subscribe(opts, {pid, ref}, {demands, waiting, subscribed_processes}) do
4951
selector = validate_selector(opts)
50-
{:ok, 0, {add_demand(-waiting, pid, ref, selector, demands), waiting}}
52+
53+
if subscribed?(subscribed_processes, pid) do
54+
Logger.error(fn ->
55+
"#{inspect(pid)} is already registered with #{inspect(self())}. " <>
56+
"This subscription has been discared."
57+
end)
58+
59+
{:error, :already_subscribed}
60+
else
61+
subscribed_processes = add_subscriber(subscribed_processes, pid)
62+
{:ok, 0, {add_demand(-waiting, pid, ref, selector, demands), waiting, subscribed_processes}}
63+
end
5164
end
5265

5366
@doc false
54-
def cancel({_, ref}, {demands, waiting}) do
67+
def cancel({pid, ref}, {demands, waiting, subscribed_processes}) do
5568
# Since we may have removed the process we were waiting on,
5669
# cancellation may actually generate demand!
5770
demands = delete_demand(ref, demands)
5871
new_min = get_min(demands)
5972
demands = adjust_demand(new_min, demands)
60-
{:ok, new_min, {demands, waiting + new_min}}
73+
subscribed_processes = delete_subscriber(subscribed_processes, pid)
74+
{:ok, new_min, {demands, waiting + new_min, subscribed_processes}}
6175
end
6276

6377
@doc false
64-
def ask(counter, {pid, ref}, {demands, waiting}) do
78+
def ask(counter, {pid, ref}, {demands, waiting, subscribed_processes}) do
6579
{current, selector, demands} = pop_demand(ref, demands)
6680
demands = add_demand(current + counter, pid, ref, selector, demands)
6781
new_min = get_min(demands)
6882
demands = adjust_demand(new_min, demands)
69-
{:ok, new_min, {demands, waiting + new_min}}
83+
{:ok, new_min, {demands, waiting + new_min, subscribed_processes}}
7084
end
7185

7286
@doc false
73-
def dispatch(events, _length, {demands, 0}) do
74-
{:ok, events, {demands, 0}}
87+
def dispatch(events, _length, {demands, 0, subscribed_processes}) do
88+
{:ok, events, {demands, 0, subscribed_processes}}
7589
end
7690

77-
def dispatch(events, length, {demands, waiting}) do
91+
def dispatch(events, length, {demands, waiting, subscribed_processes}) do
7892
{deliver_now, deliver_later, waiting} = split_events(events, length, waiting)
7993

8094
for {_, pid, ref, selector} <- demands do
@@ -92,7 +106,7 @@ defmodule GenStage.BroadcastDispatcher do
92106
:ok
93107
end
94108

95-
{:ok, deliver_later, {demands, waiting}}
109+
{:ok, deliver_later, {demands, waiting, subscribed_processes}}
96110
end
97111

98112
defp filter_and_count(messages, nil) do
@@ -168,4 +182,16 @@ defmodule GenStage.BroadcastDispatcher do
168182
defp delete_demand(ref, demands) do
169183
List.keydelete(demands, ref, 2)
170184
end
185+
186+
defp add_subscriber(subscribed_processes, pid) do
187+
MapSet.put(subscribed_processes, pid)
188+
end
189+
190+
defp delete_subscriber(subscribed_processes, pid) do
191+
MapSet.delete(subscribed_processes, pid)
192+
end
193+
194+
defp subscribed?(subscribed_processes, pid) do
195+
MapSet.member?(subscribed_processes, pid)
196+
end
171197
end

0 commit comments

Comments
 (0)