Skip to content

Commit 88e19ab

Browse files
authored
Allow explicitly specifying :max_demand in DemandDispatcher (#291)
1 parent 8aa6f77 commit 88e19ab

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

lib/gen_stage/dispatchers/demand_dispatcher.ex

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@ defmodule GenStage.DemandDispatcher do
1111
The demand dispatcher accepts the following options
1212
on initialization:
1313
14-
* `:shuffle_demands_on_first_dispatch` - when true, shuffle the initial demands list
14+
* `:shuffle_demands_on_first_dispatch` - when `true`, shuffle the initial demands list
1515
which is constructed on subscription before first dispatch. It prevents overloading
16-
the first consumer on first dispatch. Defaults to `false`.
16+
the first consumer on first dispatch. Defaults to `false`.
17+
18+
* `:max_demand` - the maximum demand expected on `GenStage.ask/3`.
19+
Defaults to the first demand asked.
1720
1821
### Examples
1922
@@ -27,8 +30,9 @@ defmodule GenStage.DemandDispatcher do
2730
@doc false
2831
def init(opts) do
2932
shuffle_demand = Keyword.get(opts, :shuffle_demands_on_first_dispatch, false)
33+
max_demand = Keyword.get(opts, :max_demand)
3034

31-
{:ok, {[], 0, nil, shuffle_demand}}
35+
{:ok, {[], 0, max_demand, shuffle_demand}}
3236
end
3337

3438
@doc false
@@ -102,7 +106,7 @@ defmodule GenStage.DemandDispatcher do
102106
{now, later, length - counter, 0}
103107
end
104108

105-
defp add_demand(counter, pid, ref, [{c, _, _} | _] = demands) when counter > c do
109+
defp add_demand(counter, pid, ref, [{current, _, _} | _] = demands) when counter > current do
106110
[{counter, pid, ref} | demands]
107111
end
108112

test/gen_stage/demand_dispatcher_test.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ defmodule GenStage.DemandDispatcherTest do
88

99
defp dispatcher(opts) do
1010
shuffle_demand = Keyword.get(opts, :shuffle_demands_on_first_dispatch, false)
11-
{:ok, {[], 0, nil, ^shuffle_demand} = state} = D.init(opts)
11+
max_demand = Keyword.get(opts, :max_demand)
12+
{:ok, {[], 0, ^max_demand, ^shuffle_demand} = state} = D.init(opts)
1213
state
1314
end
1415

@@ -191,14 +192,13 @@ defmodule GenStage.DemandDispatcherTest do
191192
pid = self()
192193
ref1 = make_ref()
193194
ref2 = make_ref()
194-
disp = dispatcher([])
195+
disp = dispatcher(max_demand: 3)
195196

196197
{:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp)
197198
{:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp)
198199

199200
log =
200201
capture_log(fn ->
201-
{:ok, 3, disp} = D.ask(3, {pid, ref1}, disp)
202202
{:ok, 4, disp} = D.ask(4, {pid, ref2}, disp)
203203
disp
204204
end)

0 commit comments

Comments
 (0)