Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions lib/ex_unit/lib/ex_unit/case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ defmodule ExUnit.Case do
It should be enabled only if tests do not change any global state.
Defaults to `false`.

* `:async_partition_key` - configures async tests in this module to run
within an async partition denoted by the provided key. Tests in the same
async partition never run concurrently. Tests from different partitions
can run concurrently. This option is only valid when the :async option
is set to true.
Defaults to `nil`.

* `:register` - when `false`, does not register this module within
ExUnit server. This means the module won't run when ExUnit suite runs.

Expand Down Expand Up @@ -294,7 +301,7 @@ defmodule ExUnit.Case do

@type env :: module() | Macro.Env.t()
@compile {:no_warn_undefined, [IEx.Pry]}
@reserved [:module, :file, :line, :test, :async, :registered, :describe]
@reserved [:module, :file, :line, :test, :async, :async_partition_key, :registered, :describe]

@doc false
defmacro __using__(opts) do
Expand All @@ -317,7 +324,7 @@ defmodule ExUnit.Case do
end

{register?, opts} = Keyword.pop(opts, :register, true)
{next_opts, opts} = Keyword.split(opts, [:async, :parameterize])
{next_opts, opts} = Keyword.split(opts, [:async, :async_partition_key, :parameterize])

if opts != [] do
IO.warn("unknown options given to ExUnit.Case: #{inspect(opts)}")
Expand Down Expand Up @@ -552,8 +559,13 @@ defmodule ExUnit.Case do

opts = Module.get_attribute(module, :ex_unit_module, [])
async? = Keyword.get(opts, :async, false)
async_partition_key = Keyword.get(opts, :async_partition_key, nil)
parameterize = Keyword.get(opts, :parameterize, nil)

if async_partition_key != nil and async? == false do
raise ArgumentError, ":async_partition_key is only a valid option when async: true"
end

if not (parameterize == nil or (is_list(parameterize) and Enum.all?(parameterize, &is_map/1))) do
raise ArgumentError, ":parameterize must be a list of maps, got: #{inspect(parameterize)}"
end
Expand All @@ -569,7 +581,11 @@ defmodule ExUnit.Case do
end

def __ex_unit__(:config) do
{unquote(async?), unquote(Macro.escape(parameterize))}
%{
async?: unquote(async?),
async_partition_key: unquote(async_partition_key),
parameterize: unquote(Macro.escape(parameterize))
}
end
end
end
Expand Down
27 changes: 24 additions & 3 deletions lib/ex_unit/lib/ex_unit/runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ defmodule ExUnit.Runner do
async_loop(config, running, async_once?, modules_to_restore)

# Slots are available, start with async modules
async_modules = ExUnit.Server.take_async_modules(available) ->
running = spawn_modules(config, async_modules, true, running)
modules_to_restore = maybe_store_modules(modules_to_restore, :async, async_modules)
async_partitions = ExUnit.Server.take_async_partitions(available) ->
running = spawn_modules(config, async_partitions, true, running)
modules_to_restore = maybe_store_modules(modules_to_restore, :async, async_partitions)
async_loop(config, running, true, modules_to_restore)

true ->
Expand Down Expand Up @@ -161,6 +161,27 @@ defmodule ExUnit.Runner do
running
end

defp spawn_modules(
config,
[{_partition_key, partition_modules} | modules],
true = async?,
running
)
when is_list(partition_modules) do
if max_failures_reached?(config) do
running
else
{pid, ref} =
spawn_monitor(fn ->
Enum.each(partition_modules, fn {module, params} ->
run_module(config, module, async?, params)
end)
end)

spawn_modules(config, modules, async?, Map.put(running, ref, pid))
end
end

defp spawn_modules(config, [{module, params} | modules], async?, running) do
if max_failures_reached?(config) do
running
Expand Down
138 changes: 103 additions & 35 deletions lib/ex_unit/lib/ex_unit/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@ defmodule ExUnit.Server do
GenServer.start_link(__MODULE__, :ok, name: @name)
end

def add_module(name, {async?, parameterize}) do
def add_module(name, config) do
%{
async?: async?,
async_partition_key: async_partition_key,
parameterize: parameterize
} = config

modules =
if parameterize do
Enum.map(parameterize, &{name, &1})
else
[{name, %{}}]
end

case GenServer.call(@name, {:add, async?, modules}, @timeout) do
case GenServer.call(@name, {:add, {async?, async_partition_key}, modules}, @timeout) do
:ok ->
:ok

Expand All @@ -30,16 +36,16 @@ defmodule ExUnit.Server do
GenServer.call(@name, {:modules_loaded, uniq?}, @timeout)
end

def take_async_modules(count) do
GenServer.call(@name, {:take_async_modules, count}, @timeout)
def take_async_partitions(count) do
GenServer.call(@name, {:take_async_partitions, count}, @timeout)
end

def take_sync_modules() do
GenServer.call(@name, :take_sync_modules, @timeout)
end

def restore_modules(async_modules, sync_modules) do
GenServer.call(@name, {:restore_modules, async_modules, sync_modules}, @timeout)
def restore_modules(async_partitions, sync_modules) do
GenServer.call(@name, {:restore_modules, async_partitions, sync_modules}, @timeout)
end

## Callbacks
Expand All @@ -51,34 +57,39 @@ defmodule ExUnit.Server do
state = %{
loaded: System.monotonic_time(),
waiting: nil,
async_modules: :queue.new(),
async_partitions: %{},
async_partition_keys: :queue.new(),
sync_modules: :queue.new()
}

{:ok, state}
end

