
Commit 40edabf

server, bin: purge old already-archived jobs to avoid perpetual database growth
1 parent 4366484

11 files changed: 354 additions, 19 deletions

bin/src/main.rs

Lines changed: 21 additions & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright 2024 Oxide Computer Company
+ * Copyright 2025 Oxide Computer Company
  */

 #![allow(clippy::many_single_char_names)]
@@ -2378,6 +2378,25 @@ async fn do_admin_job_archive(mut l: Level<Stuff>) -> Result<()> {
     Ok(())
 }

+async fn do_admin_job_purge(mut l: Level<Stuff>) -> Result<()> {
+    l.usage_args(Some("JOB..."));
+
+    let a = args!(l);
+    if a.args().is_empty() {
+        bad_args!(l, "specify a job to purge");
+    }
+
+    for arg in a.args() {
+        if let Err(e) =
+            l.context().admin().admin_job_purge_request().job(arg).send().await
+        {
+            bail!("ERROR: purging {}: {:?}", arg, e);
+        }
+    }
+
+    Ok(())
+}
+
 async fn do_admin_job_list(mut l: Level<Stuff>) -> Result<()> {
     l.add_column("id", WIDTH_ID, true);
     l.add_column("age", WIDTH_AGE, true);
@@ -2507,6 +2526,7 @@ async fn do_admin_job_list(mut l: Level<Stuff>) -> Result<()> {
 async fn do_admin_job(mut l: Level<Stuff>) -> Result<()> {
     l.cmda("list", "ls", "list jobs", cmd!(do_admin_job_list))?;
     l.cmd("archive", "request archive of a job", cmd!(do_admin_job_archive))?;
+    l.cmd("purge", "request purge of a job", cmd!(do_admin_job_purge))?;
     l.cmd("dump", "dump information about jobs", cmd!(do_admin_job_dump))?;

     sel!(l).run().await
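
The new subcommand lets an operator request purge of one or more jobs from the command line. A hypothetical invocation is shown below; the binary name and job ID are illustrative and do not appear in the diff:

    buildomat admin job purge 01ARZ3NDEKTSV4RRFFQ69G5FAV

Each argument is passed to the admin_job_purge_request() client call in turn, and the loop bails out with an error on the first job that cannot be purged.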

client/openapi.json

Lines changed: 37 additions & 1 deletion
@@ -2,7 +2,7 @@
   "openapi": "3.0.3",
   "info": {
     "title": "Buildomat",
-    "version": "1.0"
+    "version": "1.0.0"
   },
   "paths": {
     "/0/admin/factories": {
@@ -232,6 +232,32 @@
       }
     }
   },
+    "/0/admin/jobs/{job}/purge": {
+      "post": {
+        "operationId": "admin_job_purge_request",
+        "parameters": [
+          {
+            "in": "path",
+            "name": "job",
+            "required": true,
+            "schema": {
+              "type": "string"
+            }
+          }
+        ],
+        "responses": {
+          "204": {
+            "description": "resource updated"
+          },
+          "4XX": {
+            "$ref": "#/components/responses/Error"
+          },
+          "5XX": {
+            "$ref": "#/components/responses/Error"
+          }
+        }
+      }
+    },
     "/0/admin/target": {
       "post": {
         "operationId": "target_create",
@@ -2972,6 +2998,16 @@
             "$ref": "#/components/schemas/Task"
           }
         },
+        "time_archived": {
+          "nullable": true,
+          "type": "string",
+          "format": "date-time"
+        },
+        "time_purged": {
+          "nullable": true,
+          "type": "string",
+          "format": "date-time"
+        },
         "times": {
           "default": {},
           "type": "object",

server/schema.sql

Lines changed: 8 additions & 0 deletions
@@ -332,3 +332,11 @@ ALTER TABLE worker ADD COLUMN
 -- v 55
 ALTER TABLE worker ADD COLUMN
     factory_ip TEXT;
+
+-- v 56
+ALTER TABLE job ADD COLUMN
+    time_purged TEXT;
+
+-- v 57
+CREATE INDEX job_purging_queue ON job (id, complete, time_archived, time_purged)
+    WHERE complete = true AND time_archived IS NOT NULL AND time_purged IS NULL;
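
The partial index presumably exists so that finding the next purge candidate is an index-only lookup. A query of roughly this shape (the exact statement behind job_next_unpurged() is not shown in this diff) would be satisfied by job_purging_queue without touching the table rows:

    SELECT id
      FROM job
     WHERE complete = true
       AND time_archived IS NOT NULL
       AND time_purged IS NULL
     ORDER BY id
     LIMIT 1;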

server/src/api/admin.rs

Lines changed: 35 additions & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright 2024 Oxide Computer Company
+ * Copyright 2025 Oxide Computer Company
  */

 use super::prelude::*;
@@ -464,6 +464,40 @@ pub(crate) async fn admin_job_archive_request(
     Ok(HttpResponseUpdatedNoContent())
 }

+#[endpoint {
+    method = POST,
+    path = "/0/admin/jobs/{job}/purge",
+}]
+pub(crate) async fn admin_job_purge_request(
+    rqctx: RequestContext<Arc<Central>>,
+    path: TypedPath<JobPath>,
+) -> DSResult<HttpResponseUpdatedNoContent> {
+    let c = rqctx.context();
+    let log = &rqctx.log;
+
+    c.require_admin(log, &rqctx.request, "job.purge").await?;
+
+    let id = path.into_inner().job.parse::<db::JobId>().or_500()?;
+    let job = c.db.job(id).or_500()?;
+
+    if !job.is_archived() {
+        return Err(HttpError::for_bad_request(
+            None,
+            "job cannot be purged until archived".into(),
+        ));
+    }
+
+    /*
+     * If a job is archived, it should already have been complete previously:
+     */
+    assert!(job.complete);
+
+    info!(log, "admin: requested purge of job {}", job.id);
+    c.inner.lock().unwrap().purge_queue.push_back(job.id);
+
+    Ok(HttpResponseUpdatedNoContent())
+}
+
 #[endpoint {
     method = POST,
     path = "/0/control/hold",

server/src/api/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2024 Oxide Computer Company
+ * Copyright 2025 Oxide Computer Company
  */

 mod prelude {
@@ -20,7 +20,7 @@ mod prelude {
     pub use rusty_ulid::Ulid;
     pub use schemars::JsonSchema;
     pub use serde::{Deserialize, Serialize};
-    pub use slog::{error, info, warn, Logger};
+    pub use slog::{error, info, o, warn, Logger};
     pub use std::collections::HashMap;
     pub use std::str::FromStr;
     pub use std::sync::Arc;

server/src/api/user.rs

Lines changed: 6 additions & 4 deletions
@@ -1,9 +1,7 @@
 /*
- * Copyright 2024 Oxide Computer Company
+ * Copyright 2025 Oxide Computer Company
  */

-use slog::o;
-
 use super::prelude::*;

 use super::worker::UploadedChunk;
@@ -608,6 +606,8 @@ pub(crate) fn format_job(
         tags,
         cancelled: j.cancelled,
         times,
+        time_archived: j.time_archived.map(|d| d.0),
+        time_purged: j.time_purged.map(|d| d.0),
     }
 }

@@ -665,7 +665,7 @@ pub(crate) async fn jobs_get_old(
     marker = Some(page.last().unwrap().id);

     for job in page {
-        out.push(super::user::Job::load(log, c, &job).await.or_500()?);
+        out.push(Job::load(log, c, &job).await.or_500()?);
     }

     /*
@@ -826,6 +826,8 @@ pub(crate) struct Job {
     cancelled: bool,
     #[serde(default)]
     times: HashMap<String, DateTime<Utc>>,
+    time_archived: Option<DateTime<Utc>>,
+    time_purged: Option<DateTime<Utc>>,
 }

 impl Job {
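
With these additions, a job returned by the user API carries the two new optional timestamps. A rough sketch of the serialised shape is shown below; the values are hypothetical and unrelated fields are elided:

    {
      "id": "01ARZ3NDEKTSV4RRFFQ69G5FAV",
      "cancelled": false,
      "times": {},
      "time_archived": "2025-01-02T03:04:05Z",
      "time_purged": null
    }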

server/src/archive/jobs.rs

Lines changed: 102 additions & 4 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2024 Oxide Computer Company
+ * Copyright 2025 Oxide Computer Company
  */

 use core::mem::size_of;
@@ -981,10 +981,11 @@ fn archive_jobs_sync_work(
         waiting: _,

         /*
-         * This field tracks when the job was successfully archived, and thus
-         * cannot appear in the archive itself.
+         * These fields track when the job was successfully archived or purged,
+         * and thus cannot appear in the archive itself.
          */
         time_archived: _,
+        time_purged: _,

         /*
          * We use the target_id value we already fetched above, so ignore it
@@ -1001,7 +1002,7 @@
  * This structure should store things that don't grow substantially for
  * longer-running jobs that produce more output.  In particular, we store
  * the job event stream in a different way.  This is important because we
- * wil need to deserialize this object (at a cost of CPU and memory) in
+ * will need to deserialise this object (at a cost of CPU and memory) in
  * order to answer questions about the job: heavier jobs should not use more
  * resources than lighter jobs.
  */
@@ -1198,3 +1199,100 @@ pub(crate) async fn archive_jobs(log: Logger, c: Arc<Central>) -> Result<()> {
         tokio::time::sleep(delay).await;
     }
 }
+
+fn purge_jobs_sync_work(
+    log: &Logger,
+    c: &Central,
+) -> Result<Option<db::JobId>> {
+    let (reason, job) = if let Some(job) =
+        c.inner.lock().unwrap().purge_queue.pop_front()
+    {
+        /*
+         * Service explicit requests from the operator to purge a job first.
+         */
+        let job = c.db.job(job)?;
+        if !job.is_archived() {
+            warn!(log, "job {} not archived; cannot purge yet", job.id);
+            return Ok(None);
+        }
+        if job.is_purged() {
+            warn!(log, "job {} was already purged; ignoring request", job.id);
+            return Ok(None);
+        }
+        ("operator request", job)
+    } else if c.config.job.auto_purge {
+        /*
+         * Otherwise, if auto-purging is enabled, purge the next as-yet
+         * unpurged job.
+         */
+        if let Some(job) = c.db.job_next_unpurged()? {
+            if let Some(time) = job.time_archived {
+                if time.age().as_secs() < 14 * 86400 {
+                    /*
+                     * Only purge once the job has been archived for at least
+                     * a fortnight.
+                     */
+                    return Ok(None);
+                }
+            }
+
+            ("automatic", job)
+        } else {
+            return Ok(None);
+        }
+    } else {
+        return Ok(None);
+    };
+
+    assert!(job.complete);
+    assert!(job.time_archived.is_some());
+    assert!(job.time_purged.is_none());
+
+    info!(log, "purging job {} [{reason}]...", job.id);
+
+    /*
+     * Purging a job from the database involves removing the live records.
+     * This should have no impact on access to details about the job, as we
+     * will fetch those details from the archive file once the job has been
+     * archived.
+     */
+    c.db.job_purge(job.id)?;
+
+    Ok(Some(job.id))
+}
+
+async fn purge_jobs_one(log: &Logger, c: &Arc<Central>) -> Result<bool> {
+    let start = Instant::now();
+
+    /*
+     * The work to purge a job is synchronous and may take several seconds for
+     * larger jobs.  Avoid holding up other async tasks while we wait:
+     */
+    let id = tokio::task::block_in_place(|| purge_jobs_sync_work(log, c))?;
+
+    if let Some(id) = id {
+        let dur = Instant::now().saturating_duration_since(start);
+        info!(log, "job {id} purged"; "duration_msec" => dur.as_millis());
+    }
+
+    Ok(id.is_some())
+}
+
+pub(crate) async fn purge_jobs(log: Logger, c: Arc<Central>) -> Result<()> {
+    let delay = Duration::from_secs(1);
+    let ok_delay = Duration::from_millis(c.config.job.purge_delay_msec);
+
+    info!(log, "start job purge task");
+
+    loop {
+        match purge_jobs_one(&log, &c).await {
+            Ok(true) => {
+                tokio::time::sleep(ok_delay).await;
+                continue;
+            }
+            Ok(false) => (),
+            Err(e) => error!(log, "job purge task error: {:?}", e),
+        }
+
+        tokio::time::sleep(delay).await;
+    }
+}
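
The diff does not show where purge_jobs() is wired up; presumably it is spawned at server start-up alongside the existing archive_jobs() task. A minimal sketch of that wiring, in which the `central` and `log` bindings and the module path are assumptions based on the file layout rather than code in this commit:

    /*
     * Hypothetical start-up wiring (not part of this diff): run the purge
     * loop as its own background task with a clone of the shared state.
     */
    let c0 = Arc::clone(&central);
    let log0 = log.new(o!("task" => "purge_jobs"));
    tokio::task::spawn(async move { crate::archive::jobs::purge_jobs(log0, c0).await });

Because the loop only returns on error, the spawned future effectively runs for the life of the server, sleeping purge_delay_msec between successful purges and one second when there is nothing to do.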

server/src/config.rs

Lines changed: 14 additions & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright 2024 Oxide Computer Company
+ * Copyright 2025 Oxide Computer Company
  */

 use std::path::Path;
@@ -38,6 +38,10 @@ pub struct ConfigFileJob {
     pub max_size_per_file_mb: u64,
     #[serde(default)]
     pub auto_archive: bool,
+    #[serde(default)]
+    pub auto_purge: bool,
+    #[serde(default = "default_purge_delay_msec")]
+    pub purge_delay_msec: u64,
 }

 impl ConfigFileJob {
@@ -57,6 +61,15 @@ fn default_max_size_per_file_mb() -> u64 {
     1024
 }

+fn default_purge_delay_msec() -> u64 {
+    /*
+     * By default, wait half a second after a successful purge before purging
+     * another job.  When there are a lot of jobs to purge, this can help to
+     * keep the system responsive to active jobs.
+     */
+    500
+}
+
 #[derive(Deserialize, Debug, Default)]
 #[serde(deny_unknown_fields)]
 pub struct ConfigFileFile {
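
Assuming the server configuration file is TOML with a job table that deserialises into ConfigFileJob (the enclosing file layout is not visible in this diff), opting in to automatic purging might look like:

    [job]
    auto_archive = true

    # Purge archived jobs automatically, pausing 500 ms between purges.
    auto_purge = true
    purge_delay_msec = 500

Both new fields are optional: auto_purge defaults to false, and purge_delay_msec defaults to 500.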
