diff --git a/lib/iex/lib/iex/helpers.ex b/lib/iex/lib/iex/helpers.ex index ad7a0e4c41b..b14fe542a9a 100644 --- a/lib/iex/lib/iex/helpers.ex +++ b/lib/iex/lib/iex/helpers.ex @@ -136,7 +136,14 @@ defmodule IEx.Helpers do if is_nil(project) or project.__info__(:compile)[:source] == String.to_charlist(Path.absname("mix.exs")) do - do_recompile(options) + Mix.Project.with_build_lock(fn -> + purge_result = IEx.MixListener.purge() + + case do_recompile(options) do + :noop -> purge_result + compile_result -> compile_result + end + end) else message = "Cannot recompile because the current working directory changed" IO.puts(IEx.color(:eval_error, message)) diff --git a/lib/iex/lib/iex/mix_listener.ex b/lib/iex/lib/iex/mix_listener.ex new file mode 100644 index 00000000000..c905a93c764 --- /dev/null +++ b/lib/iex/lib/iex/mix_listener.ex @@ -0,0 +1,55 @@ +defmodule IEx.MixListener do + @moduledoc false + + use GenServer + + @name __MODULE__ + + @spec start_link(keyword) :: GenServer.on_start() + def start_link(_opts) do + GenServer.start_link(__MODULE__, {}, name: @name) + end + + @doc """ + Unloads all modules invalidated by external compilations. + """ + @spec purge :: :ok | :noop + def purge do + GenServer.call(@name, :purge, :infinity) + end + + @impl true + def init({}) do + {:ok, %{to_purge: MapSet.new()}} + end + + @impl true + def handle_call(:purge, _from, state) do + for module <- state.to_purge do + :code.purge(module) + :code.delete(module) + end + + status = if Enum.empty?(state.to_purge), do: :noop, else: :ok + + {:reply, status, %{state | to_purge: MapSet.new()}} + end + + @impl true + def handle_info({:modules_compiled, info}, state) do + if info.os_pid == System.pid() do + # Ignore compilations from ourselves, because the modules are + # already updated in memory + {:noreply, state} + else + %{changed: changed, removed: removed} = info.modules_diff + state = update_in(state.to_purge, &Enum.into(changed, &1)) + state = update_in(state.to_purge, &Enum.into(removed, &1)) + {:noreply, state} + end + end + + def handle_info(_message, state) do + {:noreply, state} + end +end diff --git a/lib/mix/lib/mix.ex b/lib/mix/lib/mix.ex index dfe2562220d..dface59a498 100644 --- a/lib/mix/lib/mix.ex +++ b/lib/mix/lib/mix.ex @@ -402,7 +402,7 @@ defmodule Mix do def start(_type, []) do Mix.Local.append_archives() Mix.Local.append_paths() - children = [Mix.State, Mix.TasksServer, Mix.ProjectStack] + children = [Mix.Sync.PubSub, Mix.State, Mix.TasksServer, Mix.ProjectStack] opts = [strategy: :one_for_one, name: Mix.Supervisor, max_restarts: 0] Supervisor.start_link(children, opts) end diff --git a/lib/mix/lib/mix/compilers/elixir.ex b/lib/mix/lib/mix/compilers/elixir.ex index fd29b6a5aee..cae665d4776 100644 --- a/lib/mix/lib/mix/compilers/elixir.ex +++ b/lib/mix/lib/mix/compilers/elixir.ex @@ -202,7 +202,16 @@ defmodule Mix.Compilers.Elixir do put_compile_env(sources) all_warnings = previous_warnings ++ runtime_warnings ++ compile_warnings - unless_previous_warnings_as_errors(previous_warnings, opts, {:ok, all_warnings}) + + lazy_modules_diff = fn -> + modules_diff(modules, removed_modules, all_modules, timestamp) + end + + unless_previous_warnings_as_errors( + previous_warnings, + opts, + {:ok, all_warnings, lazy_modules_diff} + ) {:error, errors, %{runtime_warnings: r_warnings, compile_warnings: c_warnings}, state} -> # In case of errors, we show all previous warnings and all new ones. @@ -210,7 +219,7 @@ defmodule Mix.Compilers.Elixir do errors = Enum.map(errors, &diagnostic/1) warnings = Enum.map(r_warnings ++ c_warnings, &diagnostic/1) all_warnings = Keyword.get(opts, :all_warnings, errors == []) - {:error, previous_warnings(sources, all_warnings) ++ warnings ++ errors} + {:error, previous_warnings(sources, all_warnings) ++ warnings ++ errors, nil} after Code.compiler_options(previous_opts) end @@ -247,7 +256,12 @@ defmodule Mix.Compilers.Elixir do all_warnings = Keyword.get(opts, :all_warnings, true) previous_warnings = previous_warnings(sources, all_warnings) - unless_previous_warnings_as_errors(previous_warnings, opts, {status, previous_warnings}) + + unless_previous_warnings_as_errors( + previous_warnings, + opts, + {status, previous_warnings, nil} + ) end end @@ -989,16 +1003,38 @@ defmodule Mix.Compilers.Elixir do File.rm(manifest <> ".checkpoint") end - defp unless_previous_warnings_as_errors(previous_warnings, opts, {status, all_warnings}) do + defp unless_previous_warnings_as_errors( + previous_warnings, + opts, + {status, all_warnings, modules_diff} + ) do if previous_warnings != [] and opts[:warnings_as_errors] do message = "Compilation failed due to warnings while using the --warnings-as-errors option" IO.puts(:stderr, message) - {:error, all_warnings} + {:error, all_warnings, modules_diff} else - {status, all_warnings} + {status, all_warnings, modules_diff} end end + defp modules_diff(compiled_modules, removed_modules, all_modules, timestamp) do + {changed, added} = + compiled_modules + |> Map.keys() + |> Enum.split_with(&Map.has_key?(all_modules, &1)) + + # Note that removed_modules may also include changed modules + removed = + for {module, _} <- removed_modules, not Map.has_key?(compiled_modules, module), do: module + + %{ + added: added, + changed: changed, + removed: removed, + timestamp: timestamp + } + end + ## Compiler loop # The compiler is invoked in a separate process so we avoid blocking its main loop. diff --git a/lib/mix/lib/mix/project.ex b/lib/mix/lib/mix/project.ex index ec10da1d000..f37b1e51b1b 100644 --- a/lib/mix/lib/mix/project.ex +++ b/lib/mix/lib/mix/project.ex @@ -896,7 +896,7 @@ defmodule Mix.Project do Mix.shell().info("Waiting for lock on the build directory (held by process #{os_pid})") end - Mix.Lock.with_lock(build_path, fun, on_taken: on_taken) + Mix.Sync.Lock.with_lock(build_path, fun, on_taken: on_taken) end @doc false @@ -910,7 +910,7 @@ defmodule Mix.Project do Mix.shell().info("Waiting for lock on the deps directory (held by process #{os_pid})") end - Mix.Lock.with_lock(deps_path, fun, on_taken: on_taken) + Mix.Sync.Lock.with_lock(deps_path, fun, on_taken: on_taken) end # Loads mix.exs in the current directory or loads the project from the diff --git a/lib/mix/lib/mix/pubsub.ex b/lib/mix/lib/mix/pubsub.ex new file mode 100644 index 00000000000..251cbbda8a9 --- /dev/null +++ b/lib/mix/lib/mix/pubsub.ex @@ -0,0 +1,107 @@ +defmodule Mix.PubSub do + @moduledoc false + + # The Mix pub/sub is responsible for notifying other OS processes + # about relevant concurrent events. + # + # The pub/sub consists of a local subscriber process that receives + # events from other OS processes, and a listener supervisor, which + # holds all listener processes that have been configured. Whenever + # the subscriber receives an event, it sends a message to each of + # the listeners. + # + # Inherently, a compilation may be required before the listener + # modules are available, so we start the local subscriber process + # separately with `start/0`, and then start the listeners later + # with `start_listeners/0`. The subscriber is going to accumulate + # events and reply them once the listenres are started. + + @spec start :: :ok + def start do + # Avoid calling the supervisor, if already started + if Process.whereis(Mix.PubSub) do + :ok + else + case Supervisor.start_child(Mix.Supervisor, Mix.PubSub) do + {:ok, _pid} -> + :ok + + {:error, {:already_started, _pid}} -> + :ok + + {:error, reason} -> + raise RuntimeError, "failed to start Mix.PubSub, reason: #{inspect(reason)}" + end + end + end + + @spec child_spec(term) :: Supervisor.child_spec() + def child_spec(_opts) do + children = [ + Mix.PubSub.Subscriber + ] + + opts = [strategy: :one_for_one, name: Mix.PubSub] + + %{ + id: Mix.PubSub, + start: {Supervisor, :start_link, [children, opts]}, + type: :supervisor + } + end + + @spec start_listeners :: :ok + def start_listeners do + # Avoid calling the supervisor, if already started + if Process.whereis(Mix.PubSub.ListenerSupervisor) do + :ok + else + case Supervisor.start_child(Mix.PubSub, listener_supervisor()) do + {:ok, _pid} -> + Mix.PubSub.Subscriber.flush() + :ok + + {:error, {:already_started, _pid}} -> + :ok + + {:error, reason} -> + raise RuntimeError, + "failed to start Mix.PubSub.ListenerSupervisor, reason: #{inspect(reason)}" + end + end + end + + defp listener_supervisor do + children = configured_listeners() ++ built_in_listeners() + + children = Enum.map(children, &Supervisor.child_spec(&1, [])) + + opts = [strategy: :one_for_one, name: Mix.PubSub.ListenerSupervisor] + + %{ + id: Mix.PubSub.ListenerSupervisor, + start: {Supervisor, :start_link, [children, opts]}, + type: :supervisor + } + end + + defp configured_listeners do + config = Mix.Project.config() + + listeners = + Application.get_env(:mix, :listeners, []) ++ + Keyword.get(config, :listeners, []) + + Enum.uniq(listeners) + end + + defp built_in_listeners do + iex_started? = Process.whereis(IEx.Supervisor) != nil + + if iex_started? do + [IEx.MixListener] + else + [] + end + end +end diff --git a/lib/mix/lib/mix/pubsub/subscriber.ex b/lib/mix/lib/mix/pubsub/subscriber.ex new file mode 100644 index 00000000000..6c944d43ae2 --- /dev/null +++ b/lib/mix/lib/mix/pubsub/subscriber.ex @@ -0,0 +1,51 @@ +defmodule Mix.PubSub.Subscriber do + @moduledoc false + + use GenServer + + @name __MODULE__ + + @spec start_link(keyword) :: GenServer.on_start() + def start_link(_opts) do + GenServer.start_link(__MODULE__, {}, name: @name) + end + + @spec flush :: :ok + def flush do + GenServer.cast(@name, :flush) + end + + @impl true + def init({}) do + build_path = Mix.Project.build_path() + Mix.Sync.PubSub.subscribe(build_path) + {:ok, %{acc: []}} + end + + @impl true + def handle_info(message, %{acc: nil} = state) do + notify_listeners([message]) + {:noreply, state} + end + + def handle_info(message, state) do + # Accumulate messages until the flush + {:noreply, update_in(state.acc, &[message | &1])} + end + + @impl true + def handle_cast(:flush, state) do + notify_listeners(Enum.reverse(state.acc)) + {:noreply, %{state | acc: nil}} + end + + defp notify_listeners(messages) do + children = Supervisor.which_children(Mix.PubSub.ListenerSupervisor) + + for message <- messages do + for {_, pid, _, _} <- children, is_pid(pid) do + send(pid, message) + end + end + end +end diff --git a/lib/mix/lib/mix/lock.ex b/lib/mix/lib/mix/sync/lock.ex similarity index 91% rename from lib/mix/lib/mix/lock.ex rename to lib/mix/lib/mix/sync/lock.ex index 855bb891633..5253d610e09 100644 --- a/lib/mix/lib/mix/lock.ex +++ b/lib/mix/lib/mix/sync/lock.ex @@ -1,4 +1,4 @@ -defmodule Mix.Lock do +defmodule Mix.Sync.Lock do @moduledoc false # Lock implementation working across multiple OS processes. @@ -66,7 +66,7 @@ defmodule Mix.Lock do @loopback {127, 0, 0, 1} @listen_opts [:binary, ip: @loopback, packet: :raw, nodelay: true, backlog: 128, active: false] @connect_opts [:binary, packet: :raw, nodelay: true, active: false] - @probe_data "elixirlock" + @probe_data "mixlock" @probe_data_size byte_size(@probe_data) @probe_timeout_ms 5_000 @@ -92,8 +92,8 @@ defmodule Mix.Lock do def with_lock(key, fun, opts \\ []) do opts = Keyword.validate!(opts, [:on_taken]) - key = key |> :erlang.md5() |> Base.url_encode64(padding: false) - path = Path.join([System.tmp_dir!(), "mix_lock", key]) + hash = key |> :erlang.md5() |> Base.url_encode64(padding: false) + path = Path.join([System.tmp_dir!(), "mix_lock", hash]) pdict_key = {__MODULE__, path} has_lock? = Process.get(pdict_key) @@ -207,17 +207,17 @@ defmodule Mix.Lock do end defp accept_loop(listen_socket) do - case :gen_tcp.accept(listen_socket) do + case accept(listen_socket) do {:ok, socket} -> _ = :gen_tcp.send(socket, @probe_data) accept_loop(listen_socket) - # eintr is "Interrupted system call". - {:error, :eintr} -> - accept_loop(listen_socket) - {:error, reason} when reason in [:closed, :einval] -> :ok + + {:error, reason} -> + raise RuntimeError, + "failed to accept connection in #{inspect(__MODULE__)}.receive_event/1, reason: #{inspect(reason)}" end end @@ -255,7 +255,7 @@ defmodule Mix.Lock do end defp await_probe_data(socket) do - case :gen_tcp.recv(socket, @probe_data_size, @probe_timeout_ms) do + case recv(socket, @probe_data_size, @probe_timeout_ms) do {:ok, @probe_data} -> {:ok, socket} @@ -263,15 +263,25 @@ defmodule Mix.Lock do :gen_tcp.close(socket) {:error, :unexpected_port_owner} - {:error, :eintr} -> - await_probe_data(socket) - {:error, reason} -> :gen_tcp.close(socket) {:error, reason} end end + defp recv(socket, size, timeout \\ :infinity) do + # eintr is "Interrupted system call". + with {:error, :eintr} <- :gen_tcp.recv(socket, size, timeout) do + recv(socket, size, timeout) + end + end + + defp accept(socket) do + with {:error, :eintr} <- :gen_tcp.accept(socket) do + accept(socket) + end + end + defp take_over(path, port_path) do # The operations here must happen in precise order, so if anything # fails, we keep the files as is and the next process that grabs @@ -284,23 +294,23 @@ defmodule Mix.Lock do names = File.ls!(path) + # On Windows, removing a file may fail if the file is open, so we + # ignore failures just to be safe + for "port_" <> _ = name <- names do - File.rm!(Path.join(path, name)) + _ = File.rm(Path.join(path, name)) end for "lock_" <> _ = name <- names, name != "lock_0" do - File.rm!(Path.join(path, name)) + _ = File.rm(Path.join(path, name)) end end defp await_close(socket) do - case :gen_tcp.recv(socket, 0) do + case recv(socket, 0) do {:error, :closed} -> :ok - {:error, :eintr} -> - await_close(socket) - {:error, _other} -> # In case of an unexpected error, we close the socket ourselves # to retry diff --git a/lib/mix/lib/mix/sync/pubsub.ex b/lib/mix/lib/mix/sync/pubsub.ex new file mode 100644 index 00000000000..8c72406e944 --- /dev/null +++ b/lib/mix/lib/mix/sync/pubsub.ex @@ -0,0 +1,292 @@ +defmodule Mix.Sync.PubSub do + @moduledoc false + + # Pub/sub implementation working across multiple OS processes. + # + # The pub/sub is implemented using TCP sockets. + # + # Every subscriber opens a TCP socket and creates a port_P file. + # A publisher lists the directory, connects to each port and sends + # the message payload. When subscriber processes terminate, the + # files are left behind, so whenever a publisher fails to connect + # to one of the ports, it removes the corresponding file. + # + # We use a single socket for all the subscriptions. We create the + # socket lazily on the first subscription. Since the socket is + # shared, we include the hashed key in the broadcast payload, so + # that we can determine which subscribers to notify. + + use GenServer + + @opaque subscription :: %{socket: :gen_tcp.socket()} + + @loopback {127, 0, 0, 1} + @listen_opts [:binary, ip: @loopback, packet: :raw, nodelay: true, backlog: 128, active: false] + @connect_opts [:binary, packet: :raw, nodelay: true, active: false] + + @tag_data "mixpubsub" + @tag_data_size byte_size(@tag_data) + + @name __MODULE__ + + @doc false + def start_link(_opts) do + GenServer.start_link(__MODULE__, {}, name: @name) + end + + @doc """ + Subscribes the caller to messages for `key`. + + The messages are delivered to the caller's message queue. + """ + @spec subscribe(String.t()) :: :ok + def subscribe(key) do + case GenServer.call(@name, {:subscribe, self(), key}, :infinity) do + :ok -> :ok + {:error, message} -> Mix.raise(message) + end + end + + @doc """ + Sends `message` to all processes subscribed to `key`. + """ + @spec broadcast(String.t(), term() | (-> term())) :: :ok + def broadcast(key, message) do + hash = hash(key) + path = path(hash) + + case File.ls(path) do + {:ok, []} -> + :ok + + {:ok, names} -> + message = + case message do + lazy_message when is_function(lazy_message, 0) -> lazy_message.() + message -> message + end + + binary = :erlang.term_to_binary(message) + + for "port_" <> port = name <- names do + port = String.to_integer(port) + port_path = Path.join(path, name) + _ = send_message(port_path, port, hash, binary) + end + + :ok + + {:error, :enoent} -> + :ok + + {:error, reason} -> + raise File.Error, reason: reason, action: "list directory", path: path + end + end + + defp send_message(port_path, port, hash, binary) do + # On Windows connecting to an unbound port takes a few seconds to + # fail, so instead we shortcut the check by attempting a listen, + # which succeeds or fails immediately. Note that `reuseaddr` here + # ensures that if the listening socket closed recently, we can + # immediately reclaim the same port. + # + # Also, if we manage to bind, it means the port is free and the + # subscription is no longer active, so we remove the file. It is + # important we remove the file while holding onto the port, so + # it is not claimed by a new subscriber as we remove the file. + # + # Finally, if we fail to connect, we could attempt to bind again + # and remove the file, but we just leave it up to the next send + # attempt. + + case :gen_tcp.listen(port, [reuseaddr: true] ++ @listen_opts) do + {:ok, socket} -> + _ = File.rm(port_path) + :gen_tcp.close(socket) + {:error, :econnrefused} + + {:error, _reason} -> + with {:ok, socket} <- :gen_tcp.connect(@loopback, port, @connect_opts) do + size = byte_size(binary) + + result = + :gen_tcp.send( + socket, + <<@tag_data, hash::binary, size::unsigned-integer-64, binary::binary>> + ) + + :gen_tcp.shutdown(socket, :read_write) + :gen_tcp.close(socket) + + result + end + end + end + + @impl true + def init({}) do + state = %{port: nil, hash_to_pids: %{}} + {:ok, state} + end + + @impl true + def handle_call({:subscribe, pid, key}, _from, state) do + Process.monitor(pid) + + case ensure_socket(state) do + {:ok, state} -> + hash = hash(key) + path = path(hash) + + create_subscription_file(path, state.port) + + state = + update_in(state.hash_to_pids, fn + %{^hash => pids} = hash_to_pids -> + %{hash_to_pids | hash => MapSet.put(pids, pid)} + + %{} = hash_to_pids -> + Map.put(hash_to_pids, hash, MapSet.new([pid])) + end) + + {:reply, :ok, state} + + {:error, error} -> + {:reply, {:error, error}, state} + end + end + + @impl true + def handle_info({:message, hash, message}, state) do + if pids = state.hash_to_pids[hash] do + for pid <- pids do + send(pid, message) + end + end + + {:noreply, state} + end + + def handle_info({:DOWN, _, :process, pid, _}, state) do + hash_to_pids = + for {hash, pids} <- state.hash_to_pids, + pids = remove_pid(pids, pid, hash, state.port), + into: %{}, + do: {hash, pids} + + {:noreply, %{state | hash_to_pids: hash_to_pids}} + end + + defp remove_pid(pids, pid, hash, port) do + pids = MapSet.delete(pids, pid) + + if Enum.empty?(pids) do + path = path(hash) + remove_subscription_file(path, port) + nil + else + pids + end + end + + defp ensure_socket(%{port: nil} = state) do + case listen() do + {:ok, socket, port} -> + parent = self() + spawn_link(fn -> receive_message_loop(socket, parent) end) + {:ok, %{state | port: port}} + + {:error, reason} -> + {:error, + "failed to open a TCP socket in #{inspect(__MODULE__)}.subscribe/1, reason: #{inspect(reason)}"} + end + end + + defp ensure_socket(state), do: {:ok, state} + + defp listen do + with {:ok, socket} <- :gen_tcp.listen(0, @listen_opts) do + case :inet.port(socket) do + {:ok, port} -> + {:ok, socket, port} + + {:error, reason} -> + :gen_tcp.close(socket) + {:error, reason} + end + end + end + + defp create_subscription_file(path, port) do + File.mkdir_p!(path) + port_path = Path.join(path, "port_#{port}") + File.touch!(port_path) + end + + defp remove_subscription_file(path, port) do + port_path = Path.join(path, "port_#{port}") + _ = File.rm(port_path) + end + + defp receive_message_loop(socket, parent) do + {hash, message} = receive_message(socket) + send(parent, {:message, hash, message}) + receive_message_loop(socket, parent) + end + + defp receive_message(listen_socket) do + # Note that receive failures may happen if the sender terminates + # abruptly. We ignore such attempts and wait for the next message. + # + # Also, the first recv has a timeout in case an unrelated process + # connects to the port and doesn't send any data. + + case accept(listen_socket) do + {:ok, socket} -> + with {:ok, <<@tag_data, hash::binary-size(16), size::unsigned-integer-64>>} <- + recv(socket, @tag_data_size + 16 + 8, 1_000), + {:ok, binary} <- recv(socket, size), + {:ok, data} <- decode_data(binary) do + :gen_tcp.close(socket) + {hash, data} + else + _other -> + :gen_tcp.close(socket) + receive_message(listen_socket) + end + + {:error, reason} -> + raise RuntimeError, + "failed to accept connection in #{inspect(__MODULE__)}.receive_message/1, reason: #{inspect(reason)}" + end + end + + defp decode_data(binary) do + try do + {:ok, :erlang.binary_to_term(binary)} + rescue + _error -> :error + end + end + + defp hash(key), do: :erlang.md5(key) + + defp path(hash) do + hash = Base.url_encode64(hash, padding: false) + Path.join([System.tmp_dir!(), "mix_pubsub", hash]) + end + + defp recv(socket, size, timeout \\ :infinity) do + # eintr is "Interrupted system call". + with {:error, :eintr} <- :gen_tcp.recv(socket, size, timeout) do + recv(socket, size) + end + end + + defp accept(socket) do + with {:error, :eintr} <- :gen_tcp.accept(socket) do + accept(socket) + end + end +end diff --git a/lib/mix/lib/mix/task.compiler.ex b/lib/mix/lib/mix/task.compiler.ex index cd0856d4202..5ad6b97dd8e 100644 --- a/lib/mix/lib/mix/task.compiler.ex +++ b/lib/mix/lib/mix/task.compiler.ex @@ -26,6 +26,49 @@ defmodule Mix.Task.Compiler do A compiler supports the same attributes for configuration and documentation as a regular Mix task. See `Mix.Task` for more information. + + ## Listening to compilation + + When a running a long-lived Mix task you may want to detect compilations + triggered in a separate OS process, for example, to reload the modules. + In order to do that, the Mix project may configure listeners: + + def project do + [ + ..., + listeners: [SomeDep.MixListener] + ] + end + + or + + config :mix, :listeners, [SomeDep.MixListener] + + Each entry in the list must be either `t:Supervisor.module_spec/0` or + `t:Supervisor.child_spec/0`. Additionally, the listener module must + be defined in a dependency of the project, not the project itself. + + The listener process receives the following messages: + + * `{:modules_compiled, info}` - delivered after a set of modules is + compiled. `info` is a map with the following keys: + + * `:app` - app which modules have been compiled. + + * `:build_scm` - the SCM module of the compiled project. + + * `:modules_diff` - information about the compiled modules. The + value is a map with keys: `:added`, `:changed`, `:removed`, + where each holds a list of modules. There is also a `:timestamp` + key, which matches the modification time of all the compiled + module files. + + * `:os_pid` - the operating system PID of the process that run + the compilation. The value is a string and it can be compared + with `System.pid/0` to determine if compilation happened in + the same OS process as the listener. + + Note that the listener starts before any of the project apps are started. """ defmodule Diagnostic do @@ -162,4 +205,31 @@ defmodule Mix.Task.Compiler do {:noop, []} end end + + @doc false + # Broadcast an event about a finished compilation of a set of modules. + @spec notify_modules_compiled((-> modules_diff)) :: :ok + when modules_diff: %{ + added: [module], + changed: [module], + removed: [module], + timestamp: integer + } + def notify_modules_compiled(lazy_modules_diff) when is_function(lazy_modules_diff, 0) do + config = Mix.Project.config() + build_path = Mix.Project.build_path(config) + + lazy_message = fn -> + info = %{ + app: config[:app], + build_scm: config[:build_scm], + modules_diff: lazy_modules_diff.(), + os_pid: System.pid() + } + + {:modules_compiled, info} + end + + Mix.Sync.PubSub.broadcast(build_path, lazy_message) + end end diff --git a/lib/mix/lib/mix/tasks/compile.elixir.ex b/lib/mix/lib/mix/tasks/compile.elixir.ex index 7ae75848361..734f25b7871 100644 --- a/lib/mix/lib/mix/tasks/compile.elixir.ex +++ b/lib/mix/lib/mix/tasks/compile.elixir.ex @@ -127,15 +127,22 @@ defmodule Mix.Tasks.Compile.Elixir do with_logger_app(project, fn -> Mix.Project.with_build_lock(project, fn -> - Mix.Compilers.Elixir.compile( - manifest, - srcs, - dest, - cache_key, - Mix.Tasks.Compile.Erlang.manifests(), - Mix.Tasks.Compile.Erlang.modules(), - opts - ) + {status, warnings, lazy_modules_diff} = + Mix.Compilers.Elixir.compile( + manifest, + srcs, + dest, + cache_key, + Mix.Tasks.Compile.Erlang.manifests(), + Mix.Tasks.Compile.Erlang.modules(), + opts + ) + + if lazy_modules_diff do + Mix.Task.Compiler.notify_modules_compiled(lazy_modules_diff) + end + + {status, warnings} end) end) end diff --git a/lib/mix/lib/mix/tasks/compile.ex b/lib/mix/lib/mix/tasks/compile.ex index 72224d0d741..19378f2bffd 100644 --- a/lib/mix/lib/mix/tasks/compile.ex +++ b/lib/mix/lib/mix/tasks/compile.ex @@ -149,9 +149,7 @@ defmodule Mix.Tasks.Compile do def run(args) do Mix.Project.get!() - # We run loadpaths to perform checks but we don't bother setting - # up the load paths because compile.all will manage them anyway. - Mix.Task.run("loadpaths", ["--no-path-loading" | args]) + Mix.Task.run("loadpaths", args) {opts, _, _} = OptionParser.parse(args, switches: [erl_config: :string]) load_erl_config(opts) diff --git a/lib/mix/lib/mix/tasks/deps.loadpaths.ex b/lib/mix/lib/mix/tasks/deps.loadpaths.ex index efca17f2bb2..03fa0093d4e 100644 --- a/lib/mix/lib/mix/tasks/deps.loadpaths.ex +++ b/lib/mix/lib/mix/tasks/deps.loadpaths.ex @@ -12,6 +12,11 @@ defmodule Mix.Tasks.Deps.Loadpaths do Although this task does not show up in `mix help`, it is part of Mix public API and can be depended on. + ## Configuration + + * `:listeners` - the list of listener modules. For more details + see `Mix.Task.Compiler` + ## Command line options * `--no-archives-check` - does not check archives @@ -19,12 +24,13 @@ defmodule Mix.Tasks.Deps.Loadpaths do * `--no-deps-check` - does not check or compile deps, only load available ones * `--no-elixir-version-check` - does not check Elixir version * `--no-optional-deps` - does not compile or load optional deps - * `--no-path-loading` - does not add entries to the code path """ @impl true def run(args) do + Mix.PubSub.start() + if "--no-archives-check" not in args do Mix.Task.run("archive.check", args) end @@ -48,9 +54,11 @@ defmodule Mix.Tasks.Deps.Loadpaths do deps_check(all, "--no-compile" in args) end - if "--no-path-loading" not in args do - Code.prepend_paths(Enum.flat_map(all, &Mix.Dep.load_paths/1), cache: true) - end + Code.prepend_paths(Enum.flat_map(all, &Mix.Dep.load_paths/1), cache: true) + + # For now we only allow listeners defined in dependencies, so + # we start them right after adding adding deps to the path + Mix.PubSub.start_listeners() :ok end diff --git a/lib/mix/lib/mix/tasks/loadpaths.ex b/lib/mix/lib/mix/tasks/loadpaths.ex index 2a9e12d515d..fffe930f5fc 100644 --- a/lib/mix/lib/mix/tasks/loadpaths.ex +++ b/lib/mix/lib/mix/tasks/loadpaths.ex @@ -22,7 +22,6 @@ defmodule Mix.Tasks.Loadpaths do * `--no-deps-check` - does not check dependencies, only load available ones * `--no-elixir-version-check` - does not check Elixir version * `--no-optional-deps` - does not compile or load optional deps - * `--no-path-loading` - does not add entries to the code path """ @impl true @@ -38,13 +37,13 @@ defmodule Mix.Tasks.Loadpaths do end if config[:app] do - load_project(config, args) + load_project(config) end :ok end - defp load_project(config, args) do + defp load_project(config) do vsn = {System.version(), :erlang.system_info(:otp_release)} scm = config[:build_scm] @@ -61,9 +60,7 @@ defmodule Mix.Tasks.Loadpaths do :ok end - if "--no-path-loading" not in args do - # We don't cache the current application as we may still write to it - Code.prepend_path(Mix.Project.compile_path(config)) - end + # We don't cache the current application as we may still write to it + Code.prepend_path(Mix.Project.compile_path(config)) end end diff --git a/lib/mix/test/fixtures/compile_listeners/deps/reloader/lib/reloader.ex b/lib/mix/test/fixtures/compile_listeners/deps/reloader/lib/reloader.ex new file mode 100644 index 00000000000..bb4027bfffe --- /dev/null +++ b/lib/mix/test/fixtures/compile_listeners/deps/reloader/lib/reloader.ex @@ -0,0 +1,32 @@ +defmodule Reloader do + use GenServer + + def start_link(_opts) do + GenServer.start_link(__MODULE__, {}) + end + + @impl true + def init({}) do + {:ok, {}} + end + + @impl true + def handle_info({:modules_compiled, info}, state) do + %{ + modules_diff: %{added: added, changed: changed, removed: removed, timestamp: _timestamp}, + app: app, + build_scm: build_scm, + os_pid: os_pid + } = info + + IO.write(""" + Received :modules_compiled with + added: #{inspect(Enum.sort(added))}, changed: #{inspect(Enum.sort(changed))}, removed: #{inspect(Enum.sort(removed))} + app: #{inspect(app)} + build_scm: #{inspect(build_scm)} + os_pid: #{inspect(os_pid)} + """) + + {:noreply, state} + end +end diff --git a/lib/mix/test/fixtures/compile_listeners/deps/reloader/mix.exs b/lib/mix/test/fixtures/compile_listeners/deps/reloader/mix.exs new file mode 100644 index 00000000000..5f6f6b8bd09 --- /dev/null +++ b/lib/mix/test/fixtures/compile_listeners/deps/reloader/mix.exs @@ -0,0 +1,16 @@ +defmodule Reloader.MixProject do + use Mix.Project + + def project do + [ + app: :reloader, + version: "0.1.0" + ] + end + + def application do + [ + extra_applications: [:logger] + ] + end +end diff --git a/lib/mix/test/mix/lock_test.exs b/lib/mix/test/mix/sync/lock_test.exs similarity index 70% rename from lib/mix/test/mix/lock_test.exs rename to lib/mix/test/mix/sync/lock_test.exs index 1b902916046..dd3382fb1f9 100644 --- a/lib/mix/test/mix/lock_test.exs +++ b/lib/mix/test/mix/sync/lock_test.exs @@ -1,31 +1,33 @@ -Code.require_file("../test_helper.exs", __DIR__) +Code.require_file("../../test_helper.exs", __DIR__) -defmodule Mix.LockTest do +defmodule Mix.Sync.LockTest do use ExUnit.Case, async: true + alias Mix.Sync.Lock + @lock_key Atom.to_string(__MODULE__) test "executes functions" do - assert Mix.Lock.with_lock(@lock_key, fn -> :it_works! end) == :it_works! - assert Mix.Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! + assert Lock.with_lock(@lock_key, fn -> :it_works! end) == :it_works! + assert Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! end test "releases lock on error" do assert_raise RuntimeError, fn -> - Mix.Lock.with_lock(@lock_key, fn -> raise "oops" end) + Lock.with_lock(@lock_key, fn -> raise "oops" end) end - assert Mix.Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! + assert Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! end test "releases lock on exit" do {_pid, ref} = spawn_monitor(fn -> - Mix.Lock.with_lock(@lock_key, fn -> Process.exit(self(), :kill) end) + Lock.with_lock(@lock_key, fn -> Process.exit(self(), :kill) end) end) assert_receive {:DOWN, ^ref, _, _, _} - assert Mix.Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! + assert Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! end test "blocks until released" do @@ -33,7 +35,7 @@ defmodule Mix.LockTest do task = Task.async(fn -> - Mix.Lock.with_lock(@lock_key, fn -> + Lock.with_lock(@lock_key, fn -> send(parent, :locked) assert_receive :will_lock :it_works! @@ -42,7 +44,7 @@ defmodule Mix.LockTest do assert_receive :locked send(task.pid, :will_lock) - assert Mix.Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! + assert Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! assert Task.await(task) == :it_works! end @@ -52,7 +54,7 @@ defmodule Mix.LockTest do {pid, ref} = spawn_monitor(fn -> - Mix.Lock.with_lock(@lock_key, fn -> + Lock.with_lock(@lock_key, fn -> send(parent, :locked) assert_receive :will_lock raise "oops" @@ -61,7 +63,7 @@ defmodule Mix.LockTest do assert_receive :locked send(pid, :will_lock) - assert Mix.Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! + assert Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! assert_receive {:DOWN, ^ref, _, _, _} end @@ -70,7 +72,7 @@ defmodule Mix.LockTest do {pid, ref} = spawn_monitor(fn -> - Mix.Lock.with_lock(@lock_key, fn -> + Lock.with_lock(@lock_key, fn -> send(parent, :locked) assert_receive :will_not_lock end) @@ -78,15 +80,15 @@ defmodule Mix.LockTest do assert_receive :locked Process.exit(pid, :kill) - assert Mix.Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! + assert Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! assert_receive {:DOWN, ^ref, _, _, _} end test "schedules and releases on exit" do - assert Mix.Lock.with_lock(@lock_key, fn -> + assert Lock.with_lock(@lock_key, fn -> {pid, ref} = spawn_monitor(fn -> - Mix.Lock.with_lock(@lock_key, fn -> + Lock.with_lock(@lock_key, fn -> raise "this will never be invoked" end) end) @@ -96,7 +98,7 @@ defmodule Mix.LockTest do :it_works! end) == :it_works! - assert Mix.Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! + assert Lock.with_lock(@lock_key, fn -> :still_works! end) == :still_works! end @tag :tmp_dir @@ -110,7 +112,7 @@ defmodule Mix.LockTest do refs = for _ <- 1..n do spawn_monitor(fn -> - Mix.Lock.with_lock(@lock_key, fn -> + Lock.with_lock(@lock_key, fn -> number = number_path |> File.read!() |> String.to_integer() new_number = number + 1 File.write!(number_path, Integer.to_string(new_number)) @@ -135,8 +137,8 @@ defmodule Mix.LockTest do test "lock can be acquired multiple times by the same process" do {_pid, ref} = spawn_monitor(fn -> - Mix.Lock.with_lock(@lock_key, fn -> - Mix.Lock.with_lock(@lock_key, fn -> + Lock.with_lock(@lock_key, fn -> + Lock.with_lock(@lock_key, fn -> Process.exit(self(), :kill) end) end) @@ -144,8 +146,8 @@ defmodule Mix.LockTest do assert_receive {:DOWN, ^ref, _, _, _} - assert Mix.Lock.with_lock(@lock_key, fn -> - Mix.Lock.with_lock(@lock_key, fn -> + assert Lock.with_lock(@lock_key, fn -> + Lock.with_lock(@lock_key, fn -> :still_works! end) end) == :still_works! @@ -156,7 +158,7 @@ defmodule Mix.LockTest do {pid, ref} = spawn_monitor(fn -> - Mix.Lock.with_lock(@lock_key, fn -> + Lock.with_lock(@lock_key, fn -> send(parent, :locked) assert_receive :will_lock end) @@ -169,7 +171,7 @@ defmodule Mix.LockTest do send(self(), {:on_taken_called, os_pid}) end - assert Mix.Lock.with_lock(@lock_key, fn -> :it_works! end, on_taken: on_taken) == :it_works! + assert Lock.with_lock(@lock_key, fn -> :it_works! end, on_taken: on_taken) == :it_works! os_pid = System.pid() assert_receive {:on_taken_called, ^os_pid} diff --git a/lib/mix/test/mix/sync/pubsub_test.exs b/lib/mix/test/mix/sync/pubsub_test.exs new file mode 100644 index 00000000000..a653d85551f --- /dev/null +++ b/lib/mix/test/mix/sync/pubsub_test.exs @@ -0,0 +1,92 @@ +Code.require_file("../../test_helper.exs", __DIR__) + +defmodule Mix.Sync.PubSubTest do + use ExUnit.Case, async: true + + alias Mix.Sync.PubSub + + @pubsub_key inspect(__MODULE__) + + test "delivers broadcast to multiple subscribers for the same key" do + parent = self() + + spawn_link(fn -> + PubSub.subscribe(@pubsub_key) + send(parent, :subscribed1) + assert_receive %{event: "event1"} + assert_receive %{event: "event2"} + send(parent, :done1) + end) + + spawn_link(fn -> + PubSub.subscribe(@pubsub_key) + send(parent, :subscribed2) + assert_receive %{event: "event1"} + assert_receive %{event: "event2"} + send(parent, :done2) + end) + + assert_receive :subscribed1 + assert_receive :subscribed2 + + PubSub.broadcast(@pubsub_key, %{event: "event1"}) + PubSub.broadcast(@pubsub_key, %{event: "event2"}) + + assert_receive :done1 + assert_receive :done2 + end + + test "delivers broadcast to subscribers for different keys" do + parent = self() + + spawn_link(fn -> + PubSub.subscribe([@pubsub_key, "1"]) + send(parent, :subscribed1) + assert_receive %{event: "event1"} + assert_receive %{event: "event3"} + refute_received %{event: "event2"} + send(parent, :done1) + end) + + spawn_link(fn -> + PubSub.subscribe([@pubsub_key, "2"]) + send(parent, :subscribed2) + assert_receive %{event: "event2"} + assert_receive %{event: "event3"} + refute_received %{event: "event1"} + send(parent, :done2) + end) + + assert_receive :subscribed1 + assert_receive :subscribed2 + + PubSub.broadcast([@pubsub_key, "1"], %{event: "event1"}) + PubSub.broadcast([@pubsub_key, "2"], %{event: "event2"}) + PubSub.broadcast([@pubsub_key, "1"], %{event: "event3"}) + PubSub.broadcast([@pubsub_key, "2"], %{event: "event3"}) + + assert_receive :done1 + assert_receive :done2 + end + + test "evaluates lazy message only if there are subscribers" do + lazy_message = fn -> + send(self(), :lazy1) + %{event: "event1"} + end + + PubSub.broadcast(@pubsub_key, lazy_message) + + PubSub.subscribe(@pubsub_key) + + lazy_message = fn -> + send(self(), :lazy2) + %{event: "event2"} + end + + PubSub.broadcast(@pubsub_key, lazy_message) + + refute_received :lazy1 + assert_received :lazy2 + end +end diff --git a/lib/mix/test/mix/tasks/compile_test.exs b/lib/mix/test/mix/tasks/compile_test.exs index fe49131b57a..a54561360c2 100644 --- a/lib/mix/test/mix/tasks/compile_test.exs +++ b/lib/mix/test/mix/tasks/compile_test.exs @@ -384,4 +384,113 @@ defmodule Mix.Tasks.CompileTest do assert is_list(:code.where_is_file(~c"parsetools.app")) end) end + + test "listening to concurrent compilations" do + timeout = 2_000 + + Mix.Project.pop() + + in_fixture("compile_listeners", fn -> + File.write!("mix.exs", """ + defmodule WithReloader do + use Mix.Project + + def project do + [ + app: :with_reloader, + version: "0.1.0", + deps: [{:reloader, "0.1.0", path: "deps/reloader"}], + # Register a listener from a dependency + listeners: [Reloader] + ] + end + end + """) + + File.mkdir_p!("config") + + File.mkdir_p!("lib") + + File.write!("lib/a.ex", "defmodule A do end") + File.write!("lib/b.ex", "defmodule B do end") + File.write!("lib/c.ex", "defmodule C do end") + + mix(["deps.compile"]) + + parent = self() + + spawn_link(fn -> + port = + mix_port([ + "run", + "--no-halt", + "--no-compile", + "--no-start", + "--eval", + ~s/IO.puts("ok"); IO.gets(""); System.halt()/ + ]) + + assert_receive {^port, {:data, "ok\n"}}, timeout + send(parent, :mix_started) + + assert_receive {^port, {:data, output}}, timeout + send(parent, {:output, output}) + end) + + assert_receive :mix_started, timeout + + output = mix(["do", "compile", "+", "eval", "IO.write System.pid"]) + os_pid = output |> String.split("\n") |> List.last() + + assert_receive {:output, output}, timeout + + assert output == """ + Received :modules_compiled with + added: [A, B, C], changed: [], removed: [] + app: :with_reloader + build_scm: Mix.SCM.Path + os_pid: "#{os_pid}" + """ + + # Changed + File.write!("lib/a.ex", "defmodule A do @moduledoc false end") + # Removed + File.rm!("lib/b.ex") + # New + File.write!("lib/d.ex", "defmodule D do end") + + spawn_link(fn -> + port = + mix_port([ + "run", + "--no-halt", + "--no-compile", + "--no-start", + "--eval", + ~s/IO.puts("ok"); IO.gets(""); System.halt()/ + ]) + + assert_receive {^port, {:data, "ok\n"}}, timeout + send(parent, :mix_started) + + assert_receive {^port, {:data, output}}, timeout + send(parent, {:output, output}) + end) + + assert_receive :mix_started, timeout + + output = mix(["do", "compile", "+", "eval", "IO.write System.pid"]) + os_pid = output |> String.split("\n") |> List.last() + + assert_receive {:output, output}, timeout + + assert output == """ + Received :modules_compiled with + added: [D], changed: [A], removed: [B] + app: :with_reloader + build_scm: Mix.SCM.Path + os_pid: "#{os_pid}" + """ + end) + end end