Member

@jonatanklosko jonatanklosko commented Oct 11, 2024

This introduces the Mix listeners API. A project may specify listeners as follows:

def project do
  [
    ...,
    listeners: [SomeDep.MixListener]
  ]
end

or

config :mix, :listeners, [SomeDep.MixListener]

where each listener is a child spec. A listener is started on Mix startup (after deps.loadpaths) and receives events as messages. Currently the only message is {:modules_compiled, info}, where info.modules_diff specifies which modules have been added/changed/removed. For now there is also a restriction: listeners must come from dependencies or already be in the code path, since we start listeners right after deps.loadpaths.
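To make the description above concrete, here is a minimal sketch of what a listener could look like. It assumes the diff is available under a `:modules_diff` key (as the `modules_diff` variable in the diff excerpts suggests) and that it carries `:added`, `:changed` and `:removed` lists. The module name `MyDep.MixListener` and the `stale_modules/0` helper are hypothetical and only serve the illustration.

```elixir
defmodule MyDep.MixListener do
  # Hypothetical listener for illustration; not part of Mix or of this PR.
  # `use GenServer` defines child_spec/1, so the module name alone can be
  # used as a child spec in the :listeners list.
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, MapSet.new(), name: __MODULE__)
  end

  # Illustrative helper: sorted list of modules reported as changed or removed.
  def stale_modules do
    GenServer.call(__MODULE__, :stale_modules)
  end

  @impl true
  def init(stale), do: {:ok, stale}

  @impl true
  def handle_info({:modules_compiled, info}, stale) do
    # Assumed shape: info.modules_diff has :added, :changed and :removed lists.
    %{changed: changed, removed: removed} = info.modules_diff
    {:noreply, Enum.into(changed ++ removed, stale)}
  end

  # Ignore any other message, since more events may be added later.
  def handle_info(_message, stale), do: {:noreply, stale}

  @impl true
  def handle_call(:stale_modules, _from, stale) do
    {:reply, Enum.sort(stale), stale}
  end
end
```

With something like this in a dependency, the project would list `MyDep.MixListener` under `listeners:` and the process would receive one message per compilation.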

One immediate use case that this PR implements is IEx. Up to this point, IEx didn't know when mix compile was called separately, so a subsequent recompile inside IEx wouldn't reflect the newly compiled changes. The new IEx.MixListener listens to compilations and accumulates modules to be purged. Then, on recompile we purge these modules before invoking compilation. In the future we can have a mode where the modules are reloaded immediately.
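As a rough sketch of the purge step described above (not the actual IEx.MixListener code), purging an accumulated list of modules could look as follows. `PurgeSketch` is a hypothetical name; `:code.purge/1` and `:code.delete/1` are the standard Erlang code server calls.

```elixir
defmodule PurgeSketch do
  # Hypothetical helper for illustration only.
  def purge(modules) do
    Enum.each(modules, fn module ->
      # Remove any old version first, so that delete/1 can succeed.
      :code.purge(module)
      # Mark the current version as old; the next call to the module
      # triggers a fresh load from the code path.
      :code.delete(module)
    end)
  end
end
```

After the purge, the next compilation or function call picks up the freshly written .beam files instead of the stale in-memory versions.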

Further use cases include phoenix_live_reload and the language server that can be made aware of external compilations.

Demo

compilenotify.mp4

Implementation

I added Mix.Sync.PubSub with a pub/sub based on TCP and directory listing (similar concepts to the lock implementation). I also moved the lock to Mix.Sync.Lock.

We now have a Mix.PubSub supervisor, under which we start a single subscriber process and a child supervisor with all the listeners. Whenever the subscriber receives a message, it sends it to all listeners.
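The fan-out from the subscriber to the listeners can be pictured with the sketch below. It is an assumption about the general shape, not the code in this PR: `FanOutSketch` is a hypothetical module, and `listener_sup` stands for whatever supervisor holds the listener processes.

```elixir
defmodule FanOutSketch do
  # Hypothetical helper: forward one message to every running child
  # of the given listener supervisor.
  def broadcast(listener_sup, message) do
    for {_id, pid, _type, _modules} <- Supervisor.which_children(listener_sup),
        # Children that are restarting or not started have no pid.
        is_pid(pid) do
      send(pid, message)
    end

    :ok
  end
end
```

Keeping the listeners under their own supervisor means a crashing listener is restarted without affecting the subscriber or the other listeners.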

@jonatanklosko jonatanklosko force-pushed the jk-compile-notify branch 3 times, most recently from 09509b9 to abebc9b Compare October 11, 2024 16:09
do_run(config, args)
end)
else
Mix.Project.with_build_lock(config, fn ->
Member

This is an interesting change. Although do_run above does not compile any application, it still tries to load them. So there is a chance that we try to load the applications while someone is compiling, and therefore erasing modules and .app files. Or someone could be running clean. I wonder if we should just always keep the lock? If someone cleans, it will fail anyway, but at least it will do so more consistently and not load an intermediate state? (and then I also wonder if we should have a deps lock while loading deps apps).

Member Author

@jonatanklosko jonatanklosko Oct 14, 2024

Good point. I saw the lock message when starting a process with --no-compile in tests and I thought there's no need. I will revert.

and then I also wonder if we should have a deps lock while loading deps apps

The loading relies only on the build, no? And clean already obtains the build lock when deleting deps files.

)

if modules_diff do
Mix.Task.Compiler.notify_modules_compiled(modules_diff)
Member

I was going to suggest moving this call inside Mix.Compilers.Elixir but, to be honest, it is all the same. We will actually know which way is better once we do the Erlang one, because that may force us to go one way or the other. So let's keep this as is, but we may be forced to move it inside Mix.Compilers.* in a follow up PR.

Member Author

For both compilers we want to send the same event, so it's effectively more generic, no?

Member

Yes, it is the same event, but I believe Mix.Compilers.Erlang may be used by other projects, so we cannot change its return type. :(

Member Author

Ohh I see, I thought you meant where notify_modules_compiled is defined, but it's about where we call it. I thought this was a better place, and both Mix.Compilers.{Elixir,Erlang} are marked as private modules, but if they are effectively public, then yeah, we can make the broadcast there as well.

Member

The Erlang one, last time I checked, was used by third-party tools. The Elixir one isn't. This is why the Erlang one will be the deciding factor (for consistency), but we can do it in another PR. :)

changed: Map.keys(Map.intersect(compiled_modules, all_modules)),
removed: (all_modules_keys -- compiled_modules_keys) -- pending_modules_keys,
timestamp: timestamp
}
Member

I am worried about this code being expensive for really large applications (20k modules). In such cases, we are changing 1-100 files, but we now need to traverse 20k to compute these statistics.

We could simplify the stats: instead of sending added/changed/removed, we just send "changed", noting that it implies added/removed/changed, and the subscriber can then look up the actual status on disk. For the language server case, this means we may look up some modules that have been removed, but that's fine. They are the minority in most cases. IEx and Phoenix don't care; they will just purge everything.

If we don't want a single field, we can also go with changed/removed. But I think a single field would totally work in this case. WDYT?

Member Author

I've just pushed an update to never traverse all the modules. If you still prefer to have a single field, let me know :)
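For illustration only, one way to avoid a full traversal is to do a single map lookup per compiled module instead of intersecting or subtracting against the full module set. This is a sketch under assumptions, not the code pushed in the update: `ModulesDiffSketch`, its argument names and the shape of its inputs are all hypothetical.

```elixir
defmodule ModulesDiffSketch do
  # Hypothetical helper; argument names and shapes are assumptions.
  #
  #   all_modules      - map of every known module => metadata (may be huge)
  #   compiled_modules - map of modules compiled in this run (usually small)
  #   removed_modules  - list of modules whose sources were removed
  def compute(all_modules, compiled_modules, removed_modules, timestamp) do
    # One map lookup per compiled module, never a pass over all_modules.
    {changed, added} =
      compiled_modules
      |> Map.keys()
      |> Enum.split_with(&is_map_key(all_modules, &1))

    %{added: added, changed: changed, removed: removed_modules, timestamp: timestamp}
  end
end
```

The cost then scales with the number of modules touched by the compilation rather than with the size of the application.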

Member

This looks fantastic, I believe we can ship it. Something we can also try to do is to pass a function to PubSub. If someone is listening, we compute the event and send it. If nobody is listening, we don't. WDYT?

Member Author

If we do that, the function would close over all_modules and we would end up copying possibly huge amounts of data when passing it to the process, which may be worse than before, no?

Member

We should not be copying them at all? Couldn't we get open socket instances in the GenServer client and write to them directly?

Member Author

Oh, I confused this with the subscriber side. For publishing we only list the directory, so yeah, we can make the message lazy. It will be part of the Mix.Task.Compiler.notify_modules_compiled API, but that should be fine. See b824a5d.
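The lazy message idea can be sketched as follows: the publisher accepts either a plain message or a zero-arity function, and only invokes the function when there is at least one subscriber. `LazyBroadcastSketch` is a hypothetical stand-in that takes subscribers as a list of pids, whereas the pub/sub in this PR discovers them through a directory listing and talks to them over TCP.

```elixir
defmodule LazyBroadcastSketch do
  # Hypothetical stand-in for the publisher side.

  # Nobody is listening: skip building the message entirely.
  def broadcast([], _message), do: :ok

  def broadcast(subscribers, message) do
    message =
      case message do
        # A zero-arity function is treated as a lazily computed message.
        lazy_message when is_function(lazy_message, 0) -> lazy_message.()
        message -> message
      end

    Enum.each(subscribers, &send(&1, message))
  end
end
```

Because the function runs in the publishing process, whatever it closes over is not copied anywhere unless a subscriber actually exists.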

@josevalim
Member

config :mix, :listeners, [SomeDep.MixListener]

I just realized this is not going to work as expected for umbrella projects. In case of umbrella projects, configuration is shared, but only some apps in the umbrella may depend on SomeDep. Given one of the reasons we chose the app environment was for IEx and we are ultimately not using it, it probably makes sense to roll back a :listeners key in def project.

@jonatanklosko
Member Author

it probably makes sense to roll back a :listeners key in def project.

The issue with project is that the config is not easily injectable, which is what the language server would need.

@josevalim
Member

The issue with project is that the config is not easily injectable, which is what the language server would need.

Mix has some APIs for pushing overrides to config but I don't know if the language servers can use it. We can also keep both, since the app config definitely won't work for Elixir projects.

@jonatanklosko
Member Author

Mix has some APIs for pushing overrides to config but I don't know if the language servers can use it.

Yeah, but wherever the language server would do that may be too late, after stuff is already loaded.

Member

@josevalim josevalim left a comment

One last comment and we can ship it.

message =
case message do
lazy_message when is_function(lazy_message, 0) -> lazy_message.()
message -> message
Member

Do we want to always pass a function, just in case?

Member Author

Your call, we may use the pub/sub for other cases where there's nothing to compute, so forcing a function could be weird :)

@josevalim josevalim merged commit 7e90906 into elixir-lang:main Oct 14, 2024
9 checks passed
@josevalim
Member

💚 💙 💜 💛 ❤️

@jonatanklosko jonatanklosko deleted the jk-compile-notify branch October 14, 2024 14:27
@ruslandoga
Contributor

ruslandoga commented Apr 2, 2025

👋

This is probably silly, but I was working on a similar idea and noticed that beyond a certain size :gen_tcp.recv starts returning :enomem errors. Setting :buffer, :recbuf, :sndbuf to larger values doesn't seem to help.

Repro script
defmodule BigTcpTest do
  def run do
    port = 5555

    # Start a server in an async Task
    server_task =
      Task.async(fn ->
        # Listen on the chosen port
        {:ok, listen_sock} =
          :gen_tcp.listen(port, mode: :binary, packet: :raw, active: false, reuseaddr: true)

        IO.puts("Server listening on port #{port}")

        # Accept one connection
        {:ok, client_sock} = :gen_tcp.accept(listen_sock)
        IO.puts("Server accepted connection")

        # Attempt to read the entire ~100 MB in a single recv call
        case :gen_tcp.recv(client_sock, 100 * 1024 * 1024, :infinity) do
          {:ok, data} ->
            IO.puts("Server read data of length #{byte_size(data)}")
            # Echo it back
            :ok = :gen_tcp.send(client_sock, data)

          {:error, reason} ->
            IO.puts("Server failed to read: #{inspect(reason)}")
        end

        # Close sockets
        :gen_tcp.shutdown(client_sock, :read_write)
        :gen_tcp.close(client_sock)
        :gen_tcp.close(listen_sock)
      end)

    # Give the server time to start
    Process.sleep(500)

    # Connect to the server
    {:ok, sock} = :gen_tcp.connect(~c"127.0.0.1", port, [mode: :binary, packet: :raw, active: false], 5000)

    # Create 100 MB of data
    data = :crypto.strong_rand_bytes(100 * 1024 * 1024)
    IO.puts("Client generated 100MB of data")

    # Send it in one go
    :ok = :gen_tcp.send(sock, data)

    # Attempt to read the echo
    case :gen_tcp.recv(sock, byte_size(data), 15_000) do
      {:ok, ^data} ->
        IO.puts("Client received identical data back")

      {:ok, other} ->
        IO.puts("Client received different data back: #{byte_size(other)}")

      {:error, reason} ->
        IO.puts("Client failed to recv: #{inspect(reason)}")
    end

    :gen_tcp.close(sock)
    Task.await(server_task)
    IO.puts("Done.")
  end
end

BigTcpTest.run()
$ elixir big_tcp.exs
Server listening on port 5555
Server accepted connection
Client generated 100MB of data
Server failed to read: :enomem
Client failed to recv: :enomem
Done.

Reading in chunks seems to work.

Updated script
defmodule BigTcpTest do
  def run do
    port = 5555

    # Start a server in an async Task
    server_task =
      Task.async(fn ->
        # Listen on the chosen port
        {:ok, listen_sock} =
          :gen_tcp.listen(port, mode: :binary, packet: :raw, active: false, reuseaddr: true)

        IO.puts("Server listening on port #{port}")

        # Accept one connection
        {:ok, client_sock} = :gen_tcp.accept(listen_sock)
        IO.puts("Server accepted connection")

        # Read the entire ~100 MB in 1 MB chunks via sock_recv/4
        case sock_recv(client_sock, 100 * 1024 * 1024, :infinity) do
          {:ok, data} ->
            IO.puts("Server read data of length #{byte_size(data)}")
            # Echo it back
            :ok = :gen_tcp.send(client_sock, data)

          {:error, reason} ->
            IO.puts("Server failed to read: #{inspect(reason)}")
        end

        # Close sockets
        :gen_tcp.shutdown(client_sock, :read_write)
        :gen_tcp.close(client_sock)
        :gen_tcp.close(listen_sock)
      end)

    # Give the server time to start
    Process.sleep(500)

    # Connect to the server
    {:ok, sock} = :gen_tcp.connect(~c"127.0.0.1", port, [mode: :binary, packet: :raw, active: false], 5000)

    # Create 100 MB of data
    data = :crypto.strong_rand_bytes(100 * 1024 * 1024)
    IO.puts("Client generated 100MB of data")

    # Send it in one go
    :ok = :gen_tcp.send(sock, data)

    # Attempt to read the echo
    case sock_recv(sock, byte_size(data), 15_000) do
      {:ok, ^data} ->
        IO.puts("Client received identical data back")

      {:ok, other} ->
        IO.puts("Client received different data back: #{byte_size(other)}")

      {:error, reason} ->
        IO.puts("Client failed to recv: #{inspect(reason)}")
    end

    :gen_tcp.close(sock)
    Task.await(server_task)
    IO.puts("Done.")
  end

  @one_mb 1024 * 1024

  # for larger messages, we need to read in chunks or we get {:error, :enomem}
  defp sock_recv(socket, size, timeout, acc \\ []) do
    with {:ok, data} <- :gen_tcp.recv(socket, min(size, @one_mb), timeout) do
      acc = [acc | data]

      case size - byte_size(data) do
        0 -> {:ok, IO.iodata_to_binary(acc)}
        left -> sock_recv(socket, left, timeout, acc)
      end
    end
  end
end

BigTcpTest.run()
$ elixir big_tcp_2.exs
Server listening on port 5555
Server accepted connection
Client generated 100MB of data
Server read data of length 104857600
Client received identical data back
Done.

@josevalim
Member

Thank you for the heads up. I think we are fine as we are not sending this much data here. :)
