Skip to content

Commit ae0a6c6

Browse files
mgwidmannJosé Valim
authored andcommitted
Missing handle info (#245)
Log any extra messages received and ignore them. Fixes #238.
1 parent ee272d3 commit ae0a6c6

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

lib/gen_stage.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1605,9 +1605,10 @@ defmodule GenStage do
16051605
"""
16061606
@spec from_enumerable(Enumerable.t(), keyword) :: GenServer.on_start()
16071607
def from_enumerable(stream, opts \\ []) do
1608+
{:current_stacktrace, [_info_call | stack]} = Process.info(self(), :current_stacktrace)
16081609
case Keyword.pop(opts, :link, true) do
1609-
{true, opts} -> start_link(GenStage.Streamer, {stream, opts}, opts)
1610-
{false, opts} -> start(GenStage.Streamer, {stream, opts}, opts)
1610+
{true, opts} -> start_link(GenStage.Streamer, {{stream, opts}, stack}, opts)
1611+
{false, opts} -> start(GenStage.Streamer, {{stream, opts}, stack}, opts)
16111612
end
16121613
end
16131614

lib/gen_stage/streamer.ex

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,42 @@ defmodule GenStage.Streamer do
33
use GenStage
44

55
def start_link({_, opts} = pair) do
6-
GenStage.start_link(__MODULE__, pair, opts)
6+
{:current_stacktrace, [_info_call | stack]} = Process.info(self(), :current_stacktrace)
7+
GenStage.start_link(__MODULE__, {pair, stack}, opts)
78
end
89

9-
def init({stream, opts}) do
10+
def init({{stream, opts}, stack}) do
1011
continuation =
1112
&Enumerable.reduce(stream, &1, fn
1213
x, {acc, 1} -> {:suspend, {[x | acc], 0}}
1314
x, {acc, counter} -> {:cont, {[x | acc], counter - 1}}
1415
end)
1516

16-
{:producer, continuation, Keyword.take(opts, [:dispatcher, :demand])}
17+
{:producer, {stack, continuation}, Keyword.take(opts, [:dispatcher, :demand])}
1718
end
1819

19-
def handle_demand(_demand, continuation) when is_atom(continuation) do
20-
{:noreply, [], continuation}
20+
def handle_demand(_demand, {stack, continuation}) when is_atom(continuation) do
21+
{:noreply, [], {stack, continuation}}
2122
end
2223

23-
def handle_demand(demand, continuation) when demand > 0 do
24+
def handle_demand(demand, {stack, continuation}) when demand > 0 do
2425
case continuation.({:cont, {[], demand}}) do
2526
{:suspended, {list, 0}, continuation} ->
26-
{:noreply, :lists.reverse(list), continuation}
27+
{:noreply, :lists.reverse(list), {stack, continuation}}
2728

2829
{status, {list, _}} ->
2930
GenStage.async_info(self(), :stop)
30-
{:noreply, :lists.reverse(list), status}
31+
{:noreply, :lists.reverse(list), {stack, status}}
3132
end
3233
end
3334

3435
def handle_info(:stop, state) do
3536
{:stop, :normal, state}
3637
end
38+
39+
def handle_info(msg, {stack, continuation}) do
40+
log = '** Undefined handle_info in ~tp~n** Unhandled message: ~tp~n** Stream started at:~n~ts'
41+
:error_logger.warning_msg(log, [inspect(__MODULE__), msg, Exception.format_stacktrace(stack)])
42+
{:noreply, [], {stack, continuation}}
43+
end
3744
end

0 commit comments

Comments
 (0)