Skip to content

Commit 46c23b1

Browse files
author
José Valim
committed
Merge pull request #2423 from fishcakez/genevent_stream_stop
Improve GenEvent.stream/2 cleanup
2 parents 78adb5b + 4d607ac commit 46c23b1

File tree

1 file changed

+49
-28
lines changed

1 file changed

+49
-28
lines changed

lib/elixir/lib/gen_event.ex

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -519,18 +519,22 @@ defimpl Enumerable, for: GenEvent do
519519
receive do
520520
# The subscription process gave us a go.
521521
{:UP, ^mon_ref, manager_pid} ->
522-
{mon_ref, manager_pid}
522+
{mon_ref, mon_pid, manager_pid}
523523
# The subscription process died due to an abnormal reason.
524524
{:DOWN, ^mon_ref, _, _, reason} ->
525525
exit({reason, {__MODULE__, :start, [stream]}})
526526
end
527527
end
528528

529-
defp next(%{timeout: timeout} = stream, {mon_ref, manager_pid} = acc) do
529+
defp next(%{timeout: timeout} = stream, {mon_ref, mon_pid, manager_pid} = acc) do
530+
# If :DOWN is received must resend it to self so that stop/2 can receive it
531+
# and know that the handler has been removed.
530532
receive do
531533
{:DOWN, ^mon_ref, _, _, :normal} ->
534+
send(self(), {:DOWN, mon_ref, :process, mon_pid, :normal})
532535
nil
533536
{:DOWN, ^mon_ref, _, _, reason} ->
537+
send(self(), {:DOWN, mon_ref, :process, mon_pid, :normal})
534538
exit({reason, {__MODULE__, :next, [stream, acc]}})
535539
{^mon_ref, sync_ref, event} ->
536540
{{sync_ref, manager_pid, event}, acc}
@@ -540,9 +544,15 @@ defimpl Enumerable, for: GenEvent do
540544
end
541545
end
542546

543-
defp stop(%{id: id}, {mon_ref, manager_pid}) do
544-
remove_handler(mon_ref, manager_pid, id)
545-
flush_events(mon_ref)
547+
defp stop(%{mode: mode} = stream, {mon_ref, mon_pid, manager_pid} = acc) do
548+
case remove_handler(mon_ref, mon_pid, manager_pid) do
549+
:ok when mode == :async ->
550+
flush_events(mon_ref)
551+
:ok ->
552+
:ok
553+
{:error, reason} ->
554+
exit({reason, {__MODULE__, :stop, [stream, acc]}})
555+
end
546556
end
547557

548558
defp add_handler(mode, manager, id, duration) do
@@ -574,6 +584,10 @@ defimpl Enumerable, for: GenEvent do
574584
{:UP, ^mon_ref, manager_pid} ->
575585
send(parent, {:UP, mon_ref, manager_pid})
576586
receive do
587+
# The stream has finished, remove the handler.
588+
{:DONE, ^mon_ref} ->
589+
exit_handler(manager_pid, parent_ref, cancel)
590+
577591
# If the parent died, we can exit normally.
578592
{:DOWN, ^parent_ref, _, _, _} ->
579593
exit(:normal)
@@ -599,36 +613,43 @@ defimpl Enumerable, for: GenEvent do
599613
defp cancel_ref(nil, mon_ref), do: mon_ref
600614
defp cancel_ref(id, mon_ref), do: {id, mon_ref}
601615

602-
defp remove_handler(mon_ref, manager_pid, id) do
603-
Process.demonitor(mon_ref, [:flush])
604-
handler = {__MODULE__, cancel_ref(id, mon_ref)}
605-
606-
{_pid, ref} = spawn_monitor fn ->
607-
try do
608-
# handler may nolonger be there, if it is the removal will cause the monitor
609-
# process to exit. If this returns successfuly then no more events will be
610-
# forwarded.
611-
_ = :gen_event.delete_handler(manager_pid, handler, :remove_handler)
612-
catch
613-
# Do not want to overide the exit reason of the mon_pid so catch errors.
614-
# However if the exit is due to a disconnection, exit because messages could
615-
# leak if the nodes are reconnected before the manager on the other node
616-
# removes the handler. In this case it is very likely that the mon_pid
617-
# exited with the same reason.
618-
:exit, reason when reason != {:nodedown, node(manager_pid)} ->
619-
:ok
620-
end
616+
defp exit_handler(manager_pid, parent_ref, cancel) do
617+
# Send exit signal so manager removes handler.
618+
Process.exit(manager_pid, :shutdown)
619+
receive do
620+
# If the parent died, we can exit normally.
621+
{:DOWN, ^parent_ref, _, _, _} ->
622+
exit(:normal)
623+
624+
# Probably the reason is :shutdown, which occurs when the manager receives
625+
# an exit signal from a handler supervising process. However whatever the
626+
# reason the handler has been removed so it is ok.
627+
{:gen_event_EXIT, {__MODULE__, ^cancel}, _} ->
628+
exit(:normal)
629+
630+
# The connection broke, perhaps the handler might try to forward events
631+
# before it removes the handler, so must exit abnormally.
632+
{:EXIT, ^manager_pid, :noconnection} ->
633+
exit({:nodedown, node(manager_pid)})
634+
635+
# The manager has exited but don't exit abnormally as the handler has died
636+
# with the manager and all expected events have been handled. This is ok.
637+
{:EXIT, ^manager_pid, _} ->
638+
exit(:normal)
621639
end
640+
end
622641

642+
defp remove_handler(mon_ref, mon_pid, manager_pid) do
643+
send(mon_pid, {:DONE, mon_ref})
623644
receive do
624645
{^mon_ref, sync, _} when sync != nil ->
625646
send(manager_pid, {sync, :done})
626-
Process.demonitor(ref, [:flush])
647+
Process.demonitor(mon_ref, [:flush])
627648
:ok
628-
{:DOWN, ^ref, _, _, :normal} ->
649+
{:DOWN, ^mon_ref, _, _, :normal} ->
629650
:ok
630-
{:DOWN, ^ref, _, _, other} ->
631-
exit(other)
651+
{:DOWN, ^mon_ref, _, _, reason} ->
652+
{:error, reason}
632653
end
633654
end
634655

0 commit comments

Comments
 (0)