Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 238 additions & 0 deletions lib/mix/lib/mix/lock.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
defmodule Mix.Lock do
@moduledoc false

# Lock implementation working across multiple OS processes.
#
# The lock is implemented using TCP sockets and hard links.
#
# A process holds the lock if it owns a TCP socket, whose port is
# written in the lock_0 file. We need to create such lock files
# atomically, so the process first writes its port to a port_P
# file and then attempts to create a hard link to it at lock_0.
#
# An inherent problem with lock files is that the lock owner may
# terminate abruptly, leaving a "stale" file. Other processes can
# detect a stale file by reading the port written in that file,
# trying to connect to thart port and failing. In order for another
# process to link to the same path, the file needs to be replaced.
# However, we need to guarantee that only a single process can
# remove or replace the file, otherwise a concurrent process may
# end up removing a newly linked file.
#
# To address this problem we employ a chained locking procedure.
# Specifically, we attempt to link our port to lock_0, if that
# fails, we try to connect to the lock_0 port. If we manage to
# connect, it means the lock is taken, so we wait for it to close
# and start over. If we fail to connect, it means the lock is stale,
# so we want to replace it. In order to do that, we try to obtain
# lock_1. Again, we try to link and connect. Eventually, we should
# successfully link to lock_N. At that point we can clean up all
# the files, so we perform these steps:
#
# * move our port_P to lock_0
# * remove all the other port_P files
# * remove all lock_1+ files
#
# It is important to perform these steps in this order, to avoid
# race conditions. By moving to lock_0, we make sure that all new
# processes trying to lock will connect to our port. By removing
# all port_P files we make sure that currently paused processes
# that are about to link port_P at lock_N will fail to link, since
# the port_P file will no longer exist (once lock_N is removed).
#
# Finally, note that we do not remove the lock file in `unlock/1`.
# If we did that, another process could try to connect and fail
# because the file would not exist, in such case the process would
# assume the file is stale and needs to be replaced, therefore
# possibly replacing another process who successfully links at the
# empty spot. This means we effectively always leave a stale file,
# however, in order to shortcut the port check for future processes,
# we atomically replace the file content with port 0, to indicate
# the file is stale.
#
# The main caveat of using ephemeral TCP ports is that they are not
# unique. This creates a theoretical scenario where the lock holder
# terminates abruptly and leaves its port in lock_0, then the port
# is assigned to a unrelated process (unaware of the locking). To
# handle this scenario, when we connect to a lock_N port, we expect
# it to immediately send us `@probe_data`. If this does not happen
# within `@probe_timeout_ms`, we assume the port is taken by an
# unrelated process and the lock file is stale. Note that it is ok
# to use a long timeout, because this scenario is very unlikely.
# Theoretically, if an actual lock owner is not able to send the
# probe data within the timeout, the lock will fail, however with
# a high enough timeout, this should not be a problem in practice.

@loopback {127, 0, 0, 1}
@listen_opts [:binary, ip: @loopback, packet: :raw, nodelay: true, backlog: 128, active: false]
@connect_opts [:binary, packet: :raw, nodelay: true, active: false]
@probe_data <<"elixirlock">>
@probe_timeout_ms 5_000

@doc """
Acquires a lock identified by the given key.

This function blocks until the lock is acquired by this process.

This function can also be called if this process already has the
lock. In such case the function is executed immediately.
"""
@spec lock(iodata(), (-> term())) :: :ok
def lock(key, fun) do
key = key |> :erlang.md5() |> Base.url_encode64(padding: false)
path = Path.join([System.tmp_dir!(), "mix_lock", key])

pdict_key = {__MODULE__, path}
acquire_lock? = Process.get(pdict_key) == nil

try do
if acquire_lock? do
lock = lock(path)
Process.put(pdict_key, lock)
end

fun.()
after
if acquire_lock? do
lock = Process.get(pdict_key)
unlock(lock)
Process.delete(pdict_key)
end
end
end

defp lock(path) do
:ok = File.mkdir_p(path)

{:ok, socket} = :gen_tcp.listen(0, @listen_opts)
{:ok, port} = :inet.port(socket)

spawn_link(fn -> accept_loop(socket) end)

try_lock(path, socket, port)
end

defp try_lock(path, socket, port) do
port_path = Path.join(path, "port_#{port}")

:ok = File.write(port_path, <<port::unsigned-integer-32>>, [:raw])

case grab_lock(path, port_path, 0) do
{:ok, 0} ->
# We grabbed lock_0, so all good
%{socket: socket, path: path}

{:ok, _n} ->
# We grabbed lock_1+, so we need to replace lock_0 and clean up
take_over(path, port_path)
%{socket: socket, path: path}

