Skip to content

Commit b5d1772

Browse files
authored
Make :subscribe_to option type less restrictive (#261)
1 parent 55c25b9 commit b5d1772

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

lib/gen_stage.ex

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,7 @@ defmodule GenStage do
792792

793793
@typedoc "Option values used by the `init*` common to `:consumer` and `:producer_consumer` types"
794794
@type consumer_and_producer_consumer_option ::
795-
{:subscribe_to, [module | {module, subscription_options}]}
795+
{:subscribe_to, [atom | pid | {GenServer.server(), subscription_options}]}
796796

797797
@typedoc "Option values used by the `init*` functions when stage type is `:producer`"
798798
@type producer_option :: producer_only_option | producer_and_producer_consumer_option
@@ -2419,6 +2419,8 @@ defmodule GenStage do
24192419
defp consumer_init_subscribe(producers, stage) do
24202420
fold_fun = fn
24212421
to, {:ok, stage} ->
2422+
maybe_print_subscribe_to_deprecation_warning(to)
2423+
24222424
case consumer_subscribe(to, stage) do
24232425
{:reply, _, stage} -> {:ok, stage}
24242426
{:stop, reason, _, _} -> {:stop, reason}
@@ -2432,6 +2434,21 @@ defmodule GenStage do
24322434
:lists.foldl(fold_fun, {:ok, stage}, producers)
24332435
end
24342436

2437+
defp maybe_print_subscribe_to_deprecation_warning(to) do
2438+
log = ':subscribe_to value with type ~ts is deprecated. Change ~tp to {~tp, []} instead.'
2439+
2440+
case to do
2441+
{:global, _} ->
2442+
:error_logger.warning_msg(log, ['{:global, term()}', to, to])
2443+
2444+
{:via, _, _} ->
2445+
:error_logger.warning_msg(log, ['{:via, module(), term()}', to, to])
2446+
2447+
_ ->
2448+
:ok
2449+
end
2450+
end
2451+
24352452
defp consumer_receive({_, ref} = from, {producer_id, cancel, {demand, min, max}}, events, stage) do
24362453
{demand, batches} = Utils.split_batches(events, from, min, max, demand)
24372454
stage = put_in(stage.producers[ref], {producer_id, cancel, {demand, min, max}})

test/gen_stage_test.exs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1868,4 +1868,51 @@ defmodule GenStageTest do
18681868
{:registered_name, :gen_stage_from_enumerable}
18691869
end
18701870
end
1871+
1872+
describe "subscribe_to names" do
1873+
test "can be pid" do
1874+
{:ok, producer} = Counter.start_link({:producer, 0})
1875+
{:ok, _} = Forwarder.start_link({:consumer, self(), subscribe_to: [producer]})
1876+
assert_receive {:consumed, _}
1877+
end
1878+
1879+
test "can be an atom name" do
1880+
producer_name = :producer_atom_name
1881+
{:ok, _} = Counter.start_link({:producer, 0}, name: producer_name)
1882+
{:ok, _} = Forwarder.start_link({:consumer, self(), subscribe_to: [producer_name]})
1883+
assert_receive {:consumed, _}
1884+
end
1885+
1886+
test "can be a via name if passed with options to avoid ambiguity" do
1887+
producer_name = {:via, :global, {:producer, :name}}
1888+
{:ok, _} = Counter.start_link({:producer, 0}, name: producer_name)
1889+
{:ok, _} = Forwarder.start_link({:consumer, self(), subscribe_to: [{producer_name, []}]})
1890+
assert_receive {:consumed, _}
1891+
end
1892+
1893+
test "can be a global name if passed with options to avoid ambiguity" do
1894+
producer_name = {:global, {:producer, :name}}
1895+
{:ok, _} = Counter.start_link({:producer, 0}, name: producer_name)
1896+
{:ok, _} = Forwarder.start_link({:consumer, self(), subscribe_to: [{producer_name, []}]})
1897+
assert_receive {:consumed, _}
1898+
end
1899+
1900+
test "logs warning about ambiguity if global name is passed without options" do
1901+
producer_name = {:global, {:producer, :name}}
1902+
{:ok, _} = Counter.start_link({:producer, 0}, name: producer_name)
1903+
1904+
assert capture_log(fn ->
1905+
{:ok, _} = Forwarder.start_link({:consumer, self(), subscribe_to: [producer_name]})
1906+
end) =~ ":subscribe_to value with type {:global, term()} is deprecated."
1907+
end
1908+
1909+
test "logs warning about ambiguity if via name is passed without options" do
1910+
producer_name = {:via, :global, {:producer, :name}}
1911+
{:ok, _} = Counter.start_link({:producer, 0}, name: producer_name)
1912+
1913+
assert capture_log(fn ->
1914+
{:ok, _} = Forwarder.start_link({:consumer, self(), subscribe_to: [producer_name]})
1915+
end) =~ ":subscribe_to value with type {:via, module(), term()} is deprecated."
1916+
end
1917+
end
18711918
end

0 commit comments

Comments
 (0)