Skip to content

Commit 2d3dd2e

Browse files
author
José Valim
committed
Take buffer management to a separate module
1 parent b36c8b6 commit 2d3dd2e

File tree

6 files changed

+219
-152
lines changed

6 files changed

+219
-152
lines changed

lib/gen_stage.ex

Lines changed: 25 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -745,7 +745,7 @@ defmodule GenStage do
745745
:dispatcher_mod,
746746
:dispatcher_state,
747747
:buffer,
748-
:buffer_config,
748+
:buffer_keep,
749749
events: :forward,
750750
monitors: %{},
751751
producers: %{},
@@ -1637,8 +1637,9 @@ defmodule GenStage do
16371637

16381638
## Callbacks
16391639

1640-
require GenStage.Utils, as: Utils
16411640
@compile :inline_list_funcs
1641+
require GenStage.Utils, as: Utils
1642+
alias GenStage.Buffer
16421643

16431644
@doc false
16441645
def init({mod, args}) do
@@ -1685,8 +1686,8 @@ defmodule GenStage do
16851686
mod: mod,
16861687
state: state,
16871688
type: :producer,
1688-
buffer: {:queue.new(), 0, init_wheel(buffer_size)},
1689-
buffer_config: {buffer_size, buffer_keep},
1689+
buffer: Buffer.new(buffer_size),
1690+
buffer_keep: buffer_keep,
16901691
events: if(demand == :accumulate, do: [], else: :forward),
16911692
dispatcher_mod: dispatcher_mod,
16921693
dispatcher_state: dispatcher_state
@@ -1726,8 +1727,8 @@ defmodule GenStage do
17261727
mod: mod,
17271728
state: state,
17281729
type: :producer_consumer,
1729-
buffer: {:queue.new(), 0, init_wheel(buffer_size)},
1730-
buffer_config: {buffer_size, buffer_keep},
1730+
buffer: Buffer.new(buffer_size),
1731+
buffer_keep: buffer_keep,
17311732
events: {:queue.new(), 0},
17321733
dispatcher_mod: dispatcher_mod,
17331734
dispatcher_state: dispatcher_state
@@ -1982,14 +1983,13 @@ defmodule GenStage do
19821983
buffer: buffer,
19831984
dispatcher_mod: dispatcher_mod
19841985
}) do
1985-
{_, counter, _} = buffer
19861986
consumer_pids = for {_, {pid, _}} <- consumers, do: pid
19871987

19881988
[
19891989
{~c(Stage), :producer},
19901990
{~c(Dispatcher), dispatcher_mod},
19911991
{~c(Consumers), consumer_pids},
1992-
{~c(Buffer size), counter}
1992+
{~c(Buffer size), Buffer.estimate_size(buffer)}
19931993
]
19941994
end
19951995

@@ -2000,7 +2000,6 @@ defmodule GenStage do
20002000
buffer: buffer,
20012001
dispatcher_mod: dispatcher_mod
20022002
}) do
2003-
{_, counter, _} = buffer
20042003
producer_pids = for {_, {pid, _, _}} <- producers, do: pid
20052004
consumer_pids = for {_, {pid, _}} <- consumers, do: pid
20062005

@@ -2009,7 +2008,7 @@ defmodule GenStage do
20092008
{~c(Dispatcher), dispatcher_mod},
20102009
{~c(Producers), producer_pids},
20112010
{~c(Consumers), consumer_pids},
2012-
{~c(Buffer size), counter}
2011+
{~c(Buffer size), Buffer.estimate_size(buffer)}
20132012
]
20142013
end
20152014

@@ -2218,72 +2217,26 @@ defmodule GenStage do
22182217
buffer_events(events, stage)
22192218
end
22202219

2221-
defp take_from_buffer(counter, %{buffer: {_, buffer, _}} = stage)
2222-
when counter == 0
2223-
when buffer == 0 do
2224-
{:ok, counter, stage}
2225-
end
2226-
2227-
defp take_from_buffer(counter, %{buffer: {queue, buffer, infos}} = stage) do
2228-
{queue, events, new_counter, buffer, info, infos} =
2229-
take_from_buffer(queue, [], counter, buffer, infos)
2220+
defp take_from_buffer(counter, %{buffer: buffer} = stage) do
2221+
case Buffer.take_count_or_until_permanent(buffer, counter) do
2222+
:empty ->
2223+
{:ok, counter, stage}
22302224

