Skip to content

Conversation

reisub
Copy link
Contributor

@reisub reisub commented Oct 11, 2024

This is a proposal to add a new ExUnit.Case option to support more use cases for running tests concurrently.

Motivation

This addresses situations like the following example.

Imagine you have several test modules that depend on resources which can't be used concurrently.

These test modules depend on MockServiceA:

  • ServiceA.Module1Test
  • ServiceA.Module2Test

And these test modules depend on MockServiceB:

  • ServiceB.Module1Test
  • ServiceB.Module2Test

Because access to the two mock services needs to be serialized, we can't use the :async option and that means all tests are executed sequentially even though tests that only depend on one of the services can run concurrently with other tests that don't depend on it.

CPU0: ServiceA.Module1Test -> ServiceA.Module2Test -> ServiceB.Module1Test -> ServiceB.Module2Test
CPU1: -

The goal of this work is to make it possible to run tests concurrently like so:

CPU0: ServiceA.Module1Test -> ServiceA.Module2Test
CPU1: ServiceB.Module1Test -> ServiceB.Module2Test

As the number of services and tests grows, the opportunity to decrease the time it takes to run the test suite grows dramatically.

Interface

The proposed code uses the name `:async_partition_key` for the option because the concepts of partitioning and partition keys should be familiar to a lot of people already and seems to fit well for this functionality.

We agreed on the name group to avoid confusion with test-suite partitioning.

Here's the example from above:

defmodule ServiceA.Module1Test do
  use ExUnit.Case, async: true, group: :service_a
end

defmodule ServiceA.Module2Test do
  use ExUnit.Case, async: true, group: :service_a
end

defmodule ServiceB.Module1Test do
  use ExUnit.Case, async: true, group: :service_b
end

defmodule ServiceB.Module2Test do
  use ExUnit.Case, async: true, group: :service_b
end

That being said, I'm open to other suggestions on the interface and naming.

Implementation details

I tried to keep the properties of the existing code (e.g. using :queue to make the test runs deterministic when run in compilation order).

For parameterized tests, if you specify a group, all the parameters run in the same group. If you don't specify it, then they run concurrently as the docs already state.

I also added tests for this functionality, though I'm not sure if there is a better way to test it.

diff --git a/lib/ex_unit/lib/ex_unit/case.ex b/lib/ex_unit/lib/ex_unit/case.ex
index 4674ea8..f9837d3 100644
--- a/lib/ex_unit/lib/ex_unit/case.ex
+++ b/lib/ex_unit/lib/ex_unit/case.ex
@@ -294,7 +294,7 @@ test "with tmp_dir", %{tmp_dir: tmp_dir} do

   @type env :: module() | Macro.Env.t()
   @compile {:no_warn_undefined, [IEx.Pry]}
-  @reserved [:module, :file, :line, :test, :async, :registered, :describe]
+  @reserved [:module, :file, :line, :test, :async, :async_partition_key, :registered, :describe]

   @doc false
   defmacro __using__(opts) do
@@ -317,7 +317,7 @@ def __register__(module, opts) do
     end

     {register?, opts} = Keyword.pop(opts, :register, true)
-    {next_opts, opts} = Keyword.split(opts, [:async, :parameterize])
+    {next_opts, opts} = Keyword.split(opts, [:async, :async_partition_key, :parameterize])

     if opts != [] do
       IO.warn("unknown options given to ExUnit.Case: #{inspect(opts)}")
@@ -552,8 +552,13 @@ defmacro __before_compile__(%{module: module}) do

     opts = Module.get_attribute(module, :ex_unit_module, [])
     async? = Keyword.get(opts, :async, false)
+    async_partition_key = Keyword.get(opts, :async_partition_key, nil)
     parameterize = Keyword.get(opts, :parameterize, nil)

+    if async_partition_key != nil and async? == false do
+      raise ArgumentError, ":async_partition_key is only a valid option when async: true"
+    end
+
     if not (parameterize == nil or (is_list(parameterize) and Enum.all?(parameterize, &is_map/1))) do
       raise ArgumentError, ":parameterize must be a list of maps, got: #{inspect(parameterize)}"
     end
@@ -569,7 +574,11 @@ def __ex_unit__ do
       end

       def __ex_unit__(:config) do
