Skip to content

Commit 1572aac

Browse files
committed
Add Registry.lock/3
1 parent 5bbe055 commit 1572aac

File tree

2 files changed

+299
-6
lines changed

2 files changed

+299
-6
lines changed

lib/elixir/lib/registry.ex

Lines changed: 131 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,64 @@ defmodule Registry do
609609
end
610610
end
611611

612+
@doc """
613+
Out-of-band locking of the given `lock_key` for the duration of `function`.
614+
615+
Only one function can execute under the same `lock_key` at a given
616+
time. The given function always runs in the caller process.
617+
618+
The `lock_key` has its own namespace and therefore does not clash or
619+
overlap with the regular registry keys. In other words, locking works
620+
out-of-band from the regular Registry operations. See the "Use cases"
621+
section below.
622+
623+
Locking behaves the same regardless of the registry type.
624+
625+
## Use cases
626+
627+
The Registry is safe and concurrent out-of-the-box. You are not required
628+
to use this function when interacting with the Registry. Furthermore,
629+
`Registry` with `:unique` keys can already act as a process-lock for any
630+
given key. For example, you can ensure only one process runs at a given
631+
time for a given `:key` by doing:
632+
633+
name = {:via, Registry, {MyApp.Registry, :key, :value}}
634+
635+
# Do not attempt to start if we are already running
636+
if pid = GenServer.whereis(name) do
637+
pid
638+
else
639+
case GenServer.start_link(__MODULE__, :ok, name: name) do
640+
{:ok, pid} -> pid
641+
{:error, {:already_started, pid}} -> pid
642+
end
643+
end
644+
645+
Process locking gives you plenty of flexibility and fault isolation and
646+
is enough for most cases.
647+
648+
This function is useful only when spawning processes is not an option,
649+
for example, when copying the data to another process could be too
650+
expensive. Or when the work must be done within the current process
651+
for other reasons.
652+
653+
## Examples
654+
655+
iex> Registry.start_link(keys: :unique, name: Registry.LockTest)
656+
iex> Registry.lock(Registry.LockTest, :hello, fn -> :ok end)
657+
:ok
658+
iex> Registry.lock(Registry.LockTest, :world, fn -> self() end)
659+
self()
660+
661+
"""
662+
@doc since: "1.18.0"
663+
def lock(registry, lock_key, function)
664+
when is_atom(registry) and is_function(function, 0) do
665+
{_kind, partitions, _, pid_ets, _} = info!(registry)
666+
{pid_server, _pid_ets} = pid_ets || pid_ets!(registry, key, partitions)
667+
Registry.Partition.lock(pid_server, key, function)
668+
end
669+
612670
@doc """
613671
Returns `{pid, value}` pairs under the given `key` in `registry` that match `pattern`.
614672
@@ -1500,7 +1558,7 @@ defmodule Registry.Partition do
15001558
@moduledoc false
15011559

15021560
# This process owns the equivalent key and pid ETS tables
1503-
# and is responsible for monitoring processes that map to
1561+
# and is responsible for linking to processes that map to
15041562
# its own pid table.
15051563
use GenServer
15061564
@all_info -1
@@ -1525,13 +1583,25 @@ defmodule Registry.Partition do
15251583
@doc """
15261584
Starts the registry partition.
15271585
1528-
The process is only responsible for monitoring, demonitoring
1529-
and cleaning up when monitored processes crash.
1586+
The process is only responsible for linking and cleaning up when processes crash.
15301587
"""
15311588
def start_link(registry, arg) do
15321589
GenServer.start_link(__MODULE__, arg, name: registry)
15331590
end
15341591

1592+
@doc """
1593+
Runs function with a lock.
1594+
"""
1595+
def lock(pid, key, lock) do
1596+
{:ok, ref} = GenServer.call(pid, {:lock, key})
1597+
1598+
try do
1599+
lock.()
1600+
after
1601+
send(pid, {:unlock, key, ref})
1602+
end
1603+
end
1604+
15351605
## Callbacks
15361606

15371607
def init({kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed}) do
@@ -1552,7 +1622,7 @@ defmodule Registry.Partition do
15521622
true = :ets.insert(registry, {i, key_ets, {self(), pid_ets}})
15531623
end
15541624

1555-
{:ok, pid_ets}
1625+
{:ok, {pid_ets, %{}}}
15561626
end
15571627

15581628
# The key partition is a set for unique keys,
@@ -1586,7 +1656,21 @@ defmodule Registry.Partition do
15861656
{:reply, :ok, state}
15871657
end
15881658

1589-
def handle_info({:EXIT, pid, _reason}, ets) do
1659+
def handle_call({:lock, key}, from, {ets, lock}) do
1660+
lock =
1661+
case lock do
1662+
%{^key => queue} ->
1663+
Map.put(lock, key, :queue.in(from, queue))
1664+
1665+
%{} ->
1666+
go(from, key)
1667+
Map.put(lock, key, :queue.new())
1668+
end
1669+
1670+
{:noreply, {ets, lock}}
1671+
end
1672+
1673+
def handle_info({:EXIT, pid, _reason}, {ets, lock}) do
15901674
entries = :ets.take(ets, pid)
15911675

15921676
for {_pid, key, key_ets, _counter} <- entries do
@@ -1607,6 +1691,47 @@ defmodule Registry.Partition do
16071691
end
16081692
end
16091693

1610-
{:noreply, ets}
1694+
{:noreply, {ets, lock}}
1695+
end
1696+
1697+
def handle_info({{:unlock, key}, _ref, :process, _pid, _reason}, state) do
1698+
unlock(key, state)
1699+
end
1700+
1701+
def handle_info({:unlock, key, ref}, state) do
1702+
Process.demonitor(ref, [:flush])
1703+
unlock(key, state)
1704+
end
1705+
1706+
defp unlock(key, {ets, lock}) do
1707+
%{^key => queue} = lock
1708+
1709+
lock =
1710+
case dequeue(queue, key) do
1711+
:empty -> Map.delete(lock, key)
1712+
{:not_empty, queue} -> Map.put(lock, key, queue)
1713+
end
1714+
1715+
{:noreply, {ets, lock}}
1716+
end
1717+
1718+
defp dequeue(queue, key) do
1719+
case :queue.out(queue) do
1720+
{:empty, _} ->
1721+
:empty
1722+
1723+
{{:value, {pid, _tag} = from}, queue} ->
1724+
if node(pid) != node() or Process.alive?(pid) do
1725+
go(from, key)
1726+
{:not_empty, queue}
1727+
else
1728+
dequeue(queue, key)
1729+
end
1730+
end
1731+
end
1732+
1733+
defp go({pid, _tag} = from, key) do
1734+
ref = Process.monitor(pid, tag: {:unlock, key})
1735+
GenServer.reply(from, {:ok, ref})
16111736
end
16121737
end

lib/elixir/test/elixir/registry_test.exs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -994,3 +994,171 @@ defmodule Registry.Test do
994994
|> Enum.find_value(fn id -> :ets.info(id, :name) == table_name and :ets.info(id, :size) end)
995995
end
996996
end
997+
998+
defmodule Registry.LockTest do
999+
use ExUnit.Case,
1000+
async: true,
1001+
parameterize: [
1002+
%{keys: :unique, partitions: 1},
1003+
%{keys: :unique, partitions: 8},
1004+
%{keys: :duplicate, partitions: 1},
1005+
%{keys: :duplicate, partitions: 8}
1006+
]
1007+
1008+
setup config do
1009+
keys = config.keys
1010+
partitions = config.partitions
1011+
name = :"#{config.test}_#{keys}_#{partitions}"
1012+
opts = [keys: keys, name: name, partitions: partitions]
1013+
{:ok, _} = start_supervised({Registry, opts})
1014+
%{registry: name}
1015+
end
1016+
1017+
test "does not lock when using different keys", config do
1018+
parent = self()
1019+
1020+
task1 =
1021+
Task.async(fn ->
1022+
Registry.lock(config.registry, 1, fn ->
1023+
send(parent, :locked1)
1024+
assert_receive :unlock
1025+
:done
1026+
end)
1027+
end)
1028+
1029+
assert_receive :locked1
1030+
1031+
task2 =
1032+
Task.async(fn ->
1033+
Registry.lock(config.registry, 2, fn ->
1034+
send(parent, :locked2)
1035+
assert_receive :unlock
1036+
:done
1037+
end)
1038+
end)
1039+
1040+
assert_receive :locked2
1041+
1042+
send(task1.pid, :unlock)
1043+
send(task2.pid, :unlock)
1044+
assert Task.await(task1) == :done
1045+
assert Task.await(task2) == :done
1046+
assert Registry.lock(config.registry, 1, fn -> :done end) == :done
1047+
assert Registry.lock(config.registry, 2, fn -> :done end) == :done
1048+
end
1049+
1050+
test "locks when using the same key", config do
1051+
parent = self()
1052+
1053+
task1 =
1054+
Task.async(fn ->
1055+
Registry.lock(config.registry, :ok, fn ->
1056+
send(parent, :locked1)
1057+
assert_receive :unlock
1058+
:done
1059+
end)
1060+
end)
1061+
1062+
assert_receive :locked1
1063+
1064+
task2 =
1065+
Task.async(fn ->
1066+
Registry.lock(config.registry, :ok, fn ->
1067+
send(parent, :locked2)
1068+
:done
1069+
end)
1070+
end)
1071+
1072+
refute_receive :locked2, 100
1073+
1074+
send(task1.pid, :unlock)
1075+
assert Task.await(task1) == :done
1076+
assert_receive :locked2
1077+
assert Task.await(task2) == :done
1078+
assert Registry.lock(config.registry, :ok, fn -> :done end) == :done
1079+
end
1080+
1081+
@tag :capture_log
1082+
test "locks when the one holding the lock raises", config do
1083+
parent = self()
1084+
1085+
task1 =
1086+
Task.async(fn ->
1087+
Registry.lock(config.registry, :ok, fn ->
1088+
send(parent, :locked)
1089+
assert_receive :unlock
1090+
raise "oops"
1091+
end)
1092+
end)
1093+
1094+
Process.unlink(task1.pid)
1095+
assert_receive :locked
1096+
1097+
task2 =
1098+
Task.async(fn ->
1099+
Registry.lock(config.registry, :ok, fn ->
1100+
:done
1101+
end)
1102+
end)
1103+
1104+
send(task1.pid, :unlock)
1105+
assert {:exit, {%RuntimeError{message: "oops"}, [_ | _]}} = Task.yield(task1)
1106+
assert Task.await(task2) == :done
1107+
assert Registry.lock(config.registry, :ok, fn -> :done end) == :done
1108+
end
1109+
1110+
test "locks when the one holding the lock terminates", config do
1111+
parent = self()
1112+
1113+
task1 =
1114+
Task.async(fn ->
1115+
Registry.lock(config.registry, :ok, fn ->
1116+
send(parent, :locked)
1117+
assert_receive :unlock
1118+
:done
1119+
end)
1120+
end)
1121+
1122+
assert_receive :locked
1123+
1124+
task2 =
1125+
Task.async(fn ->
1126+
Registry.lock(config.registry, :ok, fn ->
1127+
:done
1128+
end)
1129+
end)
1130+
1131+
assert Task.shutdown(task1, :brutal_kill) == nil
1132+
assert Task.await(task2) == :done
1133+
assert Registry.lock(config.registry, :ok, fn -> :done end) == :done
1134+
end
1135+
1136+
test "locks when the one waiting for the lock terminates", config do
1137+
parent = self()
1138+
1139+
task1 =
1140+
Task.async(fn ->
1141+
Registry.lock(config.registry, :ok, fn ->
1142+
send(parent, :locked)
1143+
assert_receive :unlock
1144+
:done
1145+
end)
1146+
end)
1147+
1148+
assert_receive :locked
1149+
1150+
task2 =
1151+
Task.async(fn ->
1152+
Registry.lock(config.registry, :ok, fn ->
1153+
:done
1154+
end)
1155+
end)
1156+
1157+
:erlang.yield()
1158+
assert Task.shutdown(task2, :brutal_kill) == nil
1159+
1160+
send(task1.pid, :unlock)
1161+
assert Task.await(task1) == :done
1162+
assert Registry.lock(config.registry, :ok, fn -> :done end) == :done
1163+
end
1164+
end

0 commit comments

Comments
 (0)