Skip to content

Commit 0533ce2

Browse files
authored
Merge pull request #6 from vasuadari/task/add_mock_api
Add Flume.Mock for testing
2 parents ff64ab6 + 83065d2 commit 0533ce2

File tree

9 files changed

+355
-42
lines changed

9 files changed

+355
-42
lines changed

README.md

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Flume is a job processing system backed by [GenStage](https://github.com/elixir-
1818
- [Batch Processing](#batch-processing)
1919
- [Pipeline Control](#pipeline-control)
2020
- [Instrumentation](#instrumentation)
21-
- [Testing](#testing)
21+
- [Writing Tests](#writing-tests)
2222
- [Roadmap](#roadmap)
2323
- [References](#references)
2424
- [Contributing](#contributing)
@@ -329,17 +329,57 @@ Following metrics are emitted:
329329
- duration of a job/worker
330330
- count, latency and payload_size of dequeued jobs
331331

332-
## Testing
332+
## Writing Tests
333333

334-
Use these guidelines for running tests:
335-
336-
* Disable flume pipelines in test env
334+
**To enable mock in the test environment**
337335

338336
**config/test.exs**
339337

340338
```elixir
341-
config :flume,
342-
pipelines: []
339+
config :flume, mock: true
340+
```
341+
342+
**To mock individual test**
343+
344+
```elixir
345+
import Flume.Mock
346+
...
347+
describe "enqueue/4" do
348+
test "mock works" do
349+
with_flume_mock do
350+
Flume.enqueue(:test, List, :last, [[1]])
351+
352+
assert_receive %{
353+
queue: :test,
354+
worker: List,
355+
function_name: :last,
356+
args: [[1]]
357+
}
358+
end
359+
end
360+
end
361+
```
362+
363+
**To enable mock for all tests in a module**
364+
365+
```elixir
366+
defmodule ListTest do
367+
use ExUnit.Case, async: true
368+
use Flume.Mock
369+
370+
describe "enqueue/4" do
371+
test "mock works" do
372+
Flume.enqueue(:test, List, :last, [[1]])
373+
374+
assert_receive %{
375+
queue: :test,
376+
worker: List,
377+
function_name: :last,
378+
args: [[1]]
379+
}
380+
end
381+
end
382+
end
343383
```
344384

345385
## Roadmap

lib/flume.ex

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@ defmodule Flume do
1616
end
1717

1818
def start_link do
19-
children = [
20-
supervisor(Flume.Redis.Supervisor, []),
21-
worker(Flume.Queue.Scheduler, [Config.scheduler_opts()]),
22-
supervisor(Flume.Pipeline.SystemEvent.Supervisor, []),
23-
supervisor(Task.Supervisor, [[name: Flume.SafeApplySupervisor]])
24-
]
25-
26-
# This order matters, first we need to start all redix worker processes
27-
# then all other processes.
28-
children = children ++ Flume.Support.Pipelines.list()
19+
children =
20+
if Config.mock() do
21+
[]
22+
else
23+
# This order matters, first we need to start all redix worker processes
24+
# then all other processes.
25+
[
26+
supervisor(Flume.Redis.Supervisor, []),
27+
worker(Flume.Queue.Scheduler, [Config.scheduler_opts()]),
28+
supervisor(Flume.Pipeline.SystemEvent.Supervisor, []),
29+
supervisor(Task.Supervisor, [[name: Flume.SafeApplySupervisor]])
30+
] ++ Flume.Support.Pipelines.list()
31+
end
2932

3033
opts = [
3134
strategy: :one_for_one,

lib/flume/api.ex

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,52 +5,61 @@ defmodule Flume.API do
55
alias Flume.{Config, Pipeline}
66
alias Flume.Queue.Manager
77

8-
def enqueue(
9-
queue,
10-
worker,
11-
function_name \\ :perform,
12-
args,
13-
opts \\ []
14-
) do
15-
Manager.enqueue(
16-
namespace(),
8+
def bulk_enqueue(queue, jobs) do
9+
apply(Flume.Config.queue_api_module(), :bulk_enqueue, [queue, jobs])
10+
end
11+
12+
def bulk_enqueue(queue, jobs, opts) do
13+
apply(Flume.Config.queue_api_module(), :bulk_enqueue, [queue, jobs, opts])
14+
end
15+
16+
def enqueue(queue, worker, args) do
17+
apply(Flume.Config.queue_api_module(), :enqueue, [queue, worker, args])
18+
end
19+
20+
def enqueue(queue, worker, function_name, args) do
21+
apply(Flume.Config.queue_api_module(), :enqueue, [queue, worker, function_name, args])
22+
end
23+
24+
def enqueue(queue, worker, function_name, args, opts) do
25+
apply(Flume.Config.queue_api_module(), :enqueue, [
1726
queue,
1827
worker,
1928
function_name,
2029
args,
2130
opts
22-
)
31+
])
2332
end
2433

25-
def bulk_enqueue(queue, jobs, opts \\ []) do
26-
Manager.bulk_enqueue(namespace(), queue, jobs, opts)
34+
def enqueue_in(queue, time_in_seconds, worker, args) do
35+
apply(Flume.Config.queue_api_module(), :enqueue_in, [queue, time_in_seconds, worker, args])
2736
end
2837

29-
def enqueue_in(
30-
queue,
31-
time_in_seconds,
32-
worker,
33-
function_name \\ :perform,
34-
args,
35-
opts \\ []
36-
) do
37-
Manager.enqueue_in(
38-
namespace(),
38+
def enqueue_in(queue, time_in_seconds, worker, function_name, args) do
39+
apply(Flume.Config.queue_api_module(), :enqueue_in, [
40+
queue,
41+
time_in_seconds,
42+
worker,
43+
function_name,
44+
args
45+
])
46+
end
47+
48+
def enqueue_in(queue, time_in_seconds, worker, function_name, args, opts) do
49+
apply(Flume.Config.queue_api_module(), :enqueue_in, [
3950
queue,
4051
time_in_seconds,
4152
worker,
4253
function_name,
4354
args,
4455
opts
45-
)
56+
])
4657
end
4758

4859
def pause_all(temporary \\ true) do
4960
Config.pipeline_names() |> Enum.map(&pause(&1, temporary))
5061
end
5162

52-
defp namespace, do: Config.namespace()
53-
5463
defdelegate pause(pipeline_name, temporary \\ true), to: Pipeline.Event
5564

5665
defdelegate resume(pipeline_name, temporary \\ true), to: Pipeline.Event

lib/flume/config.ex

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ defmodule Flume.Config do
55
database: 0,
66
host: "127.0.0.1",
77
logger: Flume.DefaultLogger,
8+
mock: false,
89
max_retries: 5,
910
name: Flume,
1011
namespace: "flume",
@@ -92,4 +93,14 @@ defmodule Flume.Config do
9293
def queues, do: Enum.map(pipelines(), & &1.queue)
9394

9495
def pipeline_names, do: Enum.map(pipelines(), & &1.name)
96+
97+
def queue_api_module do
98+
case mock() do
99+
false ->
100+
Flume.Queue.DefaultAPI
101+
102+
true ->
103+
Flume.Queue.MockAPI
104+
end
105+
end
95106
end

lib/flume/mock.ex

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
defmodule Flume.Mock do
2+
defmacro __using__(_) do
3+
quote do
4+
setup _mock do
5+
Application.put_env(:flume, :mock, true)
6+
7+
on_exit(fn ->
8+
Application.put_env(:flume, :mock, false)
9+
end)
10+
11+
:ok
12+
end
13+
end
14+
end
15+
16+
defmacro with_flume_mock(do: yield) do
17+
quote do
18+
Application.put_env(:flume, :mock, true)
19+
20+
on_exit(fn ->
21+
Application.put_env(:flume, :mock, false)
22+
end)
23+
24+
unquote(yield)
25+
end
26+
end
27+
end

lib/flume/queue/api.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
defmodule Flume.Queue.API do
2+
@callback bulk_enqueue(String.t(), [any()], [any()]) :: {:ok, term} | {:error, String.t()}
3+
4+
@callback enqueue(String.t(), Atom.t(), Atom.t(), [any()], [any()]) ::
5+
{:ok, term} | {:error, String.t()}
6+
7+
@callback enqueue_in(String.t(), integer, Atom.t(), Atom.t(), [any()], [any()]) ::
8+
{:ok, term} | {:error, String.t()}
9+
end

lib/flume/queue/default_api.ex

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
defmodule Flume.Queue.DefaultAPI do
2+
@behaviour Flume.Queue.API
3+
4+
alias Flume.Config
5+
alias Flume.Queue.Manager
6+
7+
def bulk_enqueue(queue, jobs, opts \\ []) do
8+
Manager.bulk_enqueue(namespace(), queue, jobs, opts)
9+
end
10+
11+
def enqueue(
12+
queue,
13+
worker,
14+
function_name \\ :perform,
15+
args,
16+
opts \\ []
17+
) do
18+
Manager.enqueue(
19+
namespace(),
20+
queue,
21+
worker,
22+
function_name,
23+
args,
24+
opts
25+
)
26+
end
27+
28+
def enqueue_in(
29+
queue,
30+
time_in_seconds,
31+
worker,
32+
function_name \\ :perform,
33+
args,
34+
opts \\ []
35+
) do
36+
Manager.enqueue_in(
37+
namespace(),
38+
queue,
39+
time_in_seconds,
40+
worker,
41+
function_name,
42+
args,
43+
opts
44+
)
45+
end
46+
47+
defp namespace, do: Config.namespace()
48+
end

0 commit comments

Comments
 (0)