-        {unquote(async?), unquote(Macro.escape(parameterize))}
+        %{
+          async?: unquote(async?),
+          async_partition_key: unquote(async_partition_key),
+          parameterize: unquote(Macro.escape(parameterize))
+        }
       end
     end
   end
diff --git a/lib/ex_unit/lib/ex_unit/runner.ex b/lib/ex_unit/lib/ex_unit/runner.ex
index 29bd90d..babd131 100644
--- a/lib/ex_unit/lib/ex_unit/runner.ex
+++ b/lib/ex_unit/lib/ex_unit/runner.ex
@@ -106,9 +106,9 @@ defp async_loop(config, running, async_once?, modules_to_restore) do
         async_loop(config, running, async_once?, modules_to_restore)

       # Slots are available, start with async modules
-      async_modules = ExUnit.Server.take_async_modules(available) ->
-        running = spawn_modules(config, async_modules, true, running)
-        modules_to_restore = maybe_store_modules(modules_to_restore, :async, async_modules)
+      async_partitions = ExUnit.Server.take_async_partitions(available) ->
+        running = spawn_modules(config, async_partitions, true, running)
+        modules_to_restore = maybe_store_modules(modules_to_restore, :async, async_partitions)
         async_loop(config, running, true, modules_to_restore)

       true ->
@@ -161,6 +161,27 @@ defp spawn_modules(_config, [], _async, running) do
     running
   end

+  defp spawn_modules(
+         config,
+         [{_partition_key, partition_modules} | modules],
+         true = async?,
+         running
+       )
+       when is_list(partition_modules) do
+    if max_failures_reached?(config) do
+      running
+    else
+      {pid, ref} =
+        spawn_monitor(fn ->
+          Enum.each(partition_modules, fn {module, params} ->
+            run_module(config, module, async?, params)
+          end)
+        end)
+
+      spawn_modules(config, modules, async?, Map.put(running, ref, pid))
+    end
+  end
+
   defp spawn_modules(config, [{module, params} | modules], async?, running) do
     if max_failures_reached?(config) do
       running
diff --git a/lib/ex_unit/lib/ex_unit/server.ex b/lib/ex_unit/lib/ex_unit/server.ex
index 18962a1..f1bf728 100644
--- a/lib/ex_unit/lib/ex_unit/server.ex
+++ b/lib/ex_unit/lib/ex_unit/server.ex
@@ -9,7 +9,13 @@ def start_link(_opts) do
     GenServer.start_link(__MODULE__, :ok, name: @name)
   end

-  def add_module(name, {async?, parameterize}) do
+  def add_module(name, config) do
+    %{
+      async?: async?,
+      async_partition_key: async_partition_key,
+      parameterize: parameterize
+    } = config
+
     modules =
       if parameterize do
         Enum.map(parameterize, &{name, &1})
@@ -17,7 +23,7 @@ def add_module(name, {async?, parameterize}) do
         [{name, %{}}]
       end

-    case GenServer.call(@name, {:add, async?, modules}, @timeout) do
+    case GenServer.call(@name, {:add, {async?, async_partition_key}, modules}, @timeout) do
       :ok ->
         :ok

@@ -30,16 +36,16 @@ def modules_loaded(uniq?) do
     GenServer.call(@name, {:modules_loaded, uniq?}, @timeout)
   end

-  def take_async_modules(count) do
-    GenServer.call(@name, {:take_async_modules, count}, @timeout)
+  def take_async_partitions(count) do
+    GenServer.call(@name, {:take_async_partitions, count}, @timeout)
   end

   def take_sync_modules() do
     GenServer.call(@name, :take_sync_modules, @timeout)
   end

-  def restore_modules(async_modules, sync_modules) do
-    GenServer.call(@name, {:restore_modules, async_modules, sync_modules}, @timeout)
+  def restore_modules(async_partitions, sync_modules) do
+    GenServer.call(@name, {:restore_modules, async_partitions, sync_modules}, @timeout)
   end

   ## Callbacks
@@ -51,7 +57,8 @@ def init(:ok) do
     state = %{
       loaded: System.monotonic_time(),
       waiting: nil,
-      async_modules: :queue.new(),
+      async_partitions: %{},
+      async_partition_keys: :queue.new(),
       sync_modules: :queue.new()
     }

