Skip to content

Commit 77485c8

Browse files
author
José Valim
committed
Handle error tuples only on subscribe callback
1 parent a09aa15 commit 77485c8

File tree

1 file changed

+13
-18
lines changed

1 file changed

+13
-18
lines changed

lib/gen_stage.ex

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1901,14 +1901,7 @@ defmodule GenStage do
19011901
mon_ref = Process.monitor(consumer_pid)
19021902
stage = put_in(stage.monitors[mon_ref], ref)
19031903
stage = put_in(stage.consumers[ref], {consumer_pid, mon_ref})
1904-
1905-
case producer_subscribe(opts, from, stage) do
1906-
{:error, :already_subscribed} ->
1907-
producer_cancel(ref, :cancel, :already_subscribed, stage)
1908-
1909-
other ->
1910-
other
1911-
end
1904+
producer_subscribe(opts, from, stage)
19121905

19131906
other ->
19141907
other
@@ -2171,13 +2164,19 @@ defmodule GenStage do
21712164
end
21722165

21732166
defp producer_subscribe(opts, from, stage) do
2174-
%{mod: mod, state: state, dispatcher_state: dispatcher_state} = stage
2167+
%{mod: mod, state: state, dispatcher_mod: dispatcher_mod, dispatcher_state: dispatcher_state} =
2168+
stage
21752169

21762170
case maybe_subscribe(mod, :consumer, opts, from, state) do
21772171
{:automatic, state} ->
2178-
# Call the dispatcher after since it may generate demand and the
2179-
# main module must know the consumer is subscribed.
2180-
dispatcher_callback(:subscribe, [opts, from, dispatcher_state], %{stage | state: state})
2172+
stage = %{stage | state: state}
2173+
2174+
# Call the dispatcher after since it may generate demand
2175+
# and the main module must know the consumer is subscribed.
2176+
case dispatcher_mod.subscribe(opts, from, dispatcher_state) do
2177+
{:ok, _, _} = ok -> handle_dispatcher_result(ok, stage)
2178+
{:error, term} -> producer_cancel(elem(from, 1), :cancel, term, stage)
2179+
end
21812180

21822181
{:stop, reason, state} ->
21832182
{:stop, reason, %{stage | state: state}}
@@ -2228,14 +2227,10 @@ defmodule GenStage do
22282227
end
22292228

22302229
defp dispatcher_callback(callback, args, %{dispatcher_mod: dispatcher_mod} = stage) do
2231-
dispatcher_mod |> apply(callback, args) |> handle_callback_result(stage)
2232-
end
2233-
2234-
defp handle_callback_result(e = {:error, _reason}, _stage) do
2235-
e
2230+
dispatcher_mod |> apply(callback, args) |> handle_dispatcher_result(stage)
22362231
end
22372232

2238-
defp handle_callback_result({:ok, counter, dispatcher_state}, stage) do
2233+
defp handle_dispatcher_result({:ok, counter, dispatcher_state}, stage) do
22392234
case stage do
22402235
%{type: :producer_consumer, events: {queue, demand}} ->
22412236
counter = demand + counter

0 commit comments

Comments
 (0)