Skip to content

Commit 1ec5d26

Browse files
authored
Add callback to allow instrumenting discarded count and function to get estimated buffer size (#257)
1 parent e2c45e6 commit 1ec5d26

File tree

2 files changed

+187
-12
lines changed

2 files changed

+187
-12
lines changed

lib/gen_stage.ex

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,8 @@ defmodule GenStage do
397397
we will serve the existing demand, otherwise the event will be queued in
398398
`GenStage`'s internal buffer. In case events are being queued and not being
399399
consumed, a log message will be emitted when we exceed the `:buffer_size`
400-
configuration.
400+
configuration. This behavior can be customized by implementing the optional
401+
`c:format_discarded/2` callback.
401402
402403
While the implementation above is enough to solve the constraints above,
403404
a more robust implementation would have tighter control over the events
@@ -974,6 +975,15 @@ defmodule GenStage do
974975
| {:stop, reason, new_state}
975976
when new_state: term, reason: term
976977

978+
@doc """
979+
Invoked when items are discarded from the buffer.
980+
981+
It receives the number of excess (discarded) items from this invocation.
982+
This callback returns a boolean that controls whether the default error log for discarded items is printed or not.
983+
Return true to print the log, return false to skip the log.
984+
"""
985+
@callback format_discarded(discarded :: non_neg_integer, state :: term) :: boolean
986+
977987
@doc """
978988
Invoked when a consumer is no longer subscribed to a producer.
979989
@@ -1122,6 +1132,7 @@ defmodule GenStage do
11221132
handle_cancel: 3,
11231133
handle_demand: 2,
11241134
handle_events: 3,
1135+
format_discarded: 2,
11251136

11261137
# GenServer
11271138
code_change: 3,
@@ -1693,6 +1704,14 @@ defmodule GenStage do
16931704
"GenStage.stream/1 expects a list of subscriptions, got: #{inspect(subscriptions)}"
16941705
end
16951706

1707+
@doc """
1708+
Returns the estimated number of buffered items for a producer.
1709+
"""
1710+
@spec estimate_buffered_count(stage, timeout) :: non_neg_integer
1711+
def estimate_buffered_count(stage, timeout \\ 5000) do
1712+
call(stage, :"$estimate_buffered_count", timeout)
1713+
end
1714+
16961715
## Callbacks
16971716

16981717
@compile :inline_list_funcs
@@ -1822,15 +1841,19 @@ defmodule GenStage do
18221841
consumer_subscribe(current, to, opts, stage)
18231842
end
18241843

1844+
def handle_call(:"$estimate_buffered_count", _from, stage) do
1845+
producer_estimate_buffered_count(stage)
1846+
end
1847+
18251848
def handle_call(msg, from, %{mod: mod, state: state} = stage) do
18261849
case mod.handle_call(msg, from, state) do
18271850
{:reply, reply, events, state} when is_list(events) ->
1828-
stage = dispatch_events(events, length(events), stage)
1829-
{:reply, reply, %{stage | state: state}}
1851+
stage = dispatch_events(events, length(events), %{stage | state: state})
1852+
{:reply, reply, stage}
18301853

18311854
{:reply, reply, events, state, :hibernate} when is_list(events) ->
1832-
stage = dispatch_events(events, length(events), stage)
1833-
{:reply, reply, %{stage | state: state}, :hibernate}
1855+
stage = dispatch_events(events, length(events), %{stage | state: state})
1856+
{:reply, reply, stage, :hibernate}
18341857

18351858
{:stop, reason, reply, state} ->
18361859
{:stop, reason, reply, %{stage | state: state}}
@@ -2106,12 +2129,12 @@ defmodule GenStage do
21062129
defp handle_noreply_callback(return, stage) do
21072130
case return do
21082131
{:noreply, events, state} when is_list(events) ->
2109-
stage = dispatch_events(events, length(events), stage)
2110-
{:noreply, %{stage | state: state}}
2132+
stage = dispatch_events(events, length(events), %{stage | state: state})
2133+
{:noreply, stage}
21112134

21122135
{:noreply, events, state, :hibernate} when is_list(events) ->
2113-
stage = dispatch_events(events, length(events), stage)
2114-
{:noreply, %{stage | state: state}, :hibernate}
2136+
stage = dispatch_events(events, length(events), %{stage | state: state})
2137+
{:noreply, stage, :hibernate}
21152138

21162139
{:stop, reason, state} ->
21172140
{:stop, reason, %{stage | state: state}}
@@ -2210,6 +2233,14 @@ defmodule GenStage do
22102233
{:noreply, stage}
22112234
end
22122235

2236+
defp maybe_format_discarded(mod, excess, state) do
2237+
if function_exported?(mod, :format_discarded, 2) do
2238+
mod.format_discarded(excess, state)
2239+
else
2240+
true
2241+
end
2242+
end
2243+
22132244
defp producer_cancel(ref, kind, reason, stage) do
22142245
%{consumers: consumers, monitors: monitors, state: state} = stage
22152246

@@ -2313,21 +2344,41 @@ defmodule GenStage do
23132344
stage
23142345
end
23152346

2316-
defp buffer_events(events, %{buffer: buffer, buffer_keep: keep} = stage) do
2347+
defp buffer_events(
2348+
events,
2349+
%{
2350+
mod: mod,
2351+
buffer: buffer,
2352+
buffer_keep: keep,
2353+
state: state
2354+
} = stage
2355+
) do
23172356
{buffer, excess, perms} = Buffer.store_temporary(buffer, events, keep)
23182357

23192358
case excess do
23202359
0 ->
23212360
:ok
23222361

23232362
excess ->
2324-
error_msg = 'GenStage producer ~tp has discarded ~tp events from buffer'
2325-
:error_logger.warning_msg(error_msg, [Utils.self_name(), excess])
2363+
if maybe_format_discarded(mod, excess, state) do
2364+
error_msg = 'GenStage producer ~tp has discarded ~tp events from buffer'
2365+
:error_logger.warning_msg(error_msg, [Utils.self_name(), excess])
2366+
end
23262367
end
23272368

23282369
:lists.foldl(&dispatch_info/2, %{stage | buffer: buffer}, perms)
23292370
end
23302371

2372+
defp producer_estimate_buffered_count(%{type: :consumer} = stage) do
2373+
error_msg = 'Buffered count can only be requested for producers, GenStage ~tp is a consumer'
2374+
:error_logger.error_msg(error_msg, [Utils.self_name()])
2375+
{:reply, 0, stage}
2376+
end
2377+
2378+
defp producer_estimate_buffered_count(%{buffer: buffer} = stage) do
2379+
{:reply, Buffer.estimate_size(buffer), stage}
2380+
end
2381+
23312382
## Info helpers
23322383

23332384
defp producer_info(msg, %{type: :consumer} = stage) do

test/gen_stage_test.exs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,43 @@ defmodule GenStageTest do
263263
end
264264
end
265265

266+
defmodule DiscardedBufferLogger do
267+
@moduledoc """
268+
Logs about any discarded items
269+
"""
270+
271+
use GenStage
272+
273+
def start_link(init, opts \\ []) do
274+
GenStage.start_link(__MODULE__, init, opts)
275+
end
276+
277+
def init(init) do
278+
init
279+
end
280+
281+
def sync_queue(stage, events) do
282+
GenStage.call(stage, {:queue, events})
283+
end
284+
285+
def handle_call({:queue, events}, _from, state) do
286+
{:reply, :ok, events, state}
287+
end
288+
289+
def format_discarded(discarded, %{log_discarded: log_discarded}) do
290+
:error_logger.info_msg("DiscardedBufferLogger has discarded ~tp events from buffer", [
291+
discarded
292+
])
293+
294+
log_discarded
295+
end
296+
297+
def handle_demand(_demand, state) do
298+
# We don't care about the demand
299+
{:noreply, [], state}
300+
end
301+
end
302+
266303
test "generates child_spec/1" do
267304
assert Counter.child_spec([:hello]) == %{
268305
id: Counter,
@@ -743,6 +780,93 @@ defmodule GenStageTest do
743780
assert_receive {:consumed, [:a, :b, :c, :d]}
744781
assert_receive {:consumed, [:e, :f, :g, :h]}
745782
end
783+
784+
test "calls optional format_discarded callback with discarded count when it exceeds configured size" do
785+
{:ok, producer} =
786+
DiscardedBufferLogger.start_link(
787+
{:producer, %{log_discarded: false}, buffer_size: 5, buffer_keep: :first}
788+
)
789+
790+
log =
791+
capture_log(fn ->
792+
DiscardedBufferLogger.sync_queue(producer, [:a, :b, :c, :d, :e, :f, :g, :h])
793+
end)
794+
795+
assert log =~ "DiscardedBufferLogger has discarded 3 events from buffer"
796+
797+
log =
798+
capture_log(fn ->
799+
{:ok, consumer} = Forwarder.start_link({:consumer, self()})
800+
:ok = GenStage.async_subscribe(consumer, to: producer, max_demand: 4, min_demand: 0)
801+
assert_receive {:consumed, [:a, :b, :c, :d]}
802+
assert_receive {:consumed, [:e]}
803+
end)
804+
805+
assert log == ""
806+
end
807+
808+
test "format_discarded can allow printing the default log when items are discarded" do
809+
{:ok, producer} =
810+
DiscardedBufferLogger.start_link(
811+
{:producer, %{log_discarded: true}, buffer_size: 5, buffer_keep: :first}
812+
)
813+
814+
log =
815+
capture_log(fn ->
816+
DiscardedBufferLogger.sync_queue(producer, [:a, :b, :c, :d, :e, :f, :g, :h])
817+
end)
818+
819+
assert log =~ "GenStage producer #{inspect(producer)} has discarded 3 events from buffer"
820+
821+
log =
822+
capture_log(fn ->
823+
{:ok, consumer} = Forwarder.start_link({:consumer, self()})
824+
:ok = GenStage.async_subscribe(consumer, to: producer, max_demand: 5, min_demand: 0)
825+
assert_receive {:consumed, [:a, :b, :c, :d, :e]}
826+
end)
827+
828+
assert log == ""
829+
end
830+
831+
test "format_discarded can prevent printing the default log when items are discarded" do
832+
{:ok, producer} =
833+
DiscardedBufferLogger.start_link(
834+
{:producer, %{log_discarded: false}, buffer_size: 5, buffer_keep: :first}
835+
)
836+
837+
log =
838+
capture_log(fn ->
839+
DiscardedBufferLogger.sync_queue(producer, [:a, :b, :c, :d, :e, :f, :g, :h])
840+
end)
841+
842+
assert not (log =~
843+
"GenStage producer #{inspect(producer)} has discarded 3 events from buffer")
844+
845+
log =
846+
capture_log(fn ->
847+
{:ok, consumer} = Forwarder.start_link({:consumer, self()})
848+
:ok = GenStage.async_subscribe(consumer, to: producer, max_demand: 5, min_demand: 0)
849+
assert_receive {:consumed, [:a, :b, :c, :d, :e]}
850+
end)
851+
852+
assert log == ""
853+
end
854+
855+
test "returns the correct buffer count when polled" do
856+
{:ok, producer} = Counter.start_link({:producer, 0, buffer_size: :infinity})
857+
0 = Counter.sync_queue(producer, [:a, :b, :c, :d, :e])
858+
assert 5 == GenStage.estimate_buffered_count(producer)
859+
860+
0 = Counter.sync_queue(producer, [:f, :g, :h])
861+
assert 8 == GenStage.estimate_buffered_count(producer)
862+
863+
{:ok, consumer} = Forwarder.start_link({:consumer, self()})
864+
:ok = GenStage.async_subscribe(consumer, to: producer, max_demand: 4, min_demand: 0)
865+
assert_receive {:consumed, [:a, :b, :c, :d]}
866+
assert_receive {:consumed, [:e, :f, :g, :h]}
867+
868+
assert 0 == GenStage.estimate_buffered_count(producer)
869+
end
746870
end
747871

748872
describe "info" do

0 commit comments

Comments
 (0)