@@ -59,26 +66,30 @@ def init(:ok) do
   end

   # Called on demand until we are signaled all modules are loaded.
-  def handle_call({:take_async_modules, count}, from, %{waiting: nil} = state) do
-    {:noreply, take_modules(%{state | waiting: {from, count}})}
+  def handle_call({:take_async_partitions, count}, from, %{waiting: nil} = state) do
+    {:noreply, take_module_partitions(%{state | waiting: {from, count}})}
   end

   # Called once after all async modules have been sent and reverts the state.
   def handle_call(:take_sync_modules, _from, state) do
-    %{waiting: nil, loaded: :done, async_modules: async_modules} = state
-    0 = :queue.len(async_modules)
+    %{waiting: nil, loaded: :done, async_partition_keys: async_partition_keys} = state
+    0 = :queue.len(async_partition_keys)

     {:reply, :queue.to_list(state.sync_modules),
      %{state | sync_modules: :queue.new(), loaded: System.monotonic_time()}}
   end

   # Called by the runner when --repeat-until-failure is used.
-  def handle_call({:restore_modules, async_modules, sync_modules}, _from, state) do
+  def handle_call({:restore_modules, async_partitions, sync_modules}, _from, state) do
+    async_partition_keys =
+      Enum.map(async_partitions, fn {partition_key, _modules} -> partition_key end)
+
     {:reply, :ok,
      %{
        state
        | loaded: :done,
-         async_modules: :queue.from_list(async_modules),
+         async_partitions: Map.new(async_partitions),
+         async_partition_keys: :queue.from_list(async_partition_keys),
          sync_modules: :queue.from_list(sync_modules)
      }}
   end
@@ -91,12 +102,19 @@ def handle_call({:modules_loaded, uniq?}, _from, %{loaded: loaded} = state)
       when is_integer(loaded) do
     state =
       if uniq? do
-        async_modules = :queue.to_list(state.async_modules) |> Enum.uniq() |> :queue.from_list()
+        async_partitions =
+          state.async_partitions
+          |> Enum.map(fn {partition_key, modules} ->
+            partition_modules = Enum.uniq(modules)
+            {partition_key, partition_modules}
+          end)
+          |> Map.new()
+
         sync_modules = :queue.to_list(state.sync_modules) |> Enum.uniq() |> :queue.from_list()

         %{
           state
-          | async_modules: async_modules,
+          | async_partitions: async_partitions,
             sync_modules: sync_modules
         }
       else
@@ -104,21 +122,42 @@ def handle_call({:modules_loaded, uniq?}, _from, %{loaded: loaded} = state)
       end

     diff = System.convert_time_unit(System.monotonic_time() - loaded, :native, :microsecond)
-    {:reply, diff, take_modules(%{state | loaded: :done})}
+    {:reply, diff, take_module_partitions(%{state | loaded: :done})}
   end

-  def handle_call({:add, true, names}, _from, %{loaded: loaded} = state)
+  def handle_call({:add, {true, async_partition_key}, names}, _from, %{loaded: loaded} = state)
       when is_integer(loaded) do
     state =
-      update_in(
-        state.async_modules,
-        &Enum.reduce(names, &1, fn name, q -> :queue.in(name, q) end)
-      )
-
-    {:reply, :ok, take_modules(state)}
-  end
-
-  def handle_call({:add, false, names}, _from, %{loaded: loaded} = state)
+      Enum.reduce(names, state, fn name, updated_state ->
+        async_partition_key = async_partition_key || default_async_partition_key(name)
+        partition_key_exists? = Map.has_key?(state.async_partitions, async_partition_key)
+
+        updated_state
+        |> update_in([:async_partitions], fn async_partitions ->
+          {_, async_partitions} =
+            Map.get_and_update(async_partitions, async_partition_key, fn
+              nil ->
+                {nil, [name]}
+
+              modules ->
+                {modules, [name | modules]}
+            end)
+
+          async_partitions
+        end)
+        |> update_in([:async_partition_keys], fn q ->
+          if partition_key_exists? do
+            q
+          else
+            :queue.in(async_partition_key, q)
+          end
+        end)
+      end)
+
+    {:reply, :ok, take_module_partitions(state)}
+  end
+
+  def handle_call({:add, {false, _async_partition_key}, names}, _from, %{loaded: loaded} = state)
       when is_integer(loaded) do
     state =
       update_in(state.sync_modules, &Enum.reduce(names, &1, fn name, q -> :queue.in(name, q) end))
