Skip to content

Commit 78adb5b

Browse files
author
José Valim
committed
Ensure handler does not get stuck and properly clean up mailbox
1 parent 311efb6 commit 78adb5b

File tree

2 files changed

+67
-37
lines changed

2 files changed

+67
-37
lines changed

lib/elixir/lib/gen_event.ex

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -468,10 +468,15 @@ defimpl Enumerable, for: GenEvent do
468468
sync = Process.monitor(mon_pid)
469469
send pid, {ref, sync, event}
470470
receive do
471-
{^sync, :done} -> Process.demonitor(sync, [:flush])
472-
{:DOWN, ^sync, _, _, _} -> :ok
471+
{^sync, :done} ->
472+
Process.demonitor(sync, [:flush])
473+
:remove_handler
474+
{^sync, :next} ->
475+
Process.demonitor(sync, [:flush])
476+
{:ok, state}
477+
{:DOWN, ^sync, _, _, _} ->
478+
{:ok, state}
473479
end
474-
{:ok, state}
475480
end
476481

477482
def handle_event(event, {:async, _mon_pid, pid, ref} = state) do
@@ -499,9 +504,11 @@ defimpl Enumerable, for: GenEvent do
499504
{nil, _manager, event}, acc ->
500505
fun.(event, acc)
501506
{ref, manager, event}, acc ->
502-
acc = fun.(event, acc)
503-
send manager, {ref, :done}
504-
acc
507+
try do
508+
fun.(event, acc)
509+
after
510+
send manager, {ref, :next}
511+
end
505512
end
506513
end
507514

@@ -521,12 +528,12 @@ defimpl Enumerable, for: GenEvent do
521528

522529
defp next(%{timeout: timeout} = stream, {mon_ref, manager_pid} = acc) do
523530
receive do
524-
{^mon_ref, sync_ref, event} ->
525-
{{sync_ref, manager_pid, event}, acc}
526531
{:DOWN, ^mon_ref, _, _, :normal} ->
527532
nil
528533
{:DOWN, ^mon_ref, _, _, reason} ->
529534
exit({reason, {__MODULE__, :next, [stream, acc]}})
535+
{^mon_ref, sync_ref, event} ->
536+
{{sync_ref, manager_pid, event}, acc}
530537
after
531538
timeout ->
532539
exit({:timeout, {__MODULE__, :next, [stream, acc]}})
@@ -595,18 +602,34 @@ defimpl Enumerable, for: GenEvent do
595602
defp remove_handler(mon_ref, manager_pid, id) do
596603
Process.demonitor(mon_ref, [:flush])
597604
handler = {__MODULE__, cancel_ref(id, mon_ref)}
598-
# handler may nolonger be there, if it is the removal will cause the monitor
599-
# process to exit. If this returns successfuly then no more events will be
600-
# forwarded.
601-
_ = :gen_event.delete_handler(manager_pid, handler, :remove_handler)
602-
catch
603-
# Do not want to overide the exit reason of the mon_pid so catch errors.
604-
# However if the exit is due to a disconnection, exit because messages could
605-
# leak if the nodes are reconnected before the manager on the other node
606-
# removes the handler. In this case it is very likely that the mon_pid
607-
# exited with the same reason.
608-
:exit, reason when reason !== {:nodedown, node(manager_pid)} ->
609-
:ok
605+
606+
{_pid, ref} = spawn_monitor fn ->
607+
try do
608+
# handler may nolonger be there, if it is the removal will cause the monitor
609+
# process to exit. If this returns successfuly then no more events will be
610+
# forwarded.
611+
_ = :gen_event.delete_handler(manager_pid, handler, :remove_handler)
612+
catch
613+
# Do not want to overide the exit reason of the mon_pid so catch errors.
614+
# However if the exit is due to a disconnection, exit because messages could
615+
# leak if the nodes are reconnected before the manager on the other node
616+
# removes the handler. In this case it is very likely that the mon_pid
617+
# exited with the same reason.
618+
:exit, reason when reason != {:nodedown, node(manager_pid)} ->
619+
:ok
620+
end
621+
end
622+
623+
receive do
624+
{^mon_ref, sync, _} when sync != nil ->
625+
send(manager_pid, {sync, :done})
626+
Process.demonitor(ref, [:flush])
627+
:ok
628+
{:DOWN, ^ref, _, _, :normal} ->
629+
:ok
630+
{:DOWN, ^ref, _, _, other} ->
631+
exit(other)
632+
end
610633
end
611634

612635
defp flush_events(mon_ref) do

lib/elixir/test/elixir/gen_event_test.exs

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ defmodule GenEventTest do
2222
defmodule SlowHandler do
2323
use GenEvent
2424

25-
def handle_event(_event, state) do
26-
:timer.sleep(300)
27-
{:ok, state}
25+
def handle_event(_event, _state) do
26+
:timer.sleep(100)
27+
:remove_handler
2828
end
2929
end
3030

@@ -116,12 +116,11 @@ defmodule GenEventTest do
116116

117117
wait_for_handlers(pid, 1)
118118

119-
for i <- 1..10 do
119+
for i <- 1..6 do
120120
GenEvent.notify(pid, i)
121121
end
122122

123-
assert Process.info(pid, :message_queue_len) ==
124-
{:message_queue_len, 10}
123+
wait_for_queue_length(pid, 5)
125124
end
126125

127126
test "async stream/2" do
@@ -349,27 +348,28 @@ defmodule GenEventTest do
349348
wait_for_handlers(pid, 0)
350349
end
351350

352-
test "#{mode} stream/2 with slow handler" do
351+
test "#{mode} stream/2 flushes events on abort" do
353352
# Start a manager and subscribers
354353
{:ok, pid} = GenEvent.start_link()
355-
stream = GenEvent.stream(pid, duration: 200, mode: unquote(mode))
356354

357355
spawn_link fn ->
358-
# Wait for stream to start
359-
wait_for_handlers(pid, 1)
356+
wait_for_handlers(pid, 2)
360357
GenEvent.notify(pid, 1)
361-
362-
# Add slow handler so that the second or
363-
# third event arrives after duration of 200.
364-
GenEvent.add_handler(pid, SlowHandler, [], link: true)
365358
GenEvent.notify(pid, 2)
366359
GenEvent.notify(pid, 3)
367360
end
368361

369-
# Evaluate stream.
370-
_ = Enum.to_list(stream)
362+
GenEvent.add_handler(pid, SlowHandler, [])
363+
stream = GenEvent.stream(pid, mode: unquote(mode))
364+
365+
try do
366+
Enum.each stream, fn _ -> throw :done end
367+
catch
368+
:done -> :ok
369+
end
371370

372-
# Wait for the slow handler to be removed so all events have been handled.
371+
# Wait for the slow handler to be removed
372+
# so all events have been handled
373373
wait_for_handlers(pid, 0)
374374

375375
# Check no messages leaked.
@@ -382,4 +382,11 @@ defmodule GenEventTest do
382382
wait_for_handlers(pid, count)
383383
end
384384
end
385+
386+
defp wait_for_queue_length(pid, count) do
387+
{:message_queue_len, n} = Process.info(pid, :message_queue_len)
388+
unless n == count do
389+
wait_for_queue_length(pid, count)
390+
end
391+
end
385392
end

0 commit comments

Comments
 (0)