Skip to content

Commit 311efb6

Browse files
author
José Valim
committed
Add GenEvent.stream modes and make it sync by default
1 parent 272be2c commit 311efb6

File tree

2 files changed

+264
-201
lines changed

2 files changed

+264
-201
lines changed

lib/elixir/lib/gen_event.ex

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,9 @@ defmodule GenEvent do
174174
* `:id` - the event stream id for cancellation
175175
* `:timeout` - the timeout in between events, defaults to `:infinity`
176176
* `:duration` - the duration of the subscription, defaults to `:infinity`
177+
* `:mode` - if the subscription mode is sync or async, defaults to `:sync`
177178
"""
178-
defstruct manager: nil, id: nil, timeout: :infinity, duration: :infinity
179+
defstruct manager: nil, id: nil, timeout: :infinity, duration: :infinity, mode: :sync
179180

180181
@doc false
181182
defmacro __using__(_) do
@@ -267,18 +268,24 @@ defmodule GenEvent do
267268
protocol. The supported options are:
268269
269270
* `:id` - an id to identify all live stream instances; when an `:id` is
270-
given, existing streams can be called with via `cancel_streams`
271+
given, existing streams can be called with via `cancel_streams`.
271272
272-
* `:timeout` (Enumerable) - raises if no event arrives in X milliseconds
273+
* `:timeout` (Enumerable) - raises if no event arrives in X milliseconds.
273274
274275
* `:duration` (Enumerable) - only consume events during the X milliseconds
275-
from the streaming start
276+
from the streaming start.
277+
278+
* `:mode` - the mode to consume events, can be `:sync` (default) or
279+
`:async`. On sync, the event manager waits for the event to be consumed
280+
before moving on to the next event handler.
281+
276282
"""
277283
def stream(manager, options \\ []) do
278284
%GenEvent{manager: manager,
279285
id: Keyword.get(options, :id),
280286
timeout: Keyword.get(options, :timeout, :infinity),
281-
duration: Keyword.get(options, :duration, :infinity)}
287+
duration: Keyword.get(options, :duration, :infinity),
288+
mode: Keyword.get(options, :mode, :sync)}
282289
end
283290

284291
@doc """
@@ -449,24 +456,34 @@ defimpl Enumerable, for: GenEvent do
449456
use GenEvent
450457

451458
@doc false
452-
def init({mon_pid, pid, ref}) do
459+
def init({_mode, mon_pid, _pid, ref} = state) do
453460
# Tell the mon_pid we are good to go, and send self() so that this handler
454461
# can be removed later without using the managers name.
455462
send(mon_pid, {:UP, ref, self()})
456-
{:ok, {pid, ref}}
463+
{:ok, state}
457464
end
458465

459466
@doc false
460-
def handle_event(event, {pid, ref} = state) do
461-
send pid, {ref, event}
467+
def handle_event(event, {:sync, mon_pid, pid, ref} = state) do
468+
sync = Process.monitor(mon_pid)
469+
send pid, {ref, sync, event}
470+
receive do
471+
{^sync, :done} -> Process.demonitor(sync, [:flush])
472+
{:DOWN, ^sync, _, _, _} -> :ok
473+
end
474+
{:ok, state}
475+
end
476+
477+
def handle_event(event, {:async, _mon_pid, pid, ref} = state) do
478+
send pid, {ref, nil, event}
462479
{:ok, state}
463480
end
464481

465482
def reduce(stream, acc, fun) do
466483
start_fun = fn() -> start(stream) end
467484
next_fun = &next(stream, &1)
468485
stop_fun = &stop(stream, &1)
469-
Stream.resource(start_fun, next_fun, stop_fun).(acc, fun)
486+
Stream.resource(start_fun, next_fun, stop_fun).(acc, wrap_reducer(fun))
470487
end
471488

472489
def count(_stream) do
@@ -477,24 +494,37 @@ defimpl Enumerable, for: GenEvent do
477494
{:error, __MODULE__}
478495
end
479496

480-
defp start(%{manager: manager, id: id, duration: duration} = stream) do
481-
{mon_pid, mon_ref} = add_handler(manager, id, duration)
497+
defp wrap_reducer(fun) do
498+
fn
499+
{nil, _manager, event}, acc ->
500+
fun.(event, acc)
501+
{ref, manager, event}, acc ->
502+
acc = fun.(event, acc)
503+
send manager, {ref, :done}
504+
acc
505+
end
506+
end
507+
508+
defp start(%{manager: manager, id: id, duration: duration, mode: mode} = stream) do
509+
{mon_pid, mon_ref} = add_handler(mode, manager, id, duration)
482510
send mon_pid, {:UP, mon_ref, self()}
483511

484512
receive do
485513
# The subscription process gave us a go.
486514
{:UP, ^mon_ref, manager_pid} ->
487515
{mon_ref, manager_pid}
488-
# The subscription process died due to an abnormal reason.
516+
# The subscription process died due to an abnormal reason.
489517
{:DOWN, ^mon_ref, _, _, reason} ->
490518
exit({reason, {__MODULE__, :start, [stream]}})
491519
end
492520
end
493521

494-
defp next(%{timeout: timeout} = stream, {mon_ref, _manager_pid} = acc) do
522+
defp next(%{timeout: timeout} = stream, {mon_ref, manager_pid} = acc) do
495523
receive do
496-
{^mon_ref, event} -> {event, acc}
497-
{:DOWN, ^mon_ref, _, _, :normal} -> nil
524+
{^mon_ref, sync_ref, event} ->
525+
{{sync_ref, manager_pid, event}, acc}
526+
{:DOWN, ^mon_ref, _, _, :normal} ->
527+
nil
498528
{:DOWN, ^mon_ref, _, _, reason} ->
499529
exit({reason, {__MODULE__, :next, [stream, acc]}})
500530
after
@@ -508,7 +538,7 @@ defimpl Enumerable, for: GenEvent do
508538
flush_events(mon_ref)
509539
end
510540

511-
defp add_handler(manager, id, duration) do
541+
defp add_handler(mode, manager, id, duration) do
512542
parent = self()
513543

514544
# The subscription is managed by another process, that dies if
@@ -530,7 +560,7 @@ defimpl Enumerable, for: GenEvent do
530560

531561
cancel = cancel_ref(id, mon_ref)
532562
:ok = :gen_event.add_sup_handler(manager, {__MODULE__, cancel},
533-
{self(), parent, mon_ref})
563+
{mode, self(), parent, mon_ref})
534564

535565
receive do
536566
# This message is already in the mailbox if we got this far.
@@ -581,7 +611,8 @@ defimpl Enumerable, for: GenEvent do
581611

582612
defp flush_events(mon_ref) do
583613
receive do
584-
{^mon_ref, _} -> flush_events(mon_ref)
614+
{^mon_ref, _, _} ->
615+
flush_events(mon_ref)
585616
after
586617
0 -> :ok
587618
end

0 commit comments

Comments
 (0)