Skip to content

Commit 081e3b3

Browse files
author
José Valim
committed
Allow only one process handler per process
1 parent d521d79 commit 081e3b3

File tree

2 files changed

+49
-61
lines changed

2 files changed

+49
-61
lines changed

lib/elixir/lib/gen_event.ex

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -732,57 +732,33 @@ defmodule GenEvent do
732732

733733
defp server_add_handler({module, id}, args, handlers) do
734734
handler = handler(module: module, id: {module, id})
735-
server_add_handler(module, handler, args, handlers)
735+
do_add_handler(module, handler, args, handlers, :ok)
736736
end
737737

738738
defp server_add_handler(module, args, handlers) do
739739
handler = handler(module: module, id: module)
740-
server_add_handler(module, handler, args, handlers)
741-
end
742-
743-
defp server_add_handler(module, handler, arg, handlers) do
744-
case :lists.keyfind(handler(handler, :id), handler(:id) + 1, handlers) do
745-
false ->
746-
case do_handler(module, :init, [arg]) do
747-
{:ok, res} ->
748-
case res do
749-
{:ok, state} ->
750-
{false, :ok, [handler(handler, state: state)|handlers]}
751-
{:ok, state, :hibernate} ->
752-
{true, :ok, [handler(handler, state: state)|handlers]}
753-
{:error, _} = error ->
754-
{false, error, handlers}
755-
other ->
756-
{false, {:error, {:bad_return_value, other}}, handlers}
757-
end
758-
{:error, _} = error ->
759-
{false, error, handlers}
760-
end
761-
_ ->
762-
{false, {:error, :already_added}, handlers}
763-
end
740+
do_add_handler(module, handler, args, handlers, :ok)
764741
end
765742

766743
defp server_add_mon_handler({module, id}, args, handlers, pid) do
767744
ref = Process.monitor(pid)
768745
handler = handler(module: module, id: {module, id}, pid: pid, ref: ref)
769-
server_add_handler(module, handler, args, handlers)
746+
do_add_handler(module, handler, args, handlers, :ok)
770747
end
771748

772749
defp server_add_mon_handler(module, args, handlers, pid) do
773750
ref = Process.monitor(pid)
774751
handler = handler(module: module, id: module, pid: pid, ref: ref)
775-
server_add_handler(module, handler, args, handlers)
752+
do_add_handler(module, handler, args, handlers, :ok)
776753
end
777754

778755
defp server_add_process_handler(pid, notify, handlers) do
779756
ref = Process.monitor(pid)
780-
{:ok, state} = GenEvent.Stream.init({pid, ref})
781757
# Notice the pid is set only when notifications
782758
# are explicitly required.
783-
handler = handler(module: GenEvent.Stream, id: {GenEvent.Stream, ref},
784-
pid: if(notify, do: pid), ref: ref, state: state)
785-
{false, ref, [handler|handlers]}
759+
handler = handler(module: GenEvent.Stream, id: {GenEvent.Stream, pid},
760+
pid: if(notify, do: pid), ref: ref)
761+
do_add_handler(GenEvent.Stream, handler, {pid, ref}, handlers, {self(), ref})
786762
end
787763

788764
defp server_remove_handler(module, args, handlers, name) do
@@ -813,7 +789,7 @@ defmodule GenEvent do
813789

814790
defp server_split_process_handlers(mode, event, [handler|t], handlers, streams) do
815791
case handler(handler, :id) do
816-
{GenEvent.Stream, _ref} ->
792+
{GenEvent.Stream, _pid} ->
817793
server_process_notify(mode, event, handler)
818794
server_split_process_handlers(mode, event, t, handlers, [handler|streams])
819795
_ ->
@@ -965,6 +941,29 @@ defmodule GenEvent do
965941
end
966942
end
967943