2231-
# Update the buffer because dispatch events may
2232-
# trigger more events to be buffered.
2233-
stage = %{stage | buffer: {queue, buffer, infos}}
2234-
stage = dispatch_events(events, counter - new_counter, stage)
2235-
2236-
case info do
2237-
{:ok, msg} ->
2238-
take_from_buffer(new_counter, dispatch_info(msg, stage))
2239-
2240-
:error ->
2225+
{:ok, buffer, new_counter, temps, perms} ->
2226+
# Update the buffer because dispatch events may
2227+
# trigger more events to be buffered.
2228+
stage = dispatch_events(temps, counter - new_counter, %{stage | buffer: buffer})
2229+
stage = :lists.foldl(&dispatch_info/2, stage, perms)
22412230
take_from_buffer(new_counter, stage)
22422231
end
22432232
end
22442233

2245-
defp take_from_buffer(queue, events, 0, buffer, infos) do
2246-
{queue, :lists.reverse(events), 0, buffer, :error, infos}
2247-
end
2248-
2249-
defp take_from_buffer(queue, events, counter, 0, infos) do
2250-
{queue, :lists.reverse(events), counter, 0, :error, infos}
2251-
end
2252-
2253-
defp take_from_buffer(queue, events, counter, buffer, infos) when is_reference(infos) do
2254-
{{:value, value}, queue} = :queue.out(queue)
2255-
2256-
case value do
2257-
{^infos, msg} ->
2258-
{queue, :lists.reverse(events), counter, buffer - 1, {:ok, msg}, infos}
2259-
2260-
val ->
2261-
take_from_buffer(queue, [val | events], counter - 1, buffer - 1, infos)
2262-
end
2263-
end
2264-
2265-
defp take_from_buffer(queue, events, counter, buffer, wheel) do
2266-
{{:value, value}, queue} = :queue.out(queue)
2267-
2268-
case pop_and_increment_wheel(wheel) do
2269-
{:ok, msg, wheel} ->
2270-
{queue, :lists.reverse([value | events]), counter - 1, buffer - 1, {:ok, msg}, wheel}
2271-
2272-
{:error, wheel} ->
2273-
take_from_buffer(queue, [value | events], counter - 1, buffer - 1, wheel)
2274-
end
2275-
end
2276-
22772234
defp buffer_events([], stage) do
22782235
stage
22792236
end
22802237

2281-
defp buffer_events(
2282-
events,
2283-
%{buffer: {queue, counter, infos}, buffer_config: {max, keep}} = stage
2284-
) do
2285-
{{excess, queue, counter}, pending, infos} =
2286-
queue_events(keep, events, queue, counter, max, infos)
2238+
defp buffer_events(events, %{buffer: buffer, buffer_keep: keep} = stage) do
2239+
{buffer, excess, perms} = Buffer.store_temporary(buffer, events, keep)
22872240

22882241
case excess do
22892242
0 ->
@@ -2294,57 +2247,11 @@ defmodule GenStage do
22942247
:error_logger.warning_msg(error_msg, [Utils.self_name(), excess])
22952248
end
22962249