{:taken, probe_socket} ->
# Another process has the lock, wait for close and start over
await_close(probe_socket)
try_lock(path, socket, port)

:invalidated ->
try_lock(path, socket, port)
end
end

defp grab_lock(path, port_path, n) do
lock_path = Path.join(path, "lock_#{n}")

case File.ln(port_path, lock_path) do
:ok ->
{:ok, n}

{:error, :eexist} ->
case probe(lock_path) do
{:ok, probe_socket} ->
{:taken, probe_socket}

:error ->
grab_lock(path, port_path, n + 1)
end

{:error, :enoent} ->
:invalidated
end
end

defp accept_loop(listen_socket) do
case :gen_tcp.accept(listen_socket) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If any other error happens here, we'll get a nasty CaseClauseError. Some of those are transient (:system_limit?), some are not. Maybe we do a simple algorithm of:

  1. Accept
  2. If {:error, reason} and we don't know if reason is transient or not (basically catch all), then sleep for like a second and go back to 1.

With a max number of like 2 or 3 retries for now? I know that just rerunning mix compile would likely achieve the same, but it seems like a small price to pay for a potentially-better UX. Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned that if we fail to accept, it means the client fails to connect and assumes the socket is stale. So it's probably safer to crash the lock owner to avoid corruption? We could raise a more specific error to avoid CaseClauseError.

When we reach system_limit on open files/sockets it usually blows everything up anyway? Are there any other transient errors you worry about? I'm looking at the list in linux manual and I'm not sure if it makes sense to retry any of them (other than EINTR).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The accepting process is sort of holding its own little lock, so this confuses me a bit:

it means the client fails to connect and assumes the socket is stale

The client would fail to connect with :closed if the socket is closed, not with other errors, no?

Copy link
Member Author

@jonatanklosko jonatanklosko Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah you're right, I think these are not tied. The client is put in the accept queue and at that point they connect successfully, whether :gen_tcp.accept grabs it from the queue is separate.

But if the idea is to sleep and retry accepting, this heavily increases the chance that we don't send the probe data to the client on time, which is critical.

{:ok, socket} ->
_ = :gen_tcp.send(socket, @probe_data)
accept_loop(listen_socket)

{:error, reason} when reason in [:closed, :einval] ->
:ok
end
end

defp probe(port_path) do
with {:ok, <<port::unsigned-integer-32>>} when port > 0 <- File.read(port_path),
{:ok, socket} <- connect(port) do
case :gen_tcp.recv(socket, 0, @probe_timeout_ms) do
{:ok, @probe_data} ->
{:ok, socket}

{:error, _reason} ->
:gen_tcp.close(socket)
:error
end
else
_other -> :error
end
end

defp connect(port) do
# On Windows connecting to an unbound port takes a few seconds to
# fail, so instead we shortcut the check by attempting a listen,
# which succeeds or fails immediately
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nice trick!

case :gen_tcp.listen(port, [reuseaddr: true] ++ @listen_opts) do
{:ok, socket} ->
:gen_tcp.close(socket)
# The port is free, so connecting would fail
{:error, :econnrefused}

{:error, _reason} ->
:gen_tcp.connect(@loopback, port, @connect_opts)
end
end

defp take_over(path, port_path) do
lock_path = Path.join(path, "lock_0")

# We linked to lock_N successfully, so port_path should exist
:ok = File.rename(port_path, lock_path)

{:ok, names} = File.ls(path)

for "port_" <> _ = name <- names do
_ = File.rm(Path.join(path, name))
end

for "lock_" <> _ = name <- names, name != "lock_0" do
_ = File.rm(Path.join(path, name))
end
end

defp await_close(socket) do
{:error, _reason} = :gen_tcp.recv(socket, 0)
end

defp unlock(lock) do
port_path = Path.join(lock.path, "port_0")
lock_path = Path.join(lock.path, "lock_0")

with :ok <- File.write(port_path, <<0::unsigned-integer-32>>, [:raw]) do
_ = File.rename(port_path, lock_path)
end

# Closing the socket will cause the accepting process to finish
# and all accepted sockets (tied to that process) will get closed
:gen_tcp.close(lock.socket)

:ok
end
end
74 changes: 0 additions & 74 deletions lib/mix/lib/mix/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@ defmodule Mix.State do
GenServer.start_link(__MODULE__, :ok, name: @name)
end

def lock(key, fun) do
try do
GenServer.call(@name, {:lock, key}, @timeout)
fun.()
after
GenServer.call(@name, {:unlock, key}, @timeout)
end
end

def builtin_apps do
GenServer.call(@name, :builtin_apps, @timeout)
end
Expand Down Expand Up @@ -83,8 +74,6 @@ defmodule Mix.State do
)

state = %{
key_to_waiting: %{},
pid_to_key: %{},
builtin_apps: :code.get_path()
}

