Skip to content

Commit 4bec785

Browse files
author
José Valim
committed
Also support :sync mode in GenEvent, make :ack one the default
1 parent e3273f7 commit 4bec785

File tree

2 files changed

+69
-16
lines changed

2 files changed

+69
-16
lines changed

lib/elixir/lib/gen_event.ex

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ defmodule GenEvent.Stream do
99
* `:id` - the event stream id for cancellation
1010
* `:timeout` - the timeout in between events, defaults to `:infinity`
1111
* `:duration` - the duration of the subscription, defaults to `:infinity`
12-
* `:mode` - if the subscription mode is sync or async, defaults to `:sync`
12+
* `:mode` - if the subscription mode is ack, sync or async, defaults to `:ack`
1313
"""
14-
defstruct manager: nil, id: nil, timeout: :infinity, duration: :infinity, mode: :sync
14+
defstruct manager: nil, id: nil, timeout: :infinity, duration: :infinity, mode: :ack
1515

1616
@typedoc "The stream mode"
17-
@type mode :: :sync | :async
17+
@type mode :: :ack | :sync | :async
1818

1919
@type t :: %__MODULE__{
2020
manager: GenEvent.manager,
@@ -277,7 +277,7 @@ defmodule GenEvent do
277277
The stream is a `GenEvent` struct that implements the `Enumerable`
278278
protocol. Consumption of events only begins when enumeration starts.
279279
280-
`The supported options are:
280+
## Options
281281
282282
* `:id` - an id to identify all live stream instances; when an `:id` is
283283
given, existing streams can be called with via `cancel_streams`.
@@ -287,9 +287,23 @@ defmodule GenEvent do
287287
* `:duration` (Enumerable) - only consume events during the X milliseconds
288288
from the streaming start.
289289
290-
* `:mode` - the mode to consume events, can be `:sync` (default) or
291-
`:async`. On sync, the event manager waits for the event to be consumed
292-
before moving on to the next event handler.
290+
* `:mode` - the mode to consume events, can be `:ack` (default), `:sync`
291+
or `:async`. See modes below.
292+
293+
## Modes
294+
295+
GenEvent stream supports three different modes.
296+
297+
On `:ack`, the stream acknowledges each event, providing back pressure,
298+
but processing of the message happens asynchronously, allowing the event
299+
manager to move on to the next handler as soon as the event is
300+
acknowledged.
301+
302+
On `:sync`, the event manager waits for the event to be consumed
303+
before moving on to the next event handler.
304+
305+
On `:async`, all events are processed asynchronously but there is no
306+
ack (which means there is no backpressure).
293307
294308
"""
295309
@spec stream(manager, Keyword.t) :: GenEvent.Stream.t
@@ -299,7 +313,7 @@ defmodule GenEvent do
299313
id: Keyword.get(options, :id),
300314
timeout: Keyword.get(options, :timeout, :infinity),
301315
duration: Keyword.get(options, :duration, :infinity),
302-
mode: Keyword.get(options, :mode, :sync)}
316+
mode: Keyword.get(options, :mode, :ack)}
303317
end
304318

305319
@doc """
@@ -478,7 +492,7 @@ defimpl Enumerable, for: GenEvent.Stream do
478492
end
479493

480494
@doc false
481-
def handle_event(event, {:sync, mon_pid, pid, ref} = state) do
495+
def handle_event(event, {mode, mon_pid, pid, ref} = state) when mode in [:sync, :ack] do
482496
sync = Process.monitor(mon_pid)
483497
send pid, {ref, sync, event}
484498
receive do
@@ -515,11 +529,17 @@ defimpl Enumerable, for: GenEvent.Stream do
515529

516530
defp wrap_reducer(fun) do
517531
fn
518-
{nil, _manager, event}, acc ->
519-
fun.(event, acc)
520-
{ref, manager, event}, acc ->
532+
{:ack, ref, manager, event}, acc ->
521533
send manager, {ref, :next}
522534
fun.(event, acc)
535+
{:async, _, _manager, event}, acc ->
536+
fun.(event, acc)
537+
{:sync, ref, manager, event}, acc ->
538+
try do
539+
fun.(event, acc)
540+
after
541+
send manager, {ref, :next}
542+
end
523543
end
524544
end
525545

@@ -548,7 +568,7 @@ defimpl Enumerable, for: GenEvent.Stream do
548568
send(self(), {:DOWN, mon_ref, :process, mon_pid, :normal})
549569
exit({reason, {__MODULE__, :next, [stream, acc]}})
550570
{^mon_ref, sync_ref, event} ->
551-
{{sync_ref, manager_pid, event}, acc}
571+
{{stream.mode, sync_ref, manager_pid, event}, acc}
552572
after
553573
timeout ->
554574
exit({:timeout, {__MODULE__, :next, [stream, acc]}})

lib/elixir/test/elixir/gen_event_test.exs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,12 @@ defmodule GenEventTest do
101101
Logger.add_backend(:console, flush: true)
102102
end
103103

104-
test "sync stream/2" do
104+
test "ack stream/2" do
105105
{:ok, pid} = GenEvent.start_link()
106106
parent = self()
107107

108108
spawn_link fn ->
109-
send parent, Enum.take(GenEvent.stream(pid, mode: :sync), 3)
109+
send parent, Enum.take(GenEvent.stream(pid, mode: :ack), 3)
110110
end
111111

112112
wait_for_handlers(pid, 1)
@@ -120,7 +120,7 @@ defmodule GenEventTest do
120120
wait_for_handlers(pid, 0)
121121

122122
spawn_link fn ->
123-
Enum.each(GenEvent.stream(pid, mode: :sync), fn _ ->
123+
Enum.each(GenEvent.stream(pid, mode: :ack), fn _ ->
124124
:timer.sleep(:infinity)
125125
end)
126126
end
@@ -136,6 +136,39 @@ defmodule GenEventTest do
136136
wait_for_queue_length(pid, 4)
137137
end
138138

139+
test "sync stream/2" do
140+
{:ok, pid} = GenEvent.start_link()
141+
parent = self()
142+
143+
spawn_link fn ->
144+
send parent, Enum.take(GenEvent.stream(pid, mode: :sync), 3)
145+
end
146+
147+
wait_for_handlers(pid, 1)
148+
149+
for i <- 1..3 do
150+
GenEvent.sync_notify(pid, i)
151+
end
152+
153+
# Receive one of the results
154+
assert_receive [1, 2, 3], @receive_timeout
155+
wait_for_handlers(pid, 0)
156+
157+
spawn_link fn ->
158+
Enum.each(GenEvent.stream(pid, mode: :sync), fn _ ->
159+
:timer.sleep(:infinity)
160+
end)
161+
end
162+
163+
wait_for_handlers(pid, 1)
164+
165+
for i <- 1..6 do
166+
GenEvent.notify(pid, i)
167+
end
168+
169+
wait_for_queue_length(pid, 5)
170+
end
171+
139172
test "async stream/2" do
140173
{:ok, pid} = GenEvent.start_link()
141174
parent = self()

0 commit comments

Comments
 (0)