2297-
stage = %{stage | buffer: {queue, counter, infos}}
2298-
:lists.foldl(&dispatch_info/2, stage, pending)
2299-
end
2300-
2301-
defp queue_events(_keep, events, _queue, 0, :infinity, infos),
2302-
do: {{0, :queue.from_list(events), length(events)}, [], infos}
2303-
2304-
defp queue_events(_keep, events, queue, counter, :infinity, infos),
2305-
do: {queue_infinity(events, queue, counter), [], infos}
2306-
2307-
defp queue_events(:first, events, queue, counter, max, infos),
2308-
do: {queue_first(events, queue, counter, max), [], infos}
2309-
2310-
defp queue_events(:last, events, queue, counter, max, infos),
2311-
do: queue_last(events, queue, 0, counter, max, [], infos)
2312-
2313-
defp queue_infinity([], queue, counter), do: {0, queue, counter}
2314-
2315-
defp queue_infinity([event | events], queue, counter),
2316-
do: queue_infinity(events, :queue.in(event, queue), counter + 1)
2317-
2318-
defp queue_first([], queue, counter, _max), do: {0, queue, counter}
2319-
defp queue_first(events, queue, max, max), do: {length(events), queue, max}
2320-
2321-
defp queue_first([event | events], queue, counter, max),
2322-
do: queue_first(events, :queue.in(event, queue), counter + 1, max)
2323-
2324-
defp queue_last([], queue, excess, counter, _max, pending, wheel),
2325-
do: {{excess, queue, counter}, :lists.reverse(pending), wheel}
2326-
2327-
defp queue_last([event | events], queue, excess, max, max, pending, wheel) do
2328-
queue = :queue.in(event, :queue.drop(queue))
2329-
2330-
case pop_and_increment_wheel(wheel) do
2331-
{:ok, info, wheel} ->
2332-
queue_last(events, queue, excess + 1, max, max, [info | pending], wheel)
2333-
2334-
{:error, wheel} ->
2335-
queue_last(events, queue, excess + 1, max, max, pending, wheel)
2336-
end
2250+
:lists.foldl(&dispatch_info/2, %{stage | buffer: buffer}, perms)
23372251
end
23382252

2339-
defp queue_last([event | events], queue, excess, counter, max, pending, wheel),
2340-
do: queue_last(events, :queue.in(event, queue), excess, counter + 1, max, pending, wheel)
2341-
23422253
## Info helpers
23432254

2344-
# We use a wheel unless the queue is infinity.
2345-
defp init_wheel(:infinity), do: make_ref()
2346-
defp init_wheel(_), do: nil
2347-
23482255
defp producer_info(msg, %{type: :consumer} = stage) do
23492256
send(self(), msg)
23502257
{:reply, :ok, stage}
@@ -2365,43 +2272,10 @@ defmodule GenStage do
23652272
{:reply, :ok, buffer_or_dispatch_info(msg, stage)}
23662273
end
23672274

2368-
defp buffer_or_dispatch_info(msg, stage) do
2369-
%{buffer: {queue, count, infos}, buffer_config: {max, _keep}} = stage
2370-
2371-
case count do
2372-
0 ->
2373-
dispatch_info(msg, stage)
2374-
2375-
_ when is_reference(infos) ->
2376-
queue = :queue.in({infos, msg}, queue)
2377-
%{stage | buffer: {queue, count + 1, infos}}
2378-
2379-
_ ->
2380-
wheel = put_wheel(infos, count, max, msg)
2381-
%{stage | buffer: {queue, count, wheel}}
2382-
end
2383-
end
2384-
2385-
defp put_wheel(nil, count, max, contents), do: {0, max, %{(count - 1) => contents}}
2386-
2387-
defp put_wheel({pos, _, wheel}, count, max, contents),
2388-
do: {pos, max, Map.put(wheel, rem(pos + count - 1, max), contents)}
2389-
2390-
defp pop_and_increment_wheel(nil), do: {:error, nil}
2391-
2392-
defp pop_and_increment_wheel({pos, limit, wheel}) do
2393-
new_pos = rem(pos + 1, limit)
2394-
2395-
# TODO: Use :maps.take/2
2396-
case wheel do
2397-
%{^pos => info} when map_size(wheel) == 1 ->
2398-
{:ok, info, nil}
2399-
2400-
%{^pos => info} ->
2401-
{:ok, info, {new_pos, limit, Map.delete(wheel, pos)}}
2402-
2403-
%{} ->
2404-
{:error, {new_pos, limit, wheel}}
2275+
defp buffer_or_dispatch_info(msg, %{buffer: buffer} = stage) do
2276+
case Buffer.store_permanent_unless_empty(buffer, msg) do
2277+
:empty -> dispatch_info(msg, stage)
2278+
{:ok, buffer} -> %{stage | buffer: buffer}
24052279
end
24062280
end
24072281

0 commit comments

Comments
 (0)