Expand Down Expand Up @@ -114,69 +103,6 @@ defmodule Mix.State do
end
end

@impl true
def handle_call({:lock, key}, {pid, _} = from, state) do
%{key_to_waiting: key_to_waiting, pid_to_key: pid_to_key} = state

key_to_waiting =
case key_to_waiting do
%{^key => {locked, waiting}} ->
Map.put(key_to_waiting, key, {locked, :queue.in(from, waiting)})

%{} ->
go!(from)
Map.put(key_to_waiting, key, {pid, :queue.new()})
end

ref = Process.monitor(pid)
pid_to_key = Map.put(pid_to_key, pid, {key, ref})
{:noreply, %{state | key_to_waiting: key_to_waiting, pid_to_key: pid_to_key}}
end

@impl true
def handle_call({:unlock, key}, {pid, _}, state) do
%{key_to_waiting: key_to_waiting, pid_to_key: pid_to_key} = state
{{^key, ref}, pid_to_key} = Map.pop(pid_to_key, pid)
Process.demonitor(ref, [:flush])
key_to_waiting = unlock(key_to_waiting, pid_to_key, key)
{:reply, :ok, %{state | key_to_waiting: key_to_waiting, pid_to_key: pid_to_key}}
end

@impl true
def handle_info({:DOWN, ref, _type, pid, _reason}, state) do
%{key_to_waiting: key_to_waiting, pid_to_key: pid_to_key} = state
{{key, ^ref}, pid_to_key} = Map.pop(pid_to_key, pid)

key_to_waiting =
case key_to_waiting do
%{^key => {^pid, _}} ->
unlock(key_to_waiting, pid_to_key, key)

%{^key => {locked, waiting}} ->
waiting = :queue.delete_with(fn {qpid, _qref} -> qpid == pid end, waiting)
Map.put(key_to_waiting, key, {locked, waiting})
end

{:noreply, %{state | key_to_waiting: key_to_waiting, pid_to_key: pid_to_key}}
end

defp unlock(key_to_waiting, pid_to_key, key) do
%{^key => {_locked, waiting}} = key_to_waiting

case :queue.out(waiting) do
{{:value, {pid, _} = from}, waiting} ->
# Assert that we still know this PID
_ = Map.fetch!(pid_to_key, pid)
go!(from)
Map.put(key_to_waiting, key, {pid, waiting})

{:empty, _waiting} ->
Map.delete(key_to_waiting, key)
end
end

defp go!(from), do: GenServer.reply(from, :ok)

# ../elixir/ebin -> elixir
# ../ssl-9.6/ebin -> ssl
defp app_from_code_path(path) do
Expand Down
7 changes: 7 additions & 0 deletions lib/mix/lib/mix/tasks/compile.all.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ defmodule Mix.Tasks.Compile.All do
@impl true
def run(args) do
Mix.Project.get!()

Mix.Lock.lock(Mix.Tasks.Compile.compile_lock_key(), fn ->
do_run(args)
end)
end

defp do_run(args) do
config = Mix.Project.config()

# Compute the app cache if it is stale and we are
Expand Down
5 changes: 2 additions & 3 deletions lib/mix/lib/mix/tasks/compile.elixir.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,10 @@ defmodule Mix.Tasks.Compile.Elixir do
|> profile_opts()

# Having compilations racing with other is most undesired,
# so we wrap the compiler in a lock. Ideally we would use
# flock in the future.
# so we wrap the compiler in a lock.

with_logger_app(project, fn ->
Mix.State.lock(__MODULE__, fn ->
Mix.Lock.lock(Mix.Tasks.Compile.compile_lock_key(), fn ->
Mix.Compilers.Elixir.compile(
manifest,
srcs,
Expand Down
13 changes: 8 additions & 5 deletions lib/mix/lib/mix/tasks/compile.erlang.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,14 @@ defmodule Mix.Tasks.Compile.Erlang do
@impl true
def run(args) do
{opts, _, _} = OptionParser.parse(args, switches: @switches)
project = Mix.Project.config()
source_paths = project[:erlc_paths]
Mix.Compilers.Erlang.assert_valid_erlc_paths(source_paths)
files = Mix.Utils.extract_files(source_paths, [:erl])
do_run(files, opts, project, source_paths)

Mix.Lock.lock(Mix.Tasks.Compile.compile_lock_key(), fn ->
project = Mix.Project.config()
source_paths = project[:erlc_paths]
Mix.Compilers.Erlang.assert_valid_erlc_paths(source_paths)
files = Mix.Utils.extract_files(source_paths, [:erl])
do_run(files, opts, project, source_paths)
end)
end

defp do_run([], _, _, _), do: {:noop, []}
Expand Down
Loading
Loading