From 83e8e657b78ff363f90703388583cdc4d662874c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Mon, 7 Oct 2024 16:18:50 +0200 Subject: [PATCH 01/14] Support async_partition_key option in ExUnit diff --git a/lib/ex_unit/lib/ex_unit/case.ex b/lib/ex_unit/lib/ex_unit/case.ex index 4674ea847..f9837d36b 100644 --- a/lib/ex_unit/lib/ex_unit/case.ex +++ b/lib/ex_unit/lib/ex_unit/case.ex @@ -294,7 +294,7 @@ test "with tmp_dir", %{tmp_dir: tmp_dir} do @type env :: module() | Macro.Env.t() @compile {:no_warn_undefined, [IEx.Pry]} - @reserved [:module, :file, :line, :test, :async, :registered, :describe] + @reserved [:module, :file, :line, :test, :async, :async_partition_key, :registered, :describe] @doc false defmacro __using__(opts) do @@ -317,7 +317,7 @@ def __register__(module, opts) do end {register?, opts} = Keyword.pop(opts, :register, true) - {next_opts, opts} = Keyword.split(opts, [:async, :parameterize]) + {next_opts, opts} = Keyword.split(opts, [:async, :async_partition_key, :parameterize]) if opts != [] do IO.warn("unknown options given to ExUnit.Case: #{inspect(opts)}") @@ -552,8 +552,13 @@ defmacro __before_compile__(%{module: module}) do opts = Module.get_attribute(module, :ex_unit_module, []) async? = Keyword.get(opts, :async, false) + async_partition_key = Keyword.get(opts, :async_partition_key, nil) parameterize = Keyword.get(opts, :parameterize, nil) + if async_partition_key != nil and async? == false do + raise ArgumentError, ":async_partition_key is only a valid option when async: true" + end + if not (parameterize == nil or (is_list(parameterize) and Enum.all?(parameterize, &is_map/1))) do raise ArgumentError, ":parameterize must be a list of maps, got: #{inspect(parameterize)}" end @@ -569,7 +574,11 @@ def __ex_unit__ do end def __ex_unit__(:config) do - {unquote(async?), unquote(Macro.escape(parameterize))} + %{ + async?: unquote(async?), + async_partition_key: unquote(async_partition_key), + parameterize: unquote(Macro.escape(parameterize)) + } end end end diff --git a/lib/ex_unit/lib/ex_unit/runner.ex b/lib/ex_unit/lib/ex_unit/runner.ex index 29bd90d37..babd131a5 100644 --- a/lib/ex_unit/lib/ex_unit/runner.ex +++ b/lib/ex_unit/lib/ex_unit/runner.ex @@ -106,9 +106,9 @@ defp async_loop(config, running, async_once?, modules_to_restore) do async_loop(config, running, async_once?, modules_to_restore) # Slots are available, start with async modules - async_modules = ExUnit.Server.take_async_modules(available) -> - running = spawn_modules(config, async_modules, true, running) - modules_to_restore = maybe_store_modules(modules_to_restore, :async, async_modules) + async_partitions = ExUnit.Server.take_async_partitions(available) -> + running = spawn_modules(config, async_partitions, true, running) + modules_to_restore = maybe_store_modules(modules_to_restore, :async, async_partitions) async_loop(config, running, true, modules_to_restore) true -> @@ -161,6 +161,27 @@ defp spawn_modules(_config, [], _async, running) do running end + defp spawn_modules( + config, + [{_partition_key, partition_modules} | modules], + true = async?, + running + ) + when is_list(partition_modules) do + if max_failures_reached?(config) do + running + else + {pid, ref} = + spawn_monitor(fn -> + Enum.each(partition_modules, fn {module, params} -> + run_module(config, module, async?, params) + end) + end) + + spawn_modules(config, modules, async?, Map.put(running, ref, pid)) + end + end + defp spawn_modules(config, [{module, params} | modules], async?, running) do if max_failures_reached?(config) do running diff --git a/lib/ex_unit/lib/ex_unit/server.ex b/lib/ex_unit/lib/ex_unit/server.ex index 18962a1af..f1bf7287c 100644 --- a/lib/ex_unit/lib/ex_unit/server.ex +++ b/lib/ex_unit/lib/ex_unit/server.ex @@ -9,7 +9,13 @@ def start_link(_opts) do GenServer.start_link(__MODULE__, :ok, name: @name) end - def add_module(name, {async?, parameterize}) do + def add_module(name, config) do + %{ + async?: async?, + async_partition_key: async_partition_key, + parameterize: parameterize + } = config + modules = if parameterize do Enum.map(parameterize, &{name, &1}) @@ -17,7 +23,7 @@ def add_module(name, {async?, parameterize}) do [{name, %{}}] end - case GenServer.call(@name, {:add, async?, modules}, @timeout) do + case GenServer.call(@name, {:add, {async?, async_partition_key}, modules}, @timeout) do :ok -> :ok @@ -30,16 +36,16 @@ def modules_loaded(uniq?) do GenServer.call(@name, {:modules_loaded, uniq?}, @timeout) end - def take_async_modules(count) do - GenServer.call(@name, {:take_async_modules, count}, @timeout) + def take_async_partitions(count) do + GenServer.call(@name, {:take_async_partitions, count}, @timeout) end def take_sync_modules() do GenServer.call(@name, :take_sync_modules, @timeout) end - def restore_modules(async_modules, sync_modules) do - GenServer.call(@name, {:restore_modules, async_modules, sync_modules}, @timeout) + def restore_modules(async_partitions, sync_modules) do + GenServer.call(@name, {:restore_modules, async_partitions, sync_modules}, @timeout) end ## Callbacks @@ -51,7 +57,8 @@ def init(:ok) do state = %{ loaded: System.monotonic_time(), waiting: nil, - async_modules: :queue.new(), + async_partitions: %{}, + async_partition_keys: :queue.new(), sync_modules: :queue.new() } @@ -59,26 +66,30 @@ def init(:ok) do end # Called on demand until we are signaled all modules are loaded. - def handle_call({:take_async_modules, count}, from, %{waiting: nil} = state) do - {:noreply, take_modules(%{state | waiting: {from, count}})} + def handle_call({:take_async_partitions, count}, from, %{waiting: nil} = state) do + {:noreply, take_module_partitions(%{state | waiting: {from, count}})} end # Called once after all async modules have been sent and reverts the state. def handle_call(:take_sync_modules, _from, state) do - %{waiting: nil, loaded: :done, async_modules: async_modules} = state - 0 = :queue.len(async_modules) + %{waiting: nil, loaded: :done, async_partition_keys: async_partition_keys} = state + 0 = :queue.len(async_partition_keys) {:reply, :queue.to_list(state.sync_modules), %{state | sync_modules: :queue.new(), loaded: System.monotonic_time()}} end # Called by the runner when --repeat-until-failure is used. - def handle_call({:restore_modules, async_modules, sync_modules}, _from, state) do + def handle_call({:restore_modules, async_partitions, sync_modules}, _from, state) do + async_partition_keys = + Enum.map(async_partitions, fn {partition_key, _modules} -> partition_key end) + {:reply, :ok, %{ state | loaded: :done, - async_modules: :queue.from_list(async_modules), + async_partitions: Map.new(async_partitions), + async_partition_keys: :queue.from_list(async_partition_keys), sync_modules: :queue.from_list(sync_modules) }} end @@ -91,12 +102,19 @@ def handle_call({:modules_loaded, uniq?}, _from, %{loaded: loaded} = state) when is_integer(loaded) do state = if uniq? do - async_modules = :queue.to_list(state.async_modules) |> Enum.uniq() |> :queue.from_list() + async_partitions = + state.async_partitions + |> Enum.map(fn {partition_key, modules} -> + partition_modules = Enum.uniq(modules) + {partition_key, partition_modules} + end) + |> Map.new() + sync_modules = :queue.to_list(state.sync_modules) |> Enum.uniq() |> :queue.from_list() %{ state - | async_modules: async_modules, + | async_partitions: async_partitions, sync_modules: sync_modules } else @@ -104,21 +122,42 @@ def handle_call({:modules_loaded, uniq?}, _from, %{loaded: loaded} = state) end diff = System.convert_time_unit(System.monotonic_time() - loaded, :native, :microsecond) - {:reply, diff, take_modules(%{state | loaded: :done})} + {:reply, diff, take_module_partitions(%{state | loaded: :done})} end - def handle_call({:add, true, names}, _from, %{loaded: loaded} = state) + def handle_call({:add, {true, async_partition_key}, names}, _from, %{loaded: loaded} = state) when is_integer(loaded) do state = - update_in( - state.async_modules, - &Enum.reduce(names, &1, fn name, q -> :queue.in(name, q) end) - ) - - {:reply, :ok, take_modules(state)} - end - - def handle_call({:add, false, names}, _from, %{loaded: loaded} = state) + Enum.reduce(names, state, fn name, updated_state -> + async_partition_key = async_partition_key || default_async_partition_key(name) + partition_key_exists? = Map.has_key?(state.async_partitions, async_partition_key) + + updated_state + |> update_in([:async_partitions], fn async_partitions -> + {_, async_partitions} = + Map.get_and_update(async_partitions, async_partition_key, fn + nil -> + {nil, [name]} + + modules -> + {modules, [name | modules]} + end) + + async_partitions + end) + |> update_in([:async_partition_keys], fn q -> + if partition_key_exists? do + q + else + :queue.in(async_partition_key, q) + end + end) + end) + + {:reply, :ok, take_module_partitions(state)} + end + + def handle_call({:add, {false, _async_partition_key}, names}, _from, %{loaded: loaded} = state) when is_integer(loaded) do state = update_in(state.sync_modules, &Enum.reduce(names, &1, fn name, q -> :queue.in(name, q) end)) @@ -126,28 +165,57 @@ def handle_call({:add, false, names}, _from, %{loaded: loaded} = state) {:reply, :ok, state} end - def handle_call({:add, _async?, _names}, _from, state), + def handle_call({:add, {_async?, _async_partition_key}, _names}, _from, state), do: {:reply, :already_running, state} - defp take_modules(%{waiting: nil} = state) do + defp default_async_partition_key({module, params}) do + if params == %{} do + module + else + # if no async partition is specified, parameters run concurrently + "#{module}_#{:erlang.phash2(params)}" + end + end + + defp take_module_partitions(%{waiting: nil} = state) do state end - defp take_modules(%{waiting: {from, count}} = state) do - has_async_modules? = not :queue.is_empty(state.async_modules) + defp take_module_partitions(%{waiting: {from, count}} = state) do + has_async_partitions? = not :queue.is_empty(state.async_partition_keys) cond do - not has_async_modules? and state.loaded == :done -> + not has_async_partitions? and state.loaded == :done -> GenServer.reply(from, nil) %{state | waiting: nil} - not has_async_modules? -> + not has_async_partitions? -> state true -> - {modules, async_modules} = take_until(count, state.async_modules) - GenServer.reply(from, modules) - %{state | async_modules: async_modules, waiting: nil} + {partition_keys, remaining_partition_keys} = + take_until(count, state.async_partition_keys) + + {async_partitions, remaining_partitions} = + Enum.map_reduce(partition_keys, state.async_partitions, fn partition_key, + remaining_partitions -> + {partition_modules, remaining_partitions} = + Map.pop!(remaining_partitions, partition_key) + + { + {partition_key, partition_modules}, + remaining_partitions + } + end) + + GenServer.reply(from, async_partitions) + + %{ + state + | async_partition_keys: remaining_partition_keys, + async_partitions: remaining_partitions, + waiting: nil + } end end --- lib/ex_unit/lib/ex_unit/case.ex | 15 +++- lib/ex_unit/lib/ex_unit/runner.ex | 27 +++++- lib/ex_unit/lib/ex_unit/server.ex | 138 ++++++++++++++++++++++-------- 3 files changed, 139 insertions(+), 41 deletions(-) diff --git a/lib/ex_unit/lib/ex_unit/case.ex b/lib/ex_unit/lib/ex_unit/case.ex index 4674ea84734..f9837d36b51 100644 --- a/lib/ex_unit/lib/ex_unit/case.ex +++ b/lib/ex_unit/lib/ex_unit/case.ex @@ -294,7 +294,7 @@ defmodule ExUnit.Case do @type env :: module() | Macro.Env.t() @compile {:no_warn_undefined, [IEx.Pry]} - @reserved [:module, :file, :line, :test, :async, :registered, :describe] + @reserved [:module, :file, :line, :test, :async, :async_partition_key, :registered, :describe] @doc false defmacro __using__(opts) do @@ -317,7 +317,7 @@ defmodule ExUnit.Case do end {register?, opts} = Keyword.pop(opts, :register, true) - {next_opts, opts} = Keyword.split(opts, [:async, :parameterize]) + {next_opts, opts} = Keyword.split(opts, [:async, :async_partition_key, :parameterize]) if opts != [] do IO.warn("unknown options given to ExUnit.Case: #{inspect(opts)}") @@ -552,8 +552,13 @@ defmodule ExUnit.Case do opts = Module.get_attribute(module, :ex_unit_module, []) async? = Keyword.get(opts, :async, false) + async_partition_key = Keyword.get(opts, :async_partition_key, nil) parameterize = Keyword.get(opts, :parameterize, nil) + if async_partition_key != nil and async? == false do + raise ArgumentError, ":async_partition_key is only a valid option when async: true" + end + if not (parameterize == nil or (is_list(parameterize) and Enum.all?(parameterize, &is_map/1))) do raise ArgumentError, ":parameterize must be a list of maps, got: #{inspect(parameterize)}" end @@ -569,7 +574,11 @@ defmodule ExUnit.Case do end def __ex_unit__(:config) do - {unquote(async?), unquote(Macro.escape(parameterize))} + %{ + async?: unquote(async?), + async_partition_key: unquote(async_partition_key), + parameterize: unquote(Macro.escape(parameterize)) + } end end end diff --git a/lib/ex_unit/lib/ex_unit/runner.ex b/lib/ex_unit/lib/ex_unit/runner.ex index 29bd90d3716..babd131a540 100644 --- a/lib/ex_unit/lib/ex_unit/runner.ex +++ b/lib/ex_unit/lib/ex_unit/runner.ex @@ -106,9 +106,9 @@ defmodule ExUnit.Runner do async_loop(config, running, async_once?, modules_to_restore) # Slots are available, start with async modules - async_modules = ExUnit.Server.take_async_modules(available) -> - running = spawn_modules(config, async_modules, true, running) - modules_to_restore = maybe_store_modules(modules_to_restore, :async, async_modules) + async_partitions = ExUnit.Server.take_async_partitions(available) -> + running = spawn_modules(config, async_partitions, true, running) + modules_to_restore = maybe_store_modules(modules_to_restore, :async, async_partitions) async_loop(config, running, true, modules_to_restore) true -> @@ -161,6 +161,27 @@ defmodule ExUnit.Runner do running end + defp spawn_modules( + config, + [{_partition_key, partition_modules} | modules], + true = async?, + running + ) + when is_list(partition_modules) do + if max_failures_reached?(config) do + running + else + {pid, ref} = + spawn_monitor(fn -> + Enum.each(partition_modules, fn {module, params} -> + run_module(config, module, async?, params) + end) + end) + + spawn_modules(config, modules, async?, Map.put(running, ref, pid)) + end + end + defp spawn_modules(config, [{module, params} | modules], async?, running) do if max_failures_reached?(config) do running diff --git a/lib/ex_unit/lib/ex_unit/server.ex b/lib/ex_unit/lib/ex_unit/server.ex index 18962a1af1a..505fe91b0c8 100644 --- a/lib/ex_unit/lib/ex_unit/server.ex +++ b/lib/ex_unit/lib/ex_unit/server.ex @@ -9,7 +9,13 @@ defmodule ExUnit.Server do GenServer.start_link(__MODULE__, :ok, name: @name) end - def add_module(name, {async?, parameterize}) do + def add_module(name, config) do + %{ + async?: async?, + async_partition_key: async_partition_key, + parameterize: parameterize + } = config + modules = if parameterize do Enum.map(parameterize, &{name, &1}) @@ -17,7 +23,7 @@ defmodule ExUnit.Server do [{name, %{}}] end - case GenServer.call(@name, {:add, async?, modules}, @timeout) do + case GenServer.call(@name, {:add, {async?, async_partition_key}, modules}, @timeout) do :ok -> :ok @@ -30,16 +36,16 @@ defmodule ExUnit.Server do GenServer.call(@name, {:modules_loaded, uniq?}, @timeout) end - def take_async_modules(count) do - GenServer.call(@name, {:take_async_modules, count}, @timeout) + def take_async_partitions(count) do + GenServer.call(@name, {:take_async_partitions, count}, @timeout) end def take_sync_modules() do GenServer.call(@name, :take_sync_modules, @timeout) end - def restore_modules(async_modules, sync_modules) do - GenServer.call(@name, {:restore_modules, async_modules, sync_modules}, @timeout) + def restore_modules(async_partitions, sync_modules) do + GenServer.call(@name, {:restore_modules, async_partitions, sync_modules}, @timeout) end ## Callbacks @@ -51,7 +57,8 @@ defmodule ExUnit.Server do state = %{ loaded: System.monotonic_time(), waiting: nil, - async_modules: :queue.new(), + async_partitions: %{}, + async_partition_keys: :queue.new(), sync_modules: :queue.new() } @@ -59,26 +66,30 @@ defmodule ExUnit.Server do end # Called on demand until we are signaled all modules are loaded. - def handle_call({:take_async_modules, count}, from, %{waiting: nil} = state) do - {:noreply, take_modules(%{state | waiting: {from, count}})} + def handle_call({:take_async_partitions, count}, from, %{waiting: nil} = state) do + {:noreply, take_module_partitions(%{state | waiting: {from, count}})} end # Called once after all async modules have been sent and reverts the state. def handle_call(:take_sync_modules, _from, state) do - %{waiting: nil, loaded: :done, async_modules: async_modules} = state - 0 = :queue.len(async_modules) + %{waiting: nil, loaded: :done, async_partition_keys: async_partition_keys} = state + 0 = :queue.len(async_partition_keys) {:reply, :queue.to_list(state.sync_modules), %{state | sync_modules: :queue.new(), loaded: System.monotonic_time()}} end # Called by the runner when --repeat-until-failure is used. - def handle_call({:restore_modules, async_modules, sync_modules}, _from, state) do + def handle_call({:restore_modules, async_partitions, sync_modules}, _from, state) do + async_partition_keys = + Enum.map(async_partitions, fn {partition_key, _modules} -> partition_key end) + {:reply, :ok, %{ state | loaded: :done, - async_modules: :queue.from_list(async_modules), + async_partitions: Map.new(async_partitions), + async_partition_keys: :queue.from_list(async_partition_keys), sync_modules: :queue.from_list(sync_modules) }} end @@ -91,12 +102,19 @@ defmodule ExUnit.Server do when is_integer(loaded) do state = if uniq? do - async_modules = :queue.to_list(state.async_modules) |> Enum.uniq() |> :queue.from_list() + async_partitions = + state.async_partitions + |> Enum.map(fn {partition_key, modules} -> + partition_modules = Enum.uniq(modules) + {partition_key, partition_modules} + end) + |> Map.new() + sync_modules = :queue.to_list(state.sync_modules) |> Enum.uniq() |> :queue.from_list() %{ state - | async_modules: async_modules, + | async_partitions: async_partitions, sync_modules: sync_modules } else @@ -104,21 +122,42 @@ defmodule ExUnit.Server do end diff = System.convert_time_unit(System.monotonic_time() - loaded, :native, :microsecond) - {:reply, diff, take_modules(%{state | loaded: :done})} + {:reply, diff, take_module_partitions(%{state | loaded: :done})} end - def handle_call({:add, true, names}, _from, %{loaded: loaded} = state) + def handle_call({:add, {true, async_partition_key}, names}, _from, %{loaded: loaded} = state) when is_integer(loaded) do state = - update_in( - state.async_modules, - &Enum.reduce(names, &1, fn name, q -> :queue.in(name, q) end) - ) - - {:reply, :ok, take_modules(state)} - end - - def handle_call({:add, false, names}, _from, %{loaded: loaded} = state) + Enum.reduce(names, state, fn name, updated_state -> + async_partition_key = async_partition_key || default_async_partition_key(name) + partition_key_exists? = Map.has_key?(state.async_partitions, async_partition_key) + + updated_state + |> update_in([:async_partitions], fn async_partitions -> + {_, async_partitions} = + Map.get_and_update(async_partitions, async_partition_key, fn + nil -> + {nil, [name]} + + modules -> + {modules, [name | modules]} + end) + + async_partitions + end) + |> update_in([:async_partition_keys], fn q -> + if partition_key_exists? do + q + else + :queue.in(async_partition_key, q) + end + end) + end) + + {:reply, :ok, take_module_partitions(state)} + end + + def handle_call({:add, {false, _async_partition_key}, names}, _from, %{loaded: loaded} = state) when is_integer(loaded) do state = update_in(state.sync_modules, &Enum.reduce(names, &1, fn name, q -> :queue.in(name, q) end)) @@ -126,28 +165,57 @@ defmodule ExUnit.Server do {:reply, :ok, state} end - def handle_call({:add, _async?, _names}, _from, state), + def handle_call({:add, {_async?, _async_partition_key}, _names}, _from, state), do: {:reply, :already_running, state} - defp take_modules(%{waiting: nil} = state) do + defp default_async_partition_key({module, params}) do + if params == %{} do + module + else + # if no async partition is specified, parameters run concurrently + "#{module}_#{:erlang.phash2(params)}" + end + end + + defp take_module_partitions(%{waiting: nil} = state) do state end - defp take_modules(%{waiting: {from, count}} = state) do - has_async_modules? = not :queue.is_empty(state.async_modules) + defp take_module_partitions(%{waiting: {from, count}} = state) do + has_async_partitions? = not :queue.is_empty(state.async_partition_keys) cond do - not has_async_modules? and state.loaded == :done -> + not has_async_partitions? and state.loaded == :done -> GenServer.reply(from, nil) %{state | waiting: nil} - not has_async_modules? -> + not has_async_partitions? -> state true -> - {modules, async_modules} = take_until(count, state.async_modules) - GenServer.reply(from, modules) - %{state | async_modules: async_modules, waiting: nil} + {partition_keys, remaining_partition_keys} = + take_until(count, state.async_partition_keys) + + {async_partitions, remaining_partitions} = + Enum.map_reduce(partition_keys, state.async_partitions, fn partition_key, + remaining_partitions -> + {partition_modules, remaining_partitions} = + Map.pop!(remaining_partitions, partition_key) + + { + {partition_key, Enum.reverse(partition_modules)}, + remaining_partitions + } + end) + + GenServer.reply(from, async_partitions) + + %{ + state + | async_partition_keys: remaining_partition_keys, + async_partitions: remaining_partitions, + waiting: nil + } end end From c74322fcb7233a18a740cc69febb6cea24a86b05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Thu, 10 Oct 2024 22:47:30 +0200 Subject: [PATCH 02/14] Add tests for :async and :async_partition_key --- lib/ex_unit/test/ex_unit_test.exs | 117 ++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/lib/ex_unit/test/ex_unit_test.exs b/lib/ex_unit/test/ex_unit_test.exs index e48067cca77..fff937855b9 100644 --- a/lib/ex_unit/test/ex_unit_test.exs +++ b/lib/ex_unit/test/ex_unit_test.exs @@ -715,6 +715,123 @@ defmodule ExUnitTest do assert_receive {:tmp_dir, tmp_dir2} when tmp_dir1 != tmp_dir2 end + test "async tests run concurrently" do + Process.register(self(), :async_tests) + + defmodule FirstAsyncTest do + use ExUnit.Case, async: true + + test "first test" do + send(:async_tests, {:first_test, :started}) + Process.sleep(10) + assert true + send(:async_tests, {:first_test, :finished}) + end + end + + defmodule SecondAsyncTest do + use ExUnit.Case, async: true + + test "second test" do + send(:async_tests, {:second_test, :started}) + Process.sleep(15) + assert true + send(:async_tests, {:second_test, :finished}) + end + end + + configure_and_reload_on_exit(max_cases: 2) + + test_task = + Task.async(fn -> + capture_io(fn -> ExUnit.run() end) + end) + + # Expected test distribution through time + # + # Time (ms): 0 10 20 + # |-----|-----| + # CPU0: ( 1 ) + # CPU1: ( 2 ) + assert_receive({:first_test, :started}, 5) + assert_receive({:second_test, :started}, 5) + + # make sure we don't leave the task running after the outer test finishes + Task.await(test_task) + end + + test "async tests run concurrently respecting partition keys" do + Process.register(self(), :async_partitioned_tests) + + defmodule RedOneTest do + use ExUnit.Case, async: true, async_partition_key: :red + + test "red one test" do + send(:async_partitioned_tests, {:red_one, :started}) + Process.sleep(30) + assert true + send(:async_partitioned_tests, {:red_one, :finished}) + end + end + + defmodule RedTwoTest do + use ExUnit.Case, async: true, async_partition_key: :red + + test "red two test" do + send(:async_partitioned_tests, {:red_two, :started}) + Process.sleep(10) + assert true + send(:async_partitioned_tests, {:red_two, :finished}) + end + end + + defmodule BlueOneTest do + use ExUnit.Case, async: true, async_partition_key: :blue + + test "blue one test" do + send(:async_partitioned_tests, {:blue_one, :started}) + Process.sleep(10) + assert true + send(:async_partitioned_tests, {:blue_one, :finished}) + end + end + + defmodule BlueTwoTest do + use ExUnit.Case, async: true, async_partition_key: :blue + + test "blue two test" do + send(:async_partitioned_tests, {:blue_two, :started}) + Process.sleep(10) + assert true + send(:async_partitioned_tests, {:blue_two, :finished}) + end + end + + configure_and_reload_on_exit(max_cases: 4) + + test_task = + Task.async(fn -> + capture_io(fn -> ExUnit.run() end) + end) + + # Expected test distribution through time + # + # Time (ms): 0 10 20 30 40 + # |-----|-----|-----|-----| + # CPU0: ( R1 )( R2 ) + # CPU1: ( B1 )( B2 ) + assert_receive({:red_one, :started}, 5) + assert_receive({:blue_one, :started}, 5) + refute_receive({:red_two, :started}, 25) + assert_received({:blue_one, :finished}) + assert_received({:blue_two, :started}) + assert_receive({:blue_two, :finished}, 15) + assert_receive({:red_two, :started}, 15) + + # make sure we don't leave the task running after the outer test finishes + Task.await(test_task) + end + describe "after_suite/1" do test "executes all callbacks set in reverse order" do Process.register(self(), :after_suite_test_process) From 7c6ece6126d9f72d0b05619f9036b25c6b1e9c9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Fri, 11 Oct 2024 20:40:31 +0200 Subject: [PATCH 03/14] Add test to ensure async partition order is predictable --- lib/ex_unit/test/ex_unit_test.exs | 48 +++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/lib/ex_unit/test/ex_unit_test.exs b/lib/ex_unit/test/ex_unit_test.exs index fff937855b9..6ded0c6040f 100644 --- a/lib/ex_unit/test/ex_unit_test.exs +++ b/lib/ex_unit/test/ex_unit_test.exs @@ -1218,6 +1218,54 @@ defmodule ExUnitTest do assert third =~ "ThirdTestFIFO" end + test "async test partitions are run in compile order (FIFO)" do + defmodule RedOneFIFO do + use ExUnit.Case, async: true, async_partition_key: :red + + test "red one test" do + assert true + end + end + + defmodule BlueOneFIFO do + use ExUnit.Case, async: true, async_partition_key: :blue + + test "blue one test" do + assert true + end + end + + defmodule RedTwoFIFO do + use ExUnit.Case, async: true, async_partition_key: :red + + test "red two test" do + assert true + end + end + + defmodule BlueTwoFIFO do + use ExUnit.Case, async: true, async_partition_key: :blue + + test "blue two test" do + assert true + end + end + + configure_and_reload_on_exit(trace: true) + + output = + capture_io(fn -> + assert ExUnit.run() == %{total: 4, failures: 0, excluded: 0, skipped: 0} + end) + + [_, first, second, third, fourth | _] = String.split(output, "\n\n") + + assert first =~ "RedOneFIFO" + assert second =~ "RedTwoFIFO" + assert third =~ "BlueOneFIFO" + assert fourth =~ "BlueTwoFIFO" + end + test "can filter async tests" do defmodule FirstTestAsyncTrue do use ExUnit.Case, async: true From b0dd42fe9d1c8e47673d36c19c0bba447c40e62b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Fri, 11 Oct 2024 20:57:04 +0200 Subject: [PATCH 04/14] Document :async_partition_key option --- lib/ex_unit/lib/ex_unit/case.ex | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/ex_unit/lib/ex_unit/case.ex b/lib/ex_unit/lib/ex_unit/case.ex index f9837d36b51..618afca6aba 100644 --- a/lib/ex_unit/lib/ex_unit/case.ex +++ b/lib/ex_unit/lib/ex_unit/case.ex @@ -12,6 +12,13 @@ defmodule ExUnit.Case do It should be enabled only if tests do not change any global state. Defaults to `false`. + * `:async_partition_key` - configures async tests in this module to run + within an async partition denoted by the provided key. Tests in the same + async partition never run concurrently. Tests from different partitions + can run concurrently. This option is only valid when the :async option + is set to true. + Defaults to `nil`. + * `:register` - when `false`, does not register this module within ExUnit server. This means the module won't run when ExUnit suite runs. From 07e5429ff03d3c51ff0db7a1931abdf2d99f3c10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Sat, 12 Oct 2024 11:07:12 +0200 Subject: [PATCH 05/14] Macro.escape() key option --- lib/ex_unit/lib/ex_unit/case.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ex_unit/lib/ex_unit/case.ex b/lib/ex_unit/lib/ex_unit/case.ex index 618afca6aba..6d25047d8b9 100644 --- a/lib/ex_unit/lib/ex_unit/case.ex +++ b/lib/ex_unit/lib/ex_unit/case.ex @@ -583,7 +583,7 @@ defmodule ExUnit.Case do def __ex_unit__(:config) do %{ async?: unquote(async?), - async_partition_key: unquote(async_partition_key), + async_partition_key: unquote(Macro.escape(async_partition_key)), parameterize: unquote(Macro.escape(parameterize)) } end From 8707d42c2794a58b6ab50da13ad6129217c1e584 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Sat, 12 Oct 2024 11:09:18 +0200 Subject: [PATCH 06/14] Avoid raising --- lib/ex_unit/lib/ex_unit/case.ex | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/ex_unit/lib/ex_unit/case.ex b/lib/ex_unit/lib/ex_unit/case.ex index 6d25047d8b9..0ad1d0619b5 100644 --- a/lib/ex_unit/lib/ex_unit/case.ex +++ b/lib/ex_unit/lib/ex_unit/case.ex @@ -562,10 +562,6 @@ defmodule ExUnit.Case do async_partition_key = Keyword.get(opts, :async_partition_key, nil) parameterize = Keyword.get(opts, :parameterize, nil) - if async_partition_key != nil and async? == false do - raise ArgumentError, ":async_partition_key is only a valid option when async: true" - end - if not (parameterize == nil or (is_list(parameterize) and Enum.all?(parameterize, &is_map/1))) do raise ArgumentError, ":parameterize must be a list of maps, got: #{inspect(parameterize)}" end From ade76058e69def1443c74de447c9c9bee659608b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Sat, 12 Oct 2024 12:33:37 +0200 Subject: [PATCH 07/14] Change option name to :group --- lib/ex_unit/lib/ex_unit/case.ex | 16 ++++++------- lib/ex_unit/lib/ex_unit/server.ex | 4 ++-- lib/ex_unit/test/ex_unit_test.exs | 38 +++++++++++++++---------------- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/lib/ex_unit/lib/ex_unit/case.ex b/lib/ex_unit/lib/ex_unit/case.ex index 0ad1d0619b5..8dc0c4916a0 100644 --- a/lib/ex_unit/lib/ex_unit/case.ex +++ b/lib/ex_unit/lib/ex_unit/case.ex @@ -12,11 +12,9 @@ defmodule ExUnit.Case do It should be enabled only if tests do not change any global state. Defaults to `false`. - * `:async_partition_key` - configures async tests in this module to run - within an async partition denoted by the provided key. Tests in the same - async partition never run concurrently. Tests from different partitions - can run concurrently. This option is only valid when the :async option - is set to true. + * `:group` - configures async tests in this module to run within a group. + Tests in the same group never run concurrently. Tests from different + groups can run concurrently with `async: true`. Defaults to `nil`. * `:register` - when `false`, does not register this module within @@ -301,7 +299,7 @@ defmodule ExUnit.Case do @type env :: module() | Macro.Env.t() @compile {:no_warn_undefined, [IEx.Pry]} - @reserved [:module, :file, :line, :test, :async, :async_partition_key, :registered, :describe] + @reserved [:module, :file, :line, :test, :async, :group, :registered, :describe] @doc false defmacro __using__(opts) do @@ -324,7 +322,7 @@ defmodule ExUnit.Case do end {register?, opts} = Keyword.pop(opts, :register, true) - {next_opts, opts} = Keyword.split(opts, [:async, :async_partition_key, :parameterize]) + {next_opts, opts} = Keyword.split(opts, [:async, :group, :parameterize]) if opts != [] do IO.warn("unknown options given to ExUnit.Case: #{inspect(opts)}") @@ -559,7 +557,7 @@ defmodule ExUnit.Case do opts = Module.get_attribute(module, :ex_unit_module, []) async? = Keyword.get(opts, :async, false) - async_partition_key = Keyword.get(opts, :async_partition_key, nil) + group = Keyword.get(opts, :group, nil) parameterize = Keyword.get(opts, :parameterize, nil) if not (parameterize == nil or (is_list(parameterize) and Enum.all?(parameterize, &is_map/1))) do @@ -579,7 +577,7 @@ defmodule ExUnit.Case do def __ex_unit__(:config) do %{ async?: unquote(async?), - async_partition_key: unquote(Macro.escape(async_partition_key)), + group: unquote(Macro.escape(group)), parameterize: unquote(Macro.escape(parameterize)) } end diff --git a/lib/ex_unit/lib/ex_unit/server.ex b/lib/ex_unit/lib/ex_unit/server.ex index 505fe91b0c8..ed830c363cf 100644 --- a/lib/ex_unit/lib/ex_unit/server.ex +++ b/lib/ex_unit/lib/ex_unit/server.ex @@ -12,7 +12,7 @@ defmodule ExUnit.Server do def add_module(name, config) do %{ async?: async?, - async_partition_key: async_partition_key, + group: group, parameterize: parameterize } = config @@ -23,7 +23,7 @@ defmodule ExUnit.Server do [{name, %{}}] end - case GenServer.call(@name, {:add, {async?, async_partition_key}, modules}, @timeout) do + case GenServer.call(@name, {:add, {async?, group}, modules}, @timeout) do :ok -> :ok diff --git a/lib/ex_unit/test/ex_unit_test.exs b/lib/ex_unit/test/ex_unit_test.exs index 6ded0c6040f..e554ce663da 100644 --- a/lib/ex_unit/test/ex_unit_test.exs +++ b/lib/ex_unit/test/ex_unit_test.exs @@ -760,50 +760,50 @@ defmodule ExUnitTest do Task.await(test_task) end - test "async tests run concurrently respecting partition keys" do - Process.register(self(), :async_partitioned_tests) + test "async tests run concurrently respecting groups" do + Process.register(self(), :async_grouped_tests) defmodule RedOneTest do - use ExUnit.Case, async: true, async_partition_key: :red + use ExUnit.Case, async: true, group: :red test "red one test" do - send(:async_partitioned_tests, {:red_one, :started}) + send(:async_grouped_tests, {:red_one, :started}) Process.sleep(30) assert true - send(:async_partitioned_tests, {:red_one, :finished}) + send(:async_grouped_tests, {:red_one, :finished}) end end defmodule RedTwoTest do - use ExUnit.Case, async: true, async_partition_key: :red + use ExUnit.Case, async: true, group: :red test "red two test" do - send(:async_partitioned_tests, {:red_two, :started}) + send(:async_grouped_tests, {:red_two, :started}) Process.sleep(10) assert true - send(:async_partitioned_tests, {:red_two, :finished}) + send(:async_grouped_tests, {:red_two, :finished}) end end defmodule BlueOneTest do - use ExUnit.Case, async: true, async_partition_key: :blue + use ExUnit.Case, async: true, group: :blue test "blue one test" do - send(:async_partitioned_tests, {:blue_one, :started}) + send(:async_grouped_tests, {:blue_one, :started}) Process.sleep(10) assert true - send(:async_partitioned_tests, {:blue_one, :finished}) + send(:async_grouped_tests, {:blue_one, :finished}) end end defmodule BlueTwoTest do - use ExUnit.Case, async: true, async_partition_key: :blue + use ExUnit.Case, async: true, group: :blue test "blue two test" do - send(:async_partitioned_tests, {:blue_two, :started}) + send(:async_grouped_tests, {:blue_two, :started}) Process.sleep(10) assert true - send(:async_partitioned_tests, {:blue_two, :finished}) + send(:async_grouped_tests, {:blue_two, :finished}) end end @@ -1218,9 +1218,9 @@ defmodule ExUnitTest do assert third =~ "ThirdTestFIFO" end - test "async test partitions are run in compile order (FIFO)" do + test "async test groups are run in compile order (FIFO)" do defmodule RedOneFIFO do - use ExUnit.Case, async: true, async_partition_key: :red + use ExUnit.Case, async: true, group: :red test "red one test" do assert true @@ -1228,7 +1228,7 @@ defmodule ExUnitTest do end defmodule BlueOneFIFO do - use ExUnit.Case, async: true, async_partition_key: :blue + use ExUnit.Case, async: true, group: :blue test "blue one test" do assert true @@ -1236,7 +1236,7 @@ defmodule ExUnitTest do end defmodule RedTwoFIFO do - use ExUnit.Case, async: true, async_partition_key: :red + use ExUnit.Case, async: true, group: :red test "red two test" do assert true @@ -1244,7 +1244,7 @@ defmodule ExUnitTest do end defmodule BlueTwoFIFO do - use ExUnit.Case, async: true, async_partition_key: :blue + use ExUnit.Case, async: true, group: :blue test "blue two test" do assert true From 3b9d737354a2964bd3ac188857526d9603181146 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Sat, 12 Oct 2024 14:02:39 +0200 Subject: [PATCH 08/14] Refactor for consistency, simplify state --- lib/ex_unit/lib/ex_unit/runner.ex | 15 ++-- lib/ex_unit/lib/ex_unit/server.ex | 140 ++++++++++++++---------------- 2 files changed, 74 insertions(+), 81 deletions(-) diff --git a/lib/ex_unit/lib/ex_unit/runner.ex b/lib/ex_unit/lib/ex_unit/runner.ex index babd131a540..83025786db7 100644 --- a/lib/ex_unit/lib/ex_unit/runner.ex +++ b/lib/ex_unit/lib/ex_unit/runner.ex @@ -106,9 +106,9 @@ defmodule ExUnit.Runner do async_loop(config, running, async_once?, modules_to_restore) # Slots are available, start with async modules - async_partitions = ExUnit.Server.take_async_partitions(available) -> - running = spawn_modules(config, async_partitions, true, running) - modules_to_restore = maybe_store_modules(modules_to_restore, :async, async_partitions) + async_modules = ExUnit.Server.take_async_modules(available) -> + running = spawn_modules(config, async_modules, true, running) + modules_to_restore = maybe_store_modules(modules_to_restore, :async, async_modules) async_loop(config, running, true, modules_to_restore) true -> @@ -163,17 +163,16 @@ defmodule ExUnit.Runner do defp spawn_modules( config, - [{_partition_key, partition_modules} | modules], + [{_group, group_modules} | modules], true = async?, running - ) - when is_list(partition_modules) do + ) do if max_failures_reached?(config) do running else {pid, ref} = spawn_monitor(fn -> - Enum.each(partition_modules, fn {module, params} -> + Enum.each(group_modules, fn {module, params} -> run_module(config, module, async?, params) end) end) @@ -182,7 +181,7 @@ defmodule ExUnit.Runner do end end - defp spawn_modules(config, [{module, params} | modules], async?, running) do + defp spawn_modules(config, [{_group, [{module, params}]} | modules], async?, running) do if max_failures_reached?(config) do running else diff --git a/lib/ex_unit/lib/ex_unit/server.ex b/lib/ex_unit/lib/ex_unit/server.ex index ed830c363cf..221001c09d0 100644 --- a/lib/ex_unit/lib/ex_unit/server.ex +++ b/lib/ex_unit/lib/ex_unit/server.ex @@ -36,16 +36,16 @@ defmodule ExUnit.Server do GenServer.call(@name, {:modules_loaded, uniq?}, @timeout) end - def take_async_partitions(count) do - GenServer.call(@name, {:take_async_partitions, count}, @timeout) + def take_async_modules(count) do + GenServer.call(@name, {:take_async_modules, count}, @timeout) end def take_sync_modules() do GenServer.call(@name, :take_sync_modules, @timeout) end - def restore_modules(async_partitions, sync_modules) do - GenServer.call(@name, {:restore_modules, async_partitions, sync_modules}, @timeout) + def restore_modules(async_modules, sync_modules) do + GenServer.call(@name, {:restore_modules, async_modules, sync_modules}, @timeout) end ## Callbacks @@ -57,8 +57,8 @@ defmodule ExUnit.Server do state = %{ loaded: System.monotonic_time(), waiting: nil, - async_partitions: %{}, - async_partition_keys: :queue.new(), + async_groups: MapSet.new(), + async_modules: :queue.new(), sync_modules: :queue.new() } @@ -66,30 +66,27 @@ defmodule ExUnit.Server do end # Called on demand until we are signaled all modules are loaded. - def handle_call({:take_async_partitions, count}, from, %{waiting: nil} = state) do - {:noreply, take_module_partitions(%{state | waiting: {from, count}})} + def handle_call({:take_async_modules, count}, from, %{waiting: nil} = state) do + {:noreply, take_modules(%{state | waiting: {from, count}})} end # Called once after all async modules have been sent and reverts the state. def handle_call(:take_sync_modules, _from, state) do - %{waiting: nil, loaded: :done, async_partition_keys: async_partition_keys} = state - 0 = :queue.len(async_partition_keys) + %{waiting: nil, loaded: :done, async_modules: async_modules} = state + 0 = :queue.len(async_modules) {:reply, :queue.to_list(state.sync_modules), %{state | sync_modules: :queue.new(), loaded: System.monotonic_time()}} end # Called by the runner when --repeat-until-failure is used. - def handle_call({:restore_modules, async_partitions, sync_modules}, _from, state) do - async_partition_keys = - Enum.map(async_partitions, fn {partition_key, _modules} -> partition_key end) - + def handle_call({:restore_modules, async_modules, sync_modules}, _from, state) do {:reply, :ok, %{ state | loaded: :done, - async_partitions: Map.new(async_partitions), - async_partition_keys: :queue.from_list(async_partition_keys), + async_groups: MapSet.new(async_modules, fn {group, _} -> group end), + async_modules: :queue.from_list(async_modules), sync_modules: :queue.from_list(sync_modules) }} end @@ -102,19 +99,20 @@ defmodule ExUnit.Server do when is_integer(loaded) do state = if uniq? do - async_partitions = - state.async_partitions - |> Enum.map(fn {partition_key, modules} -> - partition_modules = Enum.uniq(modules) - {partition_key, partition_modules} + async_modules = + state.async_modules + |> :queue.to_list() + |> Enum.uniq() + |> Enum.map(fn {group, modules} -> + {group, Enum.uniq(modules)} end) - |> Map.new() + |> :queue.from_list() sync_modules = :queue.to_list(state.sync_modules) |> Enum.uniq() |> :queue.from_list() %{ state - | async_partitions: async_partitions, + | async_modules: async_modules, sync_modules: sync_modules } else @@ -122,98 +120,94 @@ defmodule ExUnit.Server do end diff = System.convert_time_unit(System.monotonic_time() - loaded, :native, :microsecond) - {:reply, diff, take_module_partitions(%{state | loaded: :done})} + {:reply, diff, take_modules(%{state | loaded: :done})} end - def handle_call({:add, {true, async_partition_key}, names}, _from, %{loaded: loaded} = state) + def handle_call({:add, {true, group}, names}, _from, %{loaded: loaded} = state) when is_integer(loaded) do state = Enum.reduce(names, state, fn name, updated_state -> - async_partition_key = async_partition_key || default_async_partition_key(name) - partition_key_exists? = Map.has_key?(state.async_partitions, async_partition_key) - - updated_state - |> update_in([:async_partitions], fn async_partitions -> - {_, async_partitions} = - Map.get_and_update(async_partitions, async_partition_key, fn - nil -> - {nil, [name]} - - modules -> - {modules, [name | modules]} - end) + group = group || default_group(name) + group_exists? = MapSet.member?(updated_state.async_groups, group) - async_partitions - end) - |> update_in([:async_partition_keys], fn q -> - if partition_key_exists? do + if group_exists? do + update_in(updated_state.async_modules, fn q -> q - else - :queue.in(async_partition_key, q) - end - end) + |> :queue.to_list() + |> Enum.map(fn + {^group, modules} -> + {group, [name | modules]} + + other -> + other + end) + |> :queue.from_list() + end) + else + updated_state + |> update_in([:async_modules], &:queue.in({group, [name]}, &1)) + |> update_in([:async_groups], &MapSet.put(&1, group)) + end end) - {:reply, :ok, take_module_partitions(state)} + {:reply, :ok, take_modules(state)} end - def handle_call({:add, {false, _async_partition_key}, names}, _from, %{loaded: loaded} = state) + def handle_call({:add, {false, group}, names}, _from, %{loaded: loaded} = state) when is_integer(loaded) do state = - update_in(state.sync_modules, &Enum.reduce(names, &1, fn name, q -> :queue.in(name, q) end)) + update_in( + state.sync_modules, + &Enum.reduce(names, &1, fn name, q -> :queue.in({group, [name]}, q) end) + ) {:reply, :ok, state} end - def handle_call({:add, {_async?, _async_partition_key}, _names}, _from, state), + def handle_call({:add, {_async?, _group}, _names}, _from, state), do: {:reply, :already_running, state} - defp default_async_partition_key({module, params}) do + defp default_group({module, params}) do if params == %{} do module else - # if no async partition is specified, parameters run concurrently + # if no group is specified, parameters need to run concurrently "#{module}_#{:erlang.phash2(params)}" end end - defp take_module_partitions(%{waiting: nil} = state) do + defp take_modules(%{waiting: nil} = state) do state end - defp take_module_partitions(%{waiting: {from, count}} = state) do - has_async_partitions? = not :queue.is_empty(state.async_partition_keys) + defp take_modules(%{waiting: {from, count}} = state) do + has_async_modules? = not :queue.is_empty(state.async_modules) cond do - not has_async_partitions? and state.loaded == :done -> + not has_async_modules? and state.loaded == :done -> GenServer.reply(from, nil) %{state | waiting: nil} - not has_async_partitions? -> + not has_async_modules? -> state true -> - {partition_keys, remaining_partition_keys} = - take_until(count, state.async_partition_keys) - - {async_partitions, remaining_partitions} = - Enum.map_reduce(partition_keys, state.async_partitions, fn partition_key, - remaining_partitions -> - {partition_modules, remaining_partitions} = - Map.pop!(remaining_partitions, partition_key) - - { - {partition_key, Enum.reverse(partition_modules)}, - remaining_partitions - } + {async_modules, remaining_modules} = take_until(count, state.async_modules) + + remaining_groups = + Enum.reduce(async_modules, state.async_groups, fn {group, _modules}, groups -> + MapSet.delete(groups, group) end) - GenServer.reply(from, async_partitions) + async_modules = + Enum.map(async_modules, fn {group, modules} -> {group, Enum.reverse(modules)} end) + + GenServer.reply(from, async_modules) %{ state - | async_partition_keys: remaining_partition_keys, - async_partitions: remaining_partitions, + | async_groups: remaining_groups, + async_modules: remaining_modules, waiting: nil } end From c1cfa3f30631b95bca6917fddc9093e3b8c6f632 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Sat, 12 Oct 2024 14:12:53 +0200 Subject: [PATCH 09/14] Remove unneeded spawn_modules/4 function head --- lib/ex_unit/lib/ex_unit/runner.ex | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/lib/ex_unit/lib/ex_unit/runner.ex b/lib/ex_unit/lib/ex_unit/runner.ex index 83025786db7..d5a33a56a93 100644 --- a/lib/ex_unit/lib/ex_unit/runner.ex +++ b/lib/ex_unit/lib/ex_unit/runner.ex @@ -164,7 +164,7 @@ defmodule ExUnit.Runner do defp spawn_modules( config, [{_group, group_modules} | modules], - true = async?, + async?, running ) do if max_failures_reached?(config) do @@ -181,15 +181,6 @@ defmodule ExUnit.Runner do end end - defp spawn_modules(config, [{_group, [{module, params}]} | modules], async?, running) do - if max_failures_reached?(config) do - running - else - {pid, ref} = spawn_monitor(fn -> run_module(config, module, async?, params) end) - spawn_modules(config, modules, async?, Map.put(running, ref, pid)) - end - end - defp maybe_store_modules(nil, _type, _modules), do: nil defp maybe_store_modules({async, sync}, :async, modules), do: {async ++ modules, sync} defp maybe_store_modules({async, sync}, :sync, modules), do: {async, sync ++ modules} From 27b463ddd160f907648613daf6f25448beb42131 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Sat, 12 Oct 2024 22:59:04 +0200 Subject: [PATCH 10/14] Simplify ExUnit.Server --- lib/ex_unit/lib/ex_unit/server.ex | 86 ++++++++++++++----------------- 1 file changed, 40 insertions(+), 46 deletions(-) diff --git a/lib/ex_unit/lib/ex_unit/server.ex b/lib/ex_unit/lib/ex_unit/server.ex index 221001c09d0..cd596ba58a0 100644 --- a/lib/ex_unit/lib/ex_unit/server.ex +++ b/lib/ex_unit/lib/ex_unit/server.ex @@ -57,7 +57,7 @@ defmodule ExUnit.Server do state = %{ loaded: System.monotonic_time(), waiting: nil, - async_groups: MapSet.new(), + async_groups: %{}, async_modules: :queue.new(), sync_modules: :queue.new() } @@ -81,11 +81,20 @@ defmodule ExUnit.Server do # Called by the runner when --repeat-until-failure is used. def handle_call({:restore_modules, async_modules, sync_modules}, _from, state) do + {async_modules, async_groups} = + Enum.reduce(async_modules, {[], []}, fn + {nil, [module]}, {modules, groups} -> + {[{:module, module} | modules], groups} + + {group, group_modules}, {modules, groups} -> + {[{:group, group} | modules], Map.put(groups, group, group_modules)} + end) + {:reply, :ok, %{ state | loaded: :done, - async_groups: MapSet.new(async_modules, fn {group, _} -> group end), + async_groups: async_groups, async_modules: :queue.from_list(async_modules), sync_modules: :queue.from_list(sync_modules) }} @@ -123,59 +132,42 @@ defmodule ExUnit.Server do {:reply, diff, take_modules(%{state | loaded: :done})} end - def handle_call({:add, {true, group}, names}, _from, %{loaded: loaded} = state) + def handle_call({:add, {false = _async, _group}, names}, _from, %{loaded: loaded} = state) when is_integer(loaded) do state = - Enum.reduce(names, state, fn name, updated_state -> - group = group || default_group(name) - group_exists? = MapSet.member?(updated_state.async_groups, group) - - if group_exists? do - update_in(updated_state.async_modules, fn q -> - q - |> :queue.to_list() - |> Enum.map(fn - {^group, modules} -> - {group, [name | modules]} - - other -> - other - end) - |> :queue.from_list() - end) - else - updated_state - |> update_in([:async_modules], &:queue.in({group, [name]}, &1)) - |> update_in([:async_groups], &MapSet.put(&1, group)) - end - end) + update_in(state.sync_modules, &Enum.reduce(names, &1, fn name, q -> :queue.in(name, q) end)) - {:reply, :ok, take_modules(state)} + {:reply, :ok, state} end - def handle_call({:add, {false, group}, names}, _from, %{loaded: loaded} = state) + def handle_call({:add, {true = _async, nil = _group}, names}, _from, %{loaded: loaded} = state) when is_integer(loaded) do state = update_in( - state.sync_modules, - &Enum.reduce(names, &1, fn name, q -> :queue.in({group, [name]}, q) end) + state.async_modules, + &Enum.reduce(names, &1, fn name, q -> :queue.in({:module, name}, q) end) ) {:reply, :ok, state} end - def handle_call({:add, {_async?, _group}, _names}, _from, state), - do: {:reply, :already_running, state} + def handle_call({:add, {true = _async, group}, names}, _from, %{loaded: loaded} = state) + when is_integer(loaded) do + state = + if Map.has_key?(state.async_groups, group) do + update_in(state.async_groups[group], &(names ++ &1)) + else + state + |> put_in([:async_groups, group], names) + |> update_in([:async_modules], &:queue.in({:group, group}, &1)) + end - defp default_group({module, params}) do - if params == %{} do - module - else - # if no group is specified, parameters need to run concurrently - "#{module}_#{:erlang.phash2(params)}" - end + {:reply, :ok, state} end + def handle_call({:add, {_async?, _group}, _names}, _from, state), + do: {:reply, :already_running, state} + defp take_modules(%{waiting: nil} = state) do state end @@ -194,15 +186,17 @@ defmodule ExUnit.Server do true -> {async_modules, remaining_modules} = take_until(count, state.async_modules) - remaining_groups = - Enum.reduce(async_modules, state.async_groups, fn {group, _modules}, groups -> - MapSet.delete(groups, group) - end) + {async_modules, remaining_groups} = + Enum.reduce(async_modules, {[], state.async_groups}, fn + {:module, module}, {collected_modules, async_groups} -> + {[{nil, [module]} | collected_modules], async_groups} - async_modules = - Enum.map(async_modules, fn {group, modules} -> {group, Enum.reverse(modules)} end) + {:group, group}, {collected_modules, async_groups} -> + {group_modules, async_groups} = Map.pop!(async_groups, group) + {[{group, group_modules} | collected_modules], async_groups} + end) - GenServer.reply(from, async_modules) + GenServer.reply(from, Enum.reverse(async_modules)) %{ state From 55f38a432395e9fede81b8ebf7052ee649964c4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Sat, 12 Oct 2024 23:06:58 +0200 Subject: [PATCH 11/14] Remove :group from reserved list --- lib/ex_unit/lib/ex_unit/case.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ex_unit/lib/ex_unit/case.ex b/lib/ex_unit/lib/ex_unit/case.ex index 8dc0c4916a0..5605b76a02a 100644 --- a/lib/ex_unit/lib/ex_unit/case.ex +++ b/lib/ex_unit/lib/ex_unit/case.ex @@ -299,7 +299,7 @@ defmodule ExUnit.Case do @type env :: module() | Macro.Env.t() @compile {:no_warn_undefined, [IEx.Pry]} - @reserved [:module, :file, :line, :test, :async, :group, :registered, :describe] + @reserved [:module, :file, :line, :test, :async, :registered, :describe] @doc false defmacro __using__(opts) do From fe3f66c5bfe18e07d1434985f07e0997431d987b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Sun, 13 Oct 2024 12:10:05 +0200 Subject: [PATCH 12/14] Update unique code to match new state format --- lib/ex_unit/lib/ex_unit/server.ex | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/ex_unit/lib/ex_unit/server.ex b/lib/ex_unit/lib/ex_unit/server.ex index cd596ba58a0..4a6d7ff95cf 100644 --- a/lib/ex_unit/lib/ex_unit/server.ex +++ b/lib/ex_unit/lib/ex_unit/server.ex @@ -108,20 +108,18 @@ defmodule ExUnit.Server do when is_integer(loaded) do state = if uniq? do - async_modules = - state.async_modules - |> :queue.to_list() - |> Enum.uniq() - |> Enum.map(fn {group, modules} -> + async_groups = + Map.new(state.async_groups, fn {group, modules} -> {group, Enum.uniq(modules)} end) - |> :queue.from_list() + async_modules = :queue.to_list(state.async_modules) |> Enum.uniq() |> :queue.from_list() sync_modules = :queue.to_list(state.sync_modules) |> Enum.uniq() |> :queue.from_list() %{ state - | async_modules: async_modules, + | async_groups: async_groups, + async_modules: async_modules, sync_modules: sync_modules } else From 9882f65e6f0de58923f94e48413e5d6da368e7c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kova=C4=8D?= Date: Sun, 13 Oct 2024 12:12:17 +0200 Subject: [PATCH 13/14] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: José Valim --- lib/ex_unit/lib/ex_unit/case.ex | 6 +++--- lib/ex_unit/lib/ex_unit/server.ex | 16 ++++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/ex_unit/lib/ex_unit/case.ex b/lib/ex_unit/lib/ex_unit/case.ex index 5605b76a02a..c99496133d8 100644 --- a/lib/ex_unit/lib/ex_unit/case.ex +++ b/lib/ex_unit/lib/ex_unit/case.ex @@ -12,10 +12,10 @@ defmodule ExUnit.Case do It should be enabled only if tests do not change any global state. Defaults to `false`. - * `:group` - configures async tests in this module to run within a group. + * `:group` - configures the group this module belongs to. Tests in the same group never run concurrently. Tests from different - groups can run concurrently with `async: true`. - Defaults to `nil`. + groups (or with no groups) can run concurrently when `async: true` + is given. By default, belongs to no group (defaults to `nil`). * `:register` - when `false`, does not register this module within ExUnit server. This means the module won't run when ExUnit suite runs. diff --git a/lib/ex_unit/lib/ex_unit/server.ex b/lib/ex_unit/lib/ex_unit/server.ex index 4a6d7ff95cf..17037f9a020 100644 --- a/lib/ex_unit/lib/ex_unit/server.ex +++ b/lib/ex_unit/lib/ex_unit/server.ex @@ -152,13 +152,13 @@ defmodule ExUnit.Server do def handle_call({:add, {true = _async, group}, names}, _from, %{loaded: loaded} = state) when is_integer(loaded) do state = - if Map.has_key?(state.async_groups, group) do - update_in(state.async_groups[group], &(names ++ &1)) - else - state - |> put_in([:async_groups, group], names) - |> update_in([:async_modules], &:queue.in({:group, group}, &1)) - end + case state.async_groups do + %{^group => entries} -> + {%{async_groups | group => names ++ entries}, state.async_modules} + %{} -> + {Map.put(async_groups, group, names), + :queue.in({:group, group}, state.async_modules)} + end {:reply, :ok, state} end @@ -191,7 +191,7 @@ defmodule ExUnit.Server do {:group, group}, {collected_modules, async_groups} -> {group_modules, async_groups} = Map.pop!(async_groups, group) - {[{group, group_modules} | collected_modules], async_groups} + {[{group, Enum.reverse(group_modules)} | collected_modules], async_groups} end) GenServer.reply(from, Enum.reverse(async_modules)) From fa5e52cc955390d9d0aecb3e4044dd7da3d0db69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dino=20Kovac=CC=8C?= Date: Sun, 13 Oct 2024 12:14:53 +0200 Subject: [PATCH 14/14] Fix references after applying suggestions --- lib/ex_unit/lib/ex_unit/server.ex | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/ex_unit/lib/ex_unit/server.ex b/lib/ex_unit/lib/ex_unit/server.ex index 17037f9a020..00c46140a89 100644 --- a/lib/ex_unit/lib/ex_unit/server.ex +++ b/lib/ex_unit/lib/ex_unit/server.ex @@ -152,13 +152,13 @@ defmodule ExUnit.Server do def handle_call({:add, {true = _async, group}, names}, _from, %{loaded: loaded} = state) when is_integer(loaded) do state = - case state.async_groups do - %{^group => entries} -> - {%{async_groups | group => names ++ entries}, state.async_modules} - %{} -> - {Map.put(async_groups, group, names), - :queue.in({:group, group}, state.async_modules)} - end + case state.async_groups do + %{^group => entries} = async_groups -> + {%{async_groups | group => names ++ entries}, state.async_modules} + + %{} = async_groups -> + {Map.put(async_groups, group, names), :queue.in({:group, group}, state.async_modules)} + end {:reply, :ok, state} end