Skip to content

Commit 6fad1e8

Browse files
committed
feat(zebra): implement data deletion worker
1 parent 7f6cf6e commit 6fad1e8

File tree

7 files changed

+141
-0
lines changed

7 files changed

+141
-0
lines changed

zebra/config/config.exs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ config :zebra, Zebra.Workers.TaskFinisher, timeout: 10_000
2424
config :zebra, Zebra.Workers.Dispatcher, timeout: 1_000
2525
config :zebra, Zebra.Workers.Monitor, timeout: 60_000
2626

27+
config :zebra, Zebra.Workers.JobDeletionPolicyWorker,
28+
naptime: 1_000, # 1 second
29+
longnaptime: 3_600_000, # 1 hour
30+
limit: 100
31+
2732
config :zebra, Zebra.Workers.Scheduler,
2833
cooldown_period: 1_000,
2934
batch_size: 3

zebra/lib/zebra/models/job.ex

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,49 @@ defmodule Zebra.Models.Job do
356356
)
357357
end
358358

359+
def delete_old_job_stop_requests(limit) do
360+
import Ecto.Query,
361+
only: [from: 2, where: 3, subquery: 1, limit: 2, order_by: 2]
362+
363+
jobs_subquery =
364+
from(j in Zebra.Models.Job,
365+
where: not is_nil(j.expires_at) and j.expires_at <= fragment("CURRENT_TIMESTAMP"),
366+
order_by: [asc: j.created_at],
367+
limit: ^limit,
368+
select: j.id
369+
)
370+
371+
query =
372+
from(jsr in Zebra.Models.JobStopRequest,
373+
where: jsr.job_id in subquery(jobs_subquery)
374+
)
375+
376+
{deleted_count, _} = Zebra.LegacyRepo.delete_all(query)
377+
378+
{:ok, deleted_count}
379+
end
380+
381+
def delete_old_jobs(limit) do
382+
import Ecto.Query, only: [from: 2]
383+
384+
jobs_subquery =
385+
from(j in Zebra.Models.Job,
386+
where: not is_nil(j.expires_at) and j.expires_at <= fragment("CURRENT_TIMESTAMP"),
387+
order_by: [asc: j.created_at],
388+
limit: ^limit,
389+
select: j.id
390+
)
391+
392+
query =
393+
from(j in Zebra.Models.Job,
394+
where: j.id in subquery(jobs_subquery)
395+
)
396+
397+
{deleted_count, _} = Zebra.LegacyRepo.delete_all(query)
398+
399+
{:ok, deleted_count}
400+
end
401+
359402
def wait_for_agent(job) do
360403
if valid_transition?(job.aasm_state, state_waiting_for_agent()) do
361404
update(job, %{aasm_state: state_waiting_for_agent()})

zebra/lib/zebra/workers.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
defmodule Zebra.Workers do
22
@all [
3+
%{name: Zebra.Workers.JobDeletionPolicyWorker, flag: "START_JOB_DELETION_POLICY_WORKER"},
34
%{name: Zebra.Workers.JobStartedCallbackWorker, flag: "START_JOB_STARTED_CALLBACK_WORKER"},
45
%{name: Zebra.Workers.JobFinishedCallbackWorker, flag: "START_JOB_FINISHED_CALLBACK_WORKER"},
56
%{name: Zebra.Workers.JobTeardownCallbackWorker, flag: "START_JOB_TEARDOWN_CALLBACK_WORKER"},
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
defmodule Zebra.Workers.JobDeletionPolicyWorker do
2+
require Logger
3+
4+
@self_hosted_prefix "s1-%"
5+
6+
defstruct [
7+
# period of sleep between worker ticks
8+
:naptime,
9+
# longer period of sleep when there is nothing to delete
10+
:longnaptime,
11+
# limit for deletions per batch
12+
:limit
13+
]
14+
15+
def start_link(worker) do
16+
pid =
17+
spawn_link(fn ->
18+
loop(worker)
19+
end)
20+
21+
{:ok, pid}
22+
end
23+
24+
def loop(worker) do
25+
# Perform a tick (cleaning operation)
26+
deleted_any? = Task.async(fn -> tick(worker) end) |> Task.await(:infinity)
27+
28+
sleep_for =
29+
if deleted_any? do
30+
worker.naptime
31+
else
32+
worker.longnaptime || worker.naptime
33+
end
34+
35+
:timer.sleep(sleep_for)
36+
37+
# Recursively call loop to continue periodic execution
38+
loop(worker)
39+
end
40+
41+
def tick(worker) do
42+
Logger.info("Starting cleanup tick...")
43+
44+
limit = worker.limit
45+
46+
{:ok, deleted_stop_requests} = Zebra.Models.Job.delete_old_job_stop_requests(limit)
47+
{:ok, deleted_jobs} = Zebra.Models.Job.delete_old_jobs(limit)
48+
49+
total_deleted = deleted_stop_requests + deleted_jobs
50+
51+
if total_deleted == 0 do
52+
Logger.info("No jobs found for deletion.")
53+
false
54+
else
55+
Logger.info("Deleted #{deleted_stop_requests} job stop requests and #{deleted_jobs} jobs.")
56+
true
57+
end
58+
end
59+
end
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
defmodule Zebra.LegacyRepo.Migrations.AddExpiresAtToJobs do
2+
use Ecto.Migration
3+
4+
def change do
5+
alter table(:jobs) do
6+
add :expires_at, :utc_datetime
7+
end
8+
end
9+
end
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
defmodule Zebra.LegacyRepo.Migrations.AddExpiresCreatedIndexAtJobsTable do
2+
use Ecto.Migration
3+
@disable_ddl_transaction true
4+
5+
def change do
6+
create index(:jobs, [:expires_at, :created_at],
7+
name: "index_jobs_on_expires_created_not_null",
8+
concurrently: true,
9+
where: "expires_at IS NOT NULL"
10+
)
11+
end
12+
end
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
defmodule Zebra.LegacyRepo.Migrations.AddOrganizationCreatedIndexAtJobsTable do
2+
use Ecto.Migration
3+
@disable_ddl_transaction true
4+
5+
def change do
6+
create index(:jobs, [:organization_id, :created_at],
7+
name: "index_jobs_on_organization_created_expires_is_null",
8+
concurrently: true,
9+
where: "expires_at IS NULL"
10+
)
11+
end
12+
end

0 commit comments

Comments
 (0)