Skip to content

Commit fe58bad

Browse files
committed
feat(zebra): implement data deletion worker
1 parent 0f2977d commit fe58bad

File tree

7 files changed

+147
-0
lines changed

7 files changed

+147
-0
lines changed

zebra/config/config.exs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ config :zebra, Zebra.Workers.TaskFinisher, timeout: 10_000
2424
config :zebra, Zebra.Workers.Dispatcher, timeout: 1_000
2525
config :zebra, Zebra.Workers.Monitor, timeout: 60_000
2626

27+
config :zebra, Zebra.Workers.JobDeletionPolicyWorker,
28+
naptime: 1_000, # 1 second
29+
longnaptime: 3_600_000, # 1 hour
30+
limit: 100
31+
2732
config :zebra, Zebra.Workers.Scheduler,
2833
cooldown_period: 1_000,
2934
batch_size: 3

zebra/lib/zebra/models/job.ex

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ defmodule Zebra.Models.Job do
7979
field(:scheduled_at, :utc_datetime)
8080
field(:started_at, :utc_datetime)
8181
field(:finished_at, :utc_datetime)
82+
field(:expires_at, :utc_datetime)
8283
end
8384

8485
def create(params) do
@@ -356,6 +357,49 @@ defmodule Zebra.Models.Job do
356357
)
357358
end
358359

360+
def delete_old_job_stop_requests(limit) do
361+
import Ecto.Query,
362+
only: [from: 2, where: 3, subquery: 1, limit: 2, order_by: 2]
363+
364+
jobs_subquery =
365+
from(j in Zebra.Models.Job,
366+
where: not is_nil(j.expires_at) and j.expires_at <= fragment("CURRENT_TIMESTAMP"),
367+
order_by: [asc: j.created_at],
368+
limit: ^limit,
369+
select: j.id
370+
)
371+
372+
query =
373+
from(jsr in Zebra.Models.JobStopRequest,
374+
where: jsr.job_id in subquery(jobs_subquery)
375+
)
376+
377+
{deleted_count, _} = Zebra.LegacyRepo.delete_all(query)
378+
379+
{:ok, deleted_count}
380+
end
381+
382+
def delete_old_jobs(limit) do
383+
import Ecto.Query, only: [from: 2]
384+
385+
jobs_subquery =
386+
from(j in Zebra.Models.Job,
387+
where: not is_nil(j.expires_at) and j.expires_at <= fragment("CURRENT_TIMESTAMP"),
388+
order_by: [asc: j.created_at],
389+
limit: ^limit,
390+
select: j.id
391+
)
392+
393+
query =
394+
from(j in Zebra.Models.Job,
395+
where: j.id in subquery(jobs_subquery)
396+
)
397+
398+
{deleted_count, _} = Zebra.LegacyRepo.delete_all(query)
399+
400+
{:ok, deleted_count}
401+
end
402+
359403
def wait_for_agent(job) do
360404
if valid_transition?(job.aasm_state, state_waiting_for_agent()) do
361405
update(job, %{aasm_state: state_waiting_for_agent()})

zebra/lib/zebra/workers.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
defmodule Zebra.Workers do
22
@all [
3+
%{name: Zebra.Workers.JobDeletionPolicyWorker, flag: "START_JOB_DELETION_POLICY_WORKER"},
34
%{name: Zebra.Workers.JobStartedCallbackWorker, flag: "START_JOB_STARTED_CALLBACK_WORKER"},
45
%{name: Zebra.Workers.JobFinishedCallbackWorker, flag: "START_JOB_FINISHED_CALLBACK_WORKER"},
56
%{name: Zebra.Workers.JobTeardownCallbackWorker, flag: "START_JOB_TEARDOWN_CALLBACK_WORKER"},
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
defmodule Zebra.Workers.JobDeletionPolicyWorker do
2+
require Logger
3+
4+
@self_hosted_prefix "s1-%"
5+
6+
defstruct [
7+
# period of sleep between worker ticks
8+
:naptime,
9+
# longer period of sleep when there is nothing to delete
10+
:longnaptime,
11+
# limit for deletions per batch
12+
:limit
13+
]
14+
15+
def start_link do
16+
worker_config = Application.fetch_env!(:zebra, __MODULE__)
17+
worker = struct(__MODULE__, worker_config)
18+
19+
pid =
20+
spawn_link(fn ->
21+
loop(worker)
22+
end)
23+
24+
{:ok, pid}
25+
end
26+
27+
def loop(worker) do
28+
# Perform a tick (cleaning operation)
29+
deleted_any? = Task.async(fn -> tick(worker) end) |> Task.await(:infinity)
30+
31+
sleep_for =
32+
if deleted_any? do
33+
worker.naptime
34+
else
35+
worker.longnaptime || worker.naptime
36+
end
37+
38+
:timer.sleep(sleep_for)
39+
40+
# Recursively call loop to continue periodic execution
41+
loop(worker)
42+
end
43+
44+
def tick(worker) do
45+
Logger.info("Starting cleanup tick...")
46+
47+
limit = worker.limit
48+
49+
{:ok, deleted_stop_requests} = Zebra.Models.Job.delete_old_job_stop_requests(limit)
50+
{:ok, deleted_jobs} = Zebra.Models.Job.delete_old_jobs(limit)
51+
52+
total_deleted = deleted_stop_requests + deleted_jobs
53+
54+
if total_deleted == 0 do
55+
Logger.info("No jobs found for deletion.")
56+
false
57+
else
58+
Logger.info("Deleted #{deleted_stop_requests} job stop requests and #{deleted_jobs} jobs.")
59+
true
60+
end
61+
end
62+
end
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
defmodule Zebra.LegacyRepo.Migrations.AddExpiresAtToJobs do
2+
use Ecto.Migration
3+
4+
def change do
5+
alter table(:jobs) do
6+
add :expires_at, :utc_datetime
7+
end
8+
end
9+
end
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
defmodule Zebra.LegacyRepo.Migrations.AddExpiresCreatedIndexAtJobsTable do
2+
use Ecto.Migration
3+
@disable_migration_lock true
4+
@disable_ddl_transaction true
5+
6+
def change do
7+
create index(:jobs, [:expires_at, :created_at],
8+
name: "index_jobs_on_expires_created_not_null",
9+
concurrently: true,
10+
where: "expires_at IS NOT NULL"
11+
)
12+
end
13+
end
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
defmodule Zebra.LegacyRepo.Migrations.AddOrganizationCreatedIndexAtJobsTable do
2+
use Ecto.Migration
3+
@disable_migration_lock true
4+
@disable_ddl_transaction true
5+
6+
def change do
7+
create index(:jobs, [:organization_id, :created_at],
8+
name: "index_jobs_on_organization_created_expires_is_null",
9+
concurrently: true,
10+
where: "expires_at IS NULL"
11+
)
12+
end
13+
end

0 commit comments

Comments
 (0)