944+
defp do_add_handler(module, handler, arg, handlers, succ) do
945+
case :lists.keyfind(handler(handler, :id), handler(:id) + 1, handlers) do
946+
false ->
947+
case do_handler(module, :init, [arg]) do
948+
{:ok, res} ->
949+
case res do
950+
{:ok, state} ->
951+
{false, succ, [handler(handler, state: state)|handlers]}
952+
{:ok, state, :hibernate} ->
953+
{true, succ, [handler(handler, state: state)|handlers]}
954+
{:error, _} = error ->
955+
{false, error, handlers}
956+
other ->
957+
{false, {:error, {:bad_return_value, other}}, handlers}
958+
end
959+
{:error, _} = error ->
960+
{false, error, handlers}
961+
end
962+
_ ->
963+
{false, {:error, :already_added}, handlers}
964+
end
965+
end
966+
968967
defp do_swap(handler, args1, module2, args2, name, handlers) do
969968
pid = handler(handler, :pid)
970969
state = do_terminate(handler,

lib/elixir/lib/gen_event/stream.ex

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -79,44 +79,30 @@ defimpl Enumerable, for: GenEvent.Stream do
7979
end
8080

8181
defp start(%{manager: manager} = stream) do
82-
{pid, ref, mon_ref} =
83-
try do
84-
pid = whereis(manager)
85-
ref = Process.monitor(pid)
86-
{:ok, msg_ref} = :gen.call(pid, self(), {:add_process_handler, self(), true}, :infinity)
87-
{pid, msg_ref, ref}
88-
catch
89-
:exit, reason -> exit({reason, {__MODULE__, :start, [stream]}})
90-
end
91-
92-
{pid, ref, mon_ref}
93-
end
94-
95-
defp whereis(pid) when is_pid(pid), do: pid
96-
defp whereis(atom) when is_atom(atom), do: Process.whereis(atom) || exit(:noproc)
97-
defp whereis({:global, name}), do: :global.whereis_name(name)
98-
defp whereis({:via, module, name}), do: module.whereis_name(name)
99-
defp whereis({atom, node}) do
100-
case :rpc.call(node, :erlang, :whereis, [atom]) do
101-
:undefined -> exit(:noproc)
102-
{:badrpc, :nodedown} -> exit({:nodedown, node})
103-
pid when is_pid(pid) -> pid
82+
try do
83+
{:ok, {pid, ref}} = :gen.call(manager, self(), {:add_process_handler, self(), true}, :infinity)
84+
mon_ref = Process.monitor(pid)
85+
{pid, ref, mon_ref}
86+
catch
87+
:exit, reason -> exit({reason, {__MODULE__, :start, [stream]}})
10488
end
10589
end
10690

10791
defp next(%{timeout: timeout} = stream, {pid, ref, mon_ref} = acc) do
92+
self = self()
93+
10894
receive do
10995
# The handler was removed. Stop iteration, resolve the
11096
# event later. We need to demonitor now, otherwise DOWN
11197
# appears with higher priority in the shutdown process.
112-
{:gen_event_EXIT, {GenEvent.Stream, ^ref}, _reason} = event ->
98+
{:gen_event_EXIT, {GenEvent.Stream, ^self}, _reason} = event ->
11399
Process.demonitor(mon_ref, [:flush])
114-
send(self(), event)
100+
send(self, event)
115101
{:halt, {:removed, acc}}
116102

117103
# The manager died. Stop iteration, resolve the event later.
118104
{:DOWN, ^mon_ref, _, _, _} = event ->
119-
send(self(), event)
105+
send(self, event)
120106
{:halt, {:removed, acc}}
121107

122108
# Got an async event.
@@ -149,23 +135,26 @@ defimpl Enumerable, for: GenEvent.Stream do
149135

150136
# If we reach this branch, the handler was not removed yet,
151137
# so we trigger a request for doing so.
152-
defp stop(stream, {pid, ref, _} = acc) do
153-
_ = Task.start(fn -> GenEvent.remove_handler(pid, {GenEvent.Stream, ref}, :shutdown) end)
138+
defp stop(stream, {pid, _, _} = acc) do
139+
parent = self()
140+
_ = Task.start(fn -> GenEvent.remove_handler(pid, {GenEvent.Stream, parent}, :shutdown) end)
154141
stop(stream, {:removed, acc})
155142
end
156143

157144
defp wait_for_handler_removal(pid, ref, mon_ref) do
145+
self = self()
146+
158147
receive do
159148
{_from, {^pid, ^ref}, {notify, _event}} when notify in [:ack_notify, :sync_notify] ->
160149
send pid, {ref, :done}
161150
wait_for_handler_removal(pid, ref, mon_ref)
162-
{:gen_event_EXIT, {GenEvent.Stream, ^ref}, reason}
151+
{:gen_event_EXIT, {GenEvent.Stream, ^self}, reason}
163152
when reason == :normal
164153
when reason == :shutdown
165154
when tuple_size(reason) == 3 and elem(reason, 0) == :swapped ->
166155
Process.demonitor(mon_ref, [:flush])
167156
:ok
168-
{:gen_event_EXIT, {GenEvent.Stream, ^ref}, reason} ->
157+
{:gen_event_EXIT, {GenEvent.Stream, ^self}, reason} ->
169158
Process.demonitor(mon_ref, [:flush])
170159
{:error, reason}
171160
{:DOWN, ^mon_ref, _, _, reason} ->

0 commit comments

Comments
 (0)