Skip to content

Commit c67e942

Browse files
committed
feat(zebra): implement data deletion worker
1 parent 7f6cf6e commit c67e942

File tree

4 files changed

+100
-0
lines changed

4 files changed

+100
-0
lines changed

zebra/lib/zebra/models/job.ex

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,49 @@ defmodule Zebra.Models.Job do
356356
)
357357
end
358358

359+
def delete_old_job_stop_requests(limit) do
360+
import Ecto.Query,
361+
only: [from: 2, where: 3, subquery: 1, limit: 2, order_by: 2]
362+
363+
jobs_subquery =
364+
from(j in Zebra.Models.Job,
365+
where: not is_nil(j.expires_at) and j.expires_at <= fragment("CURRENT_TIMESTAMP"),
366+
order_by: [asc: j.created_at],
367+
limit: ^limit,
368+
select: j.id
369+
)
370+
371+
query =
372+
from(jsr in Zebra.Models.JobStopRequest,
373+
where: jsr.job_id in subquery(jobs_subquery)
374+
)
375+
376+
{deleted_count, _} = Zebra.LegacyRepo.delete_all(query)
377+
378+
{:ok, deleted_count}
379+
end
380+
381+
def delete_old_jobs(limit) do
382+
import Ecto.Query, only: [from: 2]
383+
384+
jobs_subquery =
385+
from(j in Zebra.Models.Job,
386+
where: not is_nil(j.expires_at) and j.expires_at <= fragment("CURRENT_TIMESTAMP"),
387+
order_by: [asc: j.created_at],
388+
limit: ^limit,
389+
select: j.id
390+
)
391+
392+
query =
393+
from(j in Zebra.Models.Job,
394+
where: j.id in subquery(jobs_subquery)
395+
)
396+
397+
{deleted_count, _} = Zebra.LegacyRepo.delete_all(query)
398+
399+
{:ok, deleted_count}
400+
end
401+
359402
def wait_for_agent(job) do
360403
if valid_transition?(job.aasm_state, state_waiting_for_agent()) do
361404
update(job, %{aasm_state: state_waiting_for_agent()})

zebra/lib/zebra/workers.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
defmodule Zebra.Workers do
22
@all [
3+
%{name: Zebra.Workers.JobDeletionPolicyWorker, flag: "START_JOB_DELETION_POLICY_WORKER"},
34
%{name: Zebra.Workers.JobStartedCallbackWorker, flag: "START_JOB_STARTED_CALLBACK_WORKER"},
45
%{name: Zebra.Workers.JobFinishedCallbackWorker, flag: "START_JOB_FINISHED_CALLBACK_WORKER"},
56
%{name: Zebra.Workers.JobTeardownCallbackWorker, flag: "START_JOB_TEARDOWN_CALLBACK_WORKER"},
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
defmodule Zebra.Workers.JobDeletionPolicyWorker do
2+
require Logger
3+
4+
@self_hosted_prefix "s1-%"
5+
6+
defstruct [
7+
# period of sleep between worker ticks
8+
:naptime,
9+
# limit for deletions per batch
10+
:limit,
11+
]
12+
13+
def start_link(worker) do
14+
pid =
15+
spawn_link(fn ->
16+
loop(worker)
17+
end)
18+
19+
{:ok, pid}
20+
end
21+
22+
def loop(worker) do
23+
# Perform a tick (cleaning operation)
24+
Task.async(fn -> tick(worker) end) |> Task.await(:infinity)
25+
26+
:timer.sleep(worker.naptime)
27+
28+
# Recursively call loop to continue periodic execution
29+
loop(worker)
30+
end
31+
32+
def tick(worker) do
33+
Logger.info("Starting cleanup tick...")
34+
35+
limit = worker.limit
36+
37+
Zebra.Models.Job.delete_old_job_stop_requests(limit)
38+
Zebra.Models.Job.delete_old_jobs(limit)
39+
40+
Logger.info("Cleanup tick completed.")
41+
end
42+
end
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
defmodule Zebra.LegacyRepo.Migrations.AddExpiresAtToJobs do
2+
use Ecto.Migration
3+
4+
def change do
5+
alter table(:jobs) do
6+
add :expires_at, :utc_datetime
7+
end
8+
create index(:jobs, [:expires_at, :created_at],
9+
name: "index_jobs_on_expires_created_not_null",
10+
concurrently: true,
11+
where: "expires_at IS NOT NULL"
12+
)
13+
end
14+
end

0 commit comments

Comments
 (0)