Skip to content

Commit 27c492d

Browse files
author
José Valim
committed
Make parallel: true opt-in in Registry.dispatch/3, closes #6226
1 parent e0480b6 commit 27c492d

File tree

3 files changed

+78
-19
lines changed

3 files changed

+78
-19
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ This release brings further improvements to Calendar types. It adds arithmetic a
165165
* [Process] Add `Process.cancel_timer/2`
166166
* [Protocol] Show available implementations on `Protocol.UndefinedError` if the protocol has been consolidated
167167
* [Registry] Support ETS guard conditions in `Registry.match/3`
168+
* [Registry] Support `parallel: true` in `Registry.dispatch/3`
168169
* [Supervisor] Add `Supervisor.init/2` and `Supervisor.child_spec/2`
169170
* [Supervisor] Allow `module` and `{module, arg}` to be given to `Supervisor.start_link/2` and invoke `module.child_spec(arg)` on each argument
170171
* [Task] Support `:on_timeout` in `Task.async_stream` to control how tasks are terminated

lib/elixir/lib/registry.ex

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ defmodule Registry do
109109
110110
In this example, we will also set the number of partitions to the number of
111111
schedulers online, which will make the registry more performant on highly
112-
concurrent environments as each partition will spawn a new process, allowing
113-
dispatching to happen in parallel:
112+
concurrent environments:
114113
115114
{:ok, _} = Registry.start_link(keys: :duplicate, name: Registry.PubSubTest,
116115
partitions: System.schedulers_online)
@@ -362,17 +361,16 @@ defmodule Registry do
362361
associated to the pid. If there are no entries for the given key,
363362
the callback is never invoked.
364363
365-
If the registry is not partitioned, the callback is invoked in the process
366-
that calls `dispatch/3`. If the registry is partitioned, the callback is
367-
invoked concurrently per partition by starting a task linked to the
368-
caller. The callback, however, is only invoked if there are entries for that
369-
partition.
364+
If the registry is partitioned, the callback is invoked multiple times
365+
per partition. If the registry is partitioned and `parallel: true` is
366+
given as an option, the dispatching happens in parallel. In both cases,
367+
the callback is only invoked if there are entries for that partition.
370368
371369
See the module documentation for examples of using the `dispatch/3`
372370
function for building custom dispatching or a pubsub system.
373371
"""
374-
@spec dispatch(registry, key, (entries :: [{pid, value}] -> term)) :: :ok
375-
def dispatch(registry, key, mfa_or_fun)
372+
@spec dispatch(registry, key, (entries :: [{pid, value}] -> term), keyword) :: :ok
373+
def dispatch(registry, key, mfa_or_fun, opts \\ [])
376374
when is_atom(registry) and is_function(mfa_or_fun, 1)
377375
when is_atom(registry) and tuple_size(mfa_or_fun) == 3 do
378376
case key_info!(registry) do
@@ -386,17 +384,33 @@ defmodule Registry do
386384
|> safe_lookup_second(key)
387385
|> apply_non_empty_to_mfa_or_fun(mfa_or_fun)
388386
{:duplicate, partitions, _} ->
389-
registry
390-
|> dispatch_task(key, mfa_or_fun, partitions)
391-
|> Enum.each(&Task.await(&1, :infinity))
387+
if Keyword.get(opts, :parallel, false) do
388+
registry
389+
|> dispatch_parallel(key, mfa_or_fun, partitions)
390+
|> Enum.each(&Task.await(&1, :infinity))
391+
else
392+
dispatch_serial(registry, key, mfa_or_fun, partitions)
393+
end
392394
end
393395
:ok
394396
end
395397

396-
defp dispatch_task(_registry, _key, _mfa_or_fun, 0) do
398+
defp dispatch_serial(_registry, _key, _mfa_or_fun, 0) do
399+
:ok
400+
end
401+
defp dispatch_serial(registry, key, mfa_or_fun, partition) do
402+
partition = partition - 1
403+
registry
404+
|> key_ets!(partition)
405+
|> safe_lookup_second(key)
406+
|> apply_non_empty_to_mfa_or_fun(mfa_or_fun)
407+
dispatch_serial(registry, key, mfa_or_fun, partition)
408+
end
409+
410+
defp dispatch_parallel(_registry, _key, _mfa_or_fun, 0) do
397411
[]
398412
end
399-
defp dispatch_task(registry, key, mfa_or_fun, partition) do
413+
defp dispatch_parallel(registry, key, mfa_or_fun, partition) do
400414
partition = partition - 1
401415
parent = self()
402416
task = Task.async(fn ->
@@ -407,7 +421,7 @@ defmodule Registry do
407421
Process.unlink(parent)
408422
:ok
409423
end)
410-
[task | dispatch_task(registry, key, mfa_or_fun, partition)]
424+
[task | dispatch_parallel(registry, key, mfa_or_fun, partition)]
411425
end
412426

413427
defp apply_non_empty_to_mfa_or_fun([], _mfa_or_fun) do

lib/elixir/test/elixir/registry_test.exs

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,28 +220,72 @@ defmodule RegistryTest do
220220
assert Registry.keys(registry, self()) |> Enum.sort() == [1, 1.0]
221221
end
222222

223-
test "dispatches to multiple keys", %{registry: registry} do
223+
test "dispatches to multiple keys in serial", %{registry: registry} do
224224
Process.flag(:trap_exit, true)
225+
parent = self()
225226

226227
assert Registry.dispatch(registry, "hello", fn _ ->
227228
raise "will never be invoked"
228-
end) == :ok
229+
end, parallel: false) == :ok
229230

230231
{:ok, _} = Registry.register(registry, "hello", :value1)
231232
{:ok, _} = Registry.register(registry, "hello", :value2)
232233
{:ok, _} = Registry.register(registry, "world", :value3)
233234

234235
assert Registry.dispatch(registry, "hello", fn entries ->
236+
assert parent == self()
235237
for {pid, value} <- entries, do: send(pid, {:dispatch, value})
236-
end)
238+
end, parallel: false)
237239

238240
assert_received {:dispatch, :value1}
239241
assert_received {:dispatch, :value2}
240242
refute_received {:dispatch, :value3}
241243

242244
assert Registry.dispatch(registry, "world", fn entries ->
245+
assert parent == self()
243246
for {pid, value} <- entries, do: send(pid, {:dispatch, value})
244-
end)
247+
end, parallel: false)
248+
249+
refute_received {:dispatch, :value1}
250+
refute_received {:dispatch, :value2}
251+
assert_received {:dispatch, :value3}
252+
253+
refute_received {:EXIT, _, _}
254+
end
255+
256+
test "dispatches to multiple keys in parallel", %{registry: registry, partitions: partitions} do
257+
Process.flag(:trap_exit, true)
258+
parent = self()
259+
260+
assert Registry.dispatch(registry, "hello", fn _ ->
261+
raise "will never be invoked"
262+
end, parallel: true) == :ok
263+
264+
{:ok, _} = Registry.register(registry, "hello", :value1)
265+
{:ok, _} = Registry.register(registry, "hello", :value2)
266+
{:ok, _} = Registry.register(registry, "world", :value3)
267+
268+
assert Registry.dispatch(registry, "hello", fn entries ->
269+
if partitions == 8 do
270+
assert parent != self()
271+
else
272+
assert parent == self()
273+
end
274+
for {pid, value} <- entries, do: send(pid, {:dispatch, value})
275+
end, parallel: true)
276+
277+
assert_received {:dispatch, :value1}
278+
assert_received {:dispatch, :value2}
279+
refute_received {:dispatch, :value3}
280+
281+
assert Registry.dispatch(registry, "world", fn entries ->
282+
if partitions == 8 do
283+
assert parent != self()
284+
else
285+
assert parent == self()
286+
end
287+
for {pid, value} <- entries, do: send(pid, {:dispatch, value})
288+
end, parallel: true)
245289

246290
refute_received {:dispatch, :value1}
247291
refute_received {:dispatch, :value2}

0 commit comments

Comments
 (0)