Skip to content

Commit 7b819f9

Browse files
authored
Merge pull request #37 from annatel/addCleaningQueuetopiaImprovement
add job_cleaner_initial_delay to prevent deadLock
2 parents df1975f + caa76ee commit 7b819f9

File tree

6 files changed

+119
-67
lines changed

6 files changed

+119
-67
lines changed

README.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,35 @@ defmodule MyApp.MailQueuetopia do
9090
use Queuetopia,
9191
otp_app: :my_app,
9292
performer: MyApp.MailQueuetopia.Performer,
93-
repo: MyApp.Repo
93+
repo: MyApp.Repo,
94+
cleanup_interval: {1, :day},
95+
job_retention: {7, :day},
96+
job_cleaner_max_initial_delay: 100
9497
end
9598
```
99+
#### Job Cleanup Configuration
100+
101+
Queuetopia provides automatic cleanup of completed jobs with the following options:
102+
103+
- **`cleanup_interval`** *(optional)* - Defines how often the job cleaner runs. Must be a tuple like `{1, :day}`, `{2, :hour}`, `{30, :minute}`, etc. If not set, job cleanup is **disabled** and completed jobs will remain in the database indefinitely.
104+
105+
- **`job_retention`** *(optional)* - Defines how long completed jobs are kept before being deleted. Defaults to `{7, :day}` (7 days). Must be a tuple specifying the duration.
106+
107+
- **`job_cleaner_max_initial_delay`** *(optional)* - Maximum delay in milliseconds before the first cleanup runs when the JobCleaner starts. A random delay between 0 and this value is used to prevent multiple nodes from running cleanup simultaneously. If set to 0, cleanup runs immediately on startup. Defaults to a reasonable value to distribute cleanup across nodes.
108+
109+
**Examples:**
110+
```elixir
111+
# Cleanup every hour, keep jobs for 3 days, start immediately
112+
cleanup_interval: {1, :hour},
113+
job_retention: {3, :day},
114+
job_cleaner_max_initial_delay: 0
115+
116+
# Cleanup twice daily, keep jobs for 2 weeks, random startup delay up to 5 minutes
117+
cleanup_interval: {12, :hour},
118+
job_retention: {14, :day},
119+
job_cleaner_max_initial_delay: 300_000
120+
```
121+
96122
A Queuetopia expects a performer to exist.
97123
For example, the performer can be implemented like this:
98124

lib/job_cleaner.ex

Lines changed: 0 additions & 46 deletions
This file was deleted.

lib/queuetopia.ex

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,17 @@ defmodule Queuetopia do
5656
@repo Keyword.fetch!(opts, :repo)
5757
@performer Keyword.fetch!(opts, :performer) |> to_string()
5858
@scope __MODULE__ |> to_string()
59-
@cleanup_interval Keyword.get(opts, :cleanup_interval, nil)
59+
@cleanup_interval Keyword.get(opts, :cleanup_interval)
6060
@job_retention Keyword.get(opts, :job_retention, {7, :day})
61+
@job_cleaner_max_initial_delay Keyword.get(opts, :job_cleaner_max_initial_delay, 1000)
6162
@default_poll_interval 60 * 1_000
6263

6364
defp config(otp_app, queue) when is_atom(otp_app) and is_atom(queue) do
6465
config = Application.get_env(otp_app, queue, [])
6566
[otp_app: otp_app] ++ config
6667
end
6768

68-
defp to_ms({duration, unit}) do
69+
defp to_ms({duration, unit}) when is_integer(duration) and is_atom(unit) do
6970
timestamp = DateTime.utc_now()
7071

7172
timestamp
@@ -86,17 +87,22 @@ defmodule Queuetopia do
8687
Keyword.get(config, :poll_interval) ||
8788
@default_poll_interval
8889

89-
cleanup_interval = Keyword.get(config, :cleanup_interval) || @cleanup_interval
90+
cleanup_interval = Keyword.get(opts, :cleanup_interval) || @cleanup_interval
91+
9092
cleanup_interval_ms = cleanup_interval && to_ms(cleanup_interval)
9193

94+
job_cleaner_max_initial_delay =
95+
Keyword.get(opts, :job_cleaner_max_initial_delay) || @job_cleaner_max_initial_delay
96+
9297
disable? = Keyword.get(config, :disable?, false)
9398

9499
opts = [
95100
repo: @repo,
96101
poll_interval: poll_interval,
97102
number_of_concurrent_jobs: Keyword.get(config, :number_of_concurrent_jobs),
98103
cleanup_interval: cleanup_interval_ms,
99-
job_retention: Keyword.get(config, :job_retention) || @job_retention
104+
job_retention: @job_retention,
105+
job_cleaner_max_initial_delay: job_cleaner_max_initial_delay
100106
]
101107

102108
if disable?, do: :ignore, else: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
@@ -146,7 +152,8 @@ defmodule Queuetopia do
146152
repo: Keyword.fetch!(args, :repo),
147153
scope: @scope,
148154
cleanup_interval: Keyword.fetch!(args, :cleanup_interval),
149-
job_retention: Keyword.fetch!(args, :job_retention)
155+
job_retention: Keyword.fetch!(args, :job_retention),
156+
job_cleaner_max_initial_delay: Keyword.fetch!(args, :job_cleaner_max_initial_delay)
150157
]}
151158
end
152159

lib/queuetopia/job_cleaner.ex

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
defmodule Queuetopia.JobCleaner do
2+
@moduledoc """
3+
Removes completed jobs from the queue periodically.
4+
5+
This GenServer runs in the background and cleans up
6+
old completed jobs based on the configured interval.
7+
"""
8+
9+
use GenServer
10+
alias Queuetopia.Queue
11+
12+
def start_link(opts) do
13+
GenServer.start_link(__MODULE__, opts, name: opts[:name])
14+
end
15+
16+
@impl true
17+
def init(opts) do
18+
# Add random dalay to mitigate deadlocks on starts
19+
job_cleaner_initial_delay =
20+
opts |> Keyword.fetch!(:job_cleaner_max_initial_delay) |> random_delay()
21+
22+
Process.send_after(self(), :cleanup, job_cleaner_initial_delay)
23+
24+
state = opts |> Keyword.take([:repo, :scope, :cleanup_interval, :job_retention]) |> Map.new()
25+
26+
{:ok, state}
27+
end
28+
29+
@impl true
30+
def handle_info(:cleanup, state) do
31+
%{
32+
repo: repo,
33+
scope: scope,
34+
cleanup_interval: cleanup_interval,
35+
job_retention: job_retention
36+
} = state
37+
38+
Queue.cleanup_completed_jobs(repo, scope, job_retention)
39+
40+
Process.send_after(self(), :cleanup, cleanup_interval)
41+
{:noreply, state}
42+
end
43+
44+
defp random_delay(0), do: 0
45+
defp random_delay(max) when max > 0, do: :rand.uniform(max)
46+
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule Queuetopia.MixProject do
22
use Mix.Project
33

44
@source_url "https://github.com/annatel/queuetopia"
5-
@version "2.7.0"
5+
@version "2.7.1"
66

77
def project do
88
[
Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,10 @@ defmodule Queuetopia.JobCleanerTest do
1111
|> DateTime.truncate(:second)
1212
end
1313

14-
setup do
15-
Application.put_env(:queuetopia, TestQueuetopia, cleanup_interval: {50, :millisecond})
16-
17-
on_exit(fn ->
18-
Application.put_env(:queuetopia, TestQueuetopia, [])
19-
end)
20-
21-
:ok
22-
end
23-
2414
test "removes completed jobs older than 7 days retention period during periodic cleanup" do
25-
start_supervised!(TestQueuetopia)
15+
start_supervised!(
16+
{TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0}
17+
)
2618

2719
:timer.sleep(100)
2820

@@ -60,7 +52,10 @@ defmodule Queuetopia.JobCleanerTest do
6052
done_at: datetime_days_ago(8)
6153
)
6254

63-
start_supervised!(TestQueuetopia)
55+
start_supervised!(
56+
{TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0}
57+
)
58+
6459
:timer.sleep(100)
6560

6661
assert is_nil(TestRepo.get(Job, our_eight_days_old_job.id))
@@ -78,9 +73,11 @@ defmodule Queuetopia.JobCleanerTest do
7873

7974
assert TestRepo.get(Job, eight_days_old_completed_job.id)
8075

81-
start_supervised!(TestQueuetopia)
76+
start_supervised!(
77+
{TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0}
78+
)
8279

83-
:timer.sleep(10)
80+
:timer.sleep(50)
8481

8582
assert is_nil(TestRepo.get(Job, eight_days_old_completed_job.id))
8683
end
@@ -97,4 +94,26 @@ defmodule Queuetopia.JobCleanerTest do
9794
assert TestRepo.get(Job, old_job.id)
9895
assert TestRepo.get(Job, recent_job.id)
9996
end
97+
98+
test "respects job_cleaner_max_initial_delay before first cleanup" do
99+
scope = TestQueuetopia.scope()
100+
101+
eight_days_old_job =
102+
insert!(:job,
103+
scope: scope,
104+
done_at: datetime_days_ago(8)
105+
)
106+
107+
start_supervised!(
108+
{TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 20}
109+
)
110+
111+
:timer.sleep(10)
112+
assert TestRepo.get(Job, eight_days_old_job.id), "Job should still exist before initial delay"
113+
114+
:timer.sleep(15)
115+
116+
assert is_nil(TestRepo.get(Job, eight_days_old_job.id)),
117+
"Job should be deleted after initial delay"
118+
end
100119
end

0 commit comments

Comments
 (0)