Skip to content

Commit 355e9d6

Browse files
author
José Valim
committed
Add GenStage.demand/1
1 parent f07399e commit 355e9d6

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

lib/gen_stage.ex

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,6 +1231,16 @@ defmodule GenStage do
12311231
cast(stage, {:"$info", msg})
12321232
end
12331233

1234+
@doc """
1235+
Returns the demand mode for a producer.
1236+
1237+
It is either `:forward` or `:accumulate`. See `demand/2`.
1238+
"""
1239+
@spec demand(stage) :: :forward | :accumulate
1240+
def demand(stage) do
1241+
call(stage, :"$demand")
1242+
end
1243+
12341244
@doc """
12351245
Sets the demand mode for a producer.
12361246
@@ -1784,6 +1794,10 @@ defmodule GenStage do
17841794
producer_info(msg, stage)
17851795
end
17861796

1797+
def handle_call(:"$demand", _from, stage) do
1798+
producer_demand(stage)
1799+
end
1800+
17871801
def handle_call({:"$subscribe", current, to, opts}, _from, stage) do
17881802
consumer_subscribe(current, to, opts, stage)
17891803
end
@@ -2089,6 +2103,14 @@ defmodule GenStage do
20892103

20902104
## Producer helpers
20912105

2106+
defp producer_demand(%{events: :forward} = stage) do
2107+
{:reply, :forward, stage}
2108+
end
2109+
2110+
defp producer_demand(%{events: events} = stage) when is_list(events) do
2111+
{:reply, :accumulate, stage}
2112+
end
2113+
20922114
defp producer_demand(:forward, %{type: :producer_consumer} = stage) do
20932115
# That's the only mode on producer consumers.
20942116
{:noreply, stage}

test/gen_stage_test.exs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,26 +627,33 @@ defmodule GenStageTest do
627627
describe "demand" do
628628
test "can be set to :accumulate on init" do
629629
{:ok, producer} = Counter.start_link({:producer, 0, demand: :accumulate})
630+
assert GenStage.demand(producer) == :accumulate
630631
{:ok, consumer} = Forwarder.start_link({:consumer, self()})
631632
GenStage.sync_subscribe(consumer, to: producer, max_demand: 4, min_demand: 0)
632633
refute_received {:consumed, [0, 1, 2, 3]}
633634
GenStage.demand(producer, :forward)
635+
assert GenStage.demand(producer) == :forward
634636
assert_receive {:consumed, [0, 1, 2, 3]}
635637
end
636638

637639
test "can be set to :accumulate via API" do
638640
{:ok, producer} = Counter.start_link({:producer, 0})
641+
assert GenStage.demand(producer) == :forward
639642
GenStage.demand(producer, :accumulate)
643+
assert GenStage.demand(producer) == :accumulate
640644
{:ok, consumer} = Forwarder.start_link({:consumer, self()})
641645
GenStage.sync_subscribe(consumer, to: producer, max_demand: 4, min_demand: 0)
642646
refute_received {:consumed, [0, 1, 2, 3]}
643647
GenStage.demand(producer, :forward)
648+
assert GenStage.demand(producer) == :forward
644649
assert_receive {:consumed, [0, 1, 2, 3]}
645650
end
646651

647652
test "can be set to :forward via API before subscriptions" do
648653
{:ok, producer} = Counter.start_link({:producer, 0, demand: :accumulate})
654+
assert GenStage.demand(producer) == :accumulate
649655
GenStage.demand(producer, :forward)
656+
assert GenStage.demand(producer) == :forward
650657
{:ok, consumer} = Forwarder.start_link({:consumer, self()})
651658
GenStage.sync_subscribe(consumer, to: producer, max_demand: 4, min_demand: 0)
652659
assert_receive {:consumed, [0, 1, 2, 3]}

0 commit comments

Comments
 (0)