@@ -126,28 +165,57 @@ def handle_call({:add, false, names}, _from, %{loaded: loaded} = state)
     {:reply, :ok, state}
   end

-  def handle_call({:add, _async?, _names}, _from, state),
+  def handle_call({:add, {_async?, _async_partition_key}, _names}, _from, state),
     do: {:reply, :already_running, state}

-  defp take_modules(%{waiting: nil} = state) do
+  defp default_async_partition_key({module, params}) do
+    if params == %{} do
+      module
+    else
+      # if no async partition is specified, parameters run concurrently
+      "#{module}_#{:erlang.phash2(params)}"
+    end
+  end
+
+  defp take_module_partitions(%{waiting: nil} = state) do
     state
   end

-  defp take_modules(%{waiting: {from, count}} = state) do
-    has_async_modules? = not :queue.is_empty(state.async_modules)
+  defp take_module_partitions(%{waiting: {from, count}} = state) do
+    has_async_partitions? = not :queue.is_empty(state.async_partition_keys)

     cond do
-      not has_async_modules? and state.loaded == :done ->
+      not has_async_partitions? and state.loaded == :done ->
         GenServer.reply(from, nil)
         %{state | waiting: nil}

-      not has_async_modules? ->
+      not has_async_partitions? ->
         state

       true ->
-        {modules, async_modules} = take_until(count, state.async_modules)
-        GenServer.reply(from, modules)
-        %{state | async_modules: async_modules, waiting: nil}
+        {partition_keys, remaining_partition_keys} =
+          take_until(count, state.async_partition_keys)
+
+        {async_partitions, remaining_partitions} =
+          Enum.map_reduce(partition_keys, state.async_partitions, fn partition_key,
+                                                                     remaining_partitions ->
+            {partition_modules, remaining_partitions} =
+              Map.pop!(remaining_partitions, partition_key)
+
+            {
+              {partition_key, partition_modules},
+              remaining_partitions
+            }
+          end)
+
+        GenServer.reply(from, async_partitions)
+
+        %{
+          state
+          | async_partition_keys: remaining_partition_keys,
+            async_partitions: remaining_partitions,
+            waiting: nil
+        }
     end
   end
@reisub reisub changed the title Support :async_partition_key option in ExUnit Support :group option in ExUnit Oct 12, 2024
Copy link
Member

@josevalim josevalim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dropped the last batch of nitpicks and we can ship it!!!!

@reisub
Copy link
Contributor Author

reisub commented Oct 13, 2024

@josevalim I've updated the branch from main and this is good for a final look, thanks for your help to get this ready! 🙏

@reisub reisub changed the title Support :group option in ExUnit Support :group option in ExUnit Oct 13, 2024
@josevalim josevalim merged commit 7190f41 into elixir-lang:main Oct 13, 2024
9 checks passed
@josevalim
Copy link
Member

💚 💙 💜 💛 ❤️

)

{:reply, :ok, take_modules(state)}
{:reply, :ok, state}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We lost a call to take_modules here.

when is_integer(loaded) do
state =
update_in(state.sync_modules, &Enum.reduce(names, &1, fn name, q -> :queue.in(name, q) end))
case state.async_groups do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are setting the state here to a tuple. I have fixes for it upcoming in main.

defp spawn_modules(config, [{module, params} | modules], async?, running) do
defp spawn_modules(
config,
[{_group, group_modules} | modules],
Copy link
Member

@josevalim josevalim Oct 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sync modules are not arriving in this shape and they are being skipped (because the group modules is treated as an empty map). I will add a pattern to help us catch it. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Damn, that masked the other issues!
Thanks for fixing it.

And bummer about the tests being flaky, it would have been nice to have tests for this functionality.

@reisub reisub deleted the async-test-partitioning branch October 13, 2024 19:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

2 participants