# Called on demand until we are signaled all modules are loaded.
def handle_call({:take_async_modules, count}, from, %{waiting: nil} = state) do
{:noreply, take_modules(%{state | waiting: {from, count}})}
def handle_call({:take_async_partitions, count}, from, %{waiting: nil} = state) do
{:noreply, take_module_partitions(%{state | waiting: {from, count}})}
end

# Called once after all async modules have been sent and reverts the state.
def handle_call(:take_sync_modules, _from, state) do
%{waiting: nil, loaded: :done, async_modules: async_modules} = state
0 = :queue.len(async_modules)
%{waiting: nil, loaded: :done, async_partition_keys: async_partition_keys} = state
0 = :queue.len(async_partition_keys)

{:reply, :queue.to_list(state.sync_modules),
%{state | sync_modules: :queue.new(), loaded: System.monotonic_time()}}
end

# Called by the runner when --repeat-until-failure is used.
def handle_call({:restore_modules, async_modules, sync_modules}, _from, state) do
def handle_call({:restore_modules, async_partitions, sync_modules}, _from, state) do
async_partition_keys =
Enum.map(async_partitions, fn {partition_key, _modules} -> partition_key end)

{:reply, :ok,
%{
state
| loaded: :done,
async_modules: :queue.from_list(async_modules),
async_partitions: Map.new(async_partitions),
async_partition_keys: :queue.from_list(async_partition_keys),
sync_modules: :queue.from_list(sync_modules)
}}
end
Expand All @@ -91,63 +102,120 @@ defmodule ExUnit.Server do
when is_integer(loaded) do
state =
if uniq? do
async_modules = :queue.to_list(state.async_modules) |> Enum.uniq() |> :queue.from_list()
async_partitions =
state.async_partitions
|> Enum.map(fn {partition_key, modules} ->
partition_modules = Enum.uniq(modules)
{partition_key, partition_modules}
end)
|> Map.new()

sync_modules = :queue.to_list(state.sync_modules) |> Enum.uniq() |> :queue.from_list()

%{
state
| async_modules: async_modules,
| async_partitions: async_partitions,
sync_modules: sync_modules
}
else
state
end

diff = System.convert_time_unit(System.monotonic_time() - loaded, :native, :microsecond)
{:reply, diff, take_modules(%{state | loaded: :done})}
{:reply, diff, take_module_partitions(%{state | loaded: :done})}
end

def handle_call({:add, true, names}, _from, %{loaded: loaded} = state)
def handle_call({:add, {true, async_partition_key}, names}, _from, %{loaded: loaded} = state)
when is_integer(loaded) do
state =
update_in(
state.async_modules,
&Enum.reduce(names, &1, fn name, q -> :queue.in(name, q) end)
)

{:reply, :ok, take_modules(state)}
end

def handle_call({:add, false, names}, _from, %{loaded: loaded} = state)
Enum.reduce(names, state, fn name, updated_state ->
async_partition_key = async_partition_key || default_async_partition_key(name)
partition_key_exists? = Map.has_key?(state.async_partitions, async_partition_key)

updated_state
|> update_in([:async_partitions], fn async_partitions ->
{_, async_partitions} =
Map.get_and_update(async_partitions, async_partition_key, fn
nil ->
{nil, [name]}

modules ->
{modules, [name | modules]}
end)

async_partitions
end)
|> update_in([:async_partition_keys], fn q ->
if partition_key_exists? do
q
else
:queue.in(async_partition_key, q)
end
end)
end)

{:reply, :ok, take_module_partitions(state)}
end

def handle_call({:add, {false, _async_partition_key}, names}, _from, %{loaded: loaded} = state)
when is_integer(loaded) do
state =
update_in(state.sync_modules, &Enum.reduce(names, &1, fn name, q -> :queue.in(name, q) end))

{:reply, :ok, state}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We lost a call to take_modules here.

end

def handle_call({:add, _async?, _names}, _from, state),
def handle_call({:add, {_async?, _async_partition_key}, _names}, _from, state),
do: {:reply, :already_running, state}

defp take_modules(%{waiting: nil} = state) do
defp default_async_partition_key({module, params}) do
if params == %{} do
module
else
# if no async partition is specified, parameters run concurrently
"#{module}_#{:erlang.phash2(params)}"
end
end

defp take_module_partitions(%{waiting: nil} = state) do
state
end

defp take_modules(%{waiting: {from, count}} = state) do
has_async_modules? = not :queue.is_empty(state.async_modules)
defp take_module_partitions(%{waiting: {from, count}} = state) do
has_async_partitions? = not :queue.is_empty(state.async_partition_keys)

cond do
not has_async_modules? and state.loaded == :done ->
not has_async_partitions? and state.loaded == :done ->
GenServer.reply(from, nil)
%{state | waiting: nil}

not has_async_modules? ->
not has_async_partitions? ->
state

true ->
{modules, async_modules} = take_until(count, state.async_modules)
GenServer.reply(from, modules)
%{state | async_modules: async_modules, waiting: nil}
{partition_keys, remaining_partition_keys} =
take_until(count, state.async_partition_keys)

{async_partitions, remaining_partitions} =
Enum.map_reduce(partition_keys, state.async_partitions, fn partition_key,
remaining_partitions ->
{partition_modules, remaining_partitions} =
Map.pop!(remaining_partitions, partition_key)

{
{partition_key, Enum.reverse(partition_modules)},
remaining_partitions
}
end)

GenServer.reply(from, async_partitions)

%{
state
| async_partition_keys: remaining_partition_keys,
async_partitions: remaining_partitions,
waiting: nil
}
end
end

Expand Down
Loading