Skip to content

Commit 8dbc914

Browse files
committed
Cron-like recurring jobs
1 parent 7b120f1 commit 8dbc914

File tree

17 files changed

+479
-96
lines changed

17 files changed

+479
-96
lines changed

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ features = ["serde", "clock"]
102102
version = "4.5.21"
103103
features = ["derive"]
104104

105+
# Cron expressions
106+
[workspace.dependencies.cron]
107+
version = "0.13.0"
108+
105109
# Elliptic curve cryptography
106110
[workspace.dependencies.elliptic-curve]
107111
version = "0.13.8"
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
-- Copyright 2024 New Vector Ltd.
2+
--
3+
-- SPDX-License-Identifier: AGPL-3.0-only
4+
-- Please see LICENSE in the repository root for full details.
5+
6+
-- Add a table to track the state of scheduled recurring jobs.
7+
CREATE TABLE queue_schedules (
8+
-- A unique name for the schedule
9+
schedule_name TEXT PRIMARY KEY,
10+
11+
-- The cron expression to use to schedule the job. This is there just for
12+
-- convenience, as this is defined by the backend
13+
schedule_expression TEXT NOT NULL,
14+
15+
-- The last time the job was scheduled. If NULL, it means that the job was
16+
-- never scheduled.
17+
last_scheduled_at TIMESTAMP WITH TIME ZONE,
18+
19+
-- The job that was scheduled last time. If NULL, it means that either the
20+
-- job was never scheduled, or the job cleaned up from the database
21+
last_scheduled_job_id UUID
22+
REFERENCES queue_jobs (queue_job_id)
23+
);
24+
25+
-- When a job is scheduled from a recurreing schedule, we keep a column
26+
-- referencing the name of the schedule
27+
ALTER TABLE queue_jobs
28+
ADD COLUMN schedule_name TEXT
29+
REFERENCES queue_schedules (schedule_name);

crates/storage-pg/src/queue/job.rs

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ use mas_storage::{
1212
queue::{Job, QueueJobRepository, Worker},
1313
Clock,
1414
};
15+
use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT;
1516
use rand::RngCore;
1617
use sqlx::PgConnection;
18+
use tracing::Instrument;
1719
use ulid::Ulid;
1820
use uuid::Uuid;
1921

@@ -137,6 +139,7 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
137139
payload: serde_json::Value,
138140
metadata: serde_json::Value,
139141
scheduled_at: DateTime<Utc>,
142+
schedule_name: Option<&str>,
140143
) -> Result<(), Self::Error> {
141144
let created_at = clock.now();
142145
let id = Ulid::from_datetime_with_source(created_at.into(), rng);
@@ -145,20 +148,47 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
145148
sqlx::query!(
146149
r#"
147150
INSERT INTO queue_jobs
148-
(queue_job_id, queue_name, payload, metadata, created_at, scheduled_at, status)
149-
VALUES ($1, $2, $3, $4, $5, $6, 'scheduled')
151+
(queue_job_id, queue_name, payload, metadata, created_at, scheduled_at, schedule_name, status)
152+
VALUES ($1, $2, $3, $4, $5, $6, $7, 'scheduled')
150153
"#,
151154
Uuid::from(id),
152155
queue_name,
153156
payload,
154157
metadata,
155158
created_at,
156159
scheduled_at,
160+
schedule_name,
157161
)
158162
.traced()
159163
.execute(&mut *self.conn)
160164
.await?;
161165

166+
// If there was a schedule name supplied, update the queue_schedules table
167+
if let Some(schedule_name) = schedule_name {
168+
let span = tracing::info_span!(
169+
"db.queue_job.schedule_later.update_schedules",
170+
{ DB_QUERY_TEXT } = tracing::field::Empty,
171+
);
172+
173+
let res = sqlx::query!(
174+
r#"
175+
UPDATE queue_schedules
176+
SET last_scheduled_at = $1,
177+
last_scheduled_job_id = $2
178+
WHERE schedule_name = $3
179+
"#,
180+
scheduled_at,
181+
Uuid::from(id),
182+
schedule_name,
183+
)
184+
.record(&span)
185+
.execute(&mut *self.conn)
186+
.instrument(span)
187+
.await?;
188+
189+
DatabaseError::ensure_affected_rows(&res, 1)?;
190+
}
191+
162192
Ok(())
163193
}
164194

@@ -315,14 +345,19 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
315345
let scheduled_at = now + delay;
316346
let new_id = Ulid::from_datetime_with_source(now.into(), rng);
317347

348+
let span = tracing::info_span!(
349+
"db.queue_job.retry.insert_job",
350+
{ DB_QUERY_TEXT } = tracing::field::Empty
351+
);
318352
// Create a new job with the same payload and metadata, but a new ID and
319353
// increment the attempt
320354
// We make sure we do this only for 'failed' jobs
321355
let res = sqlx::query!(
322356
r#"
323357
INSERT INTO queue_jobs
324-
(queue_job_id, queue_name, payload, metadata, created_at, attempt, scheduled_at, status)
325-
SELECT $1, queue_name, payload, metadata, $2, attempt + 1, $3, 'scheduled'
358+
(queue_job_id, queue_name, payload, metadata, created_at,
359+
attempt, scheduled_at, schedule_name, status)
360+
SELECT $1, queue_name, payload, metadata, $2, attempt + 1, $3, schedule_name, 'scheduled'
326361
FROM queue_jobs
327362
WHERE queue_job_id = $4
328363
AND status = 'failed'
@@ -332,13 +367,39 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
332367
scheduled_at,
333368
Uuid::from(id),
334369
)
335-
.traced()
370+
.record(&span)
336371
.execute(&mut *self.conn)
372+
.instrument(span)
337373
.await?;
338374

339375
DatabaseError::ensure_affected_rows(&res, 1)?;
340376

377+
// If that job was referenced by a schedule, update the schedule
378+
let span = tracing::info_span!(
379+
"db.queue_job.retry.update_schedule",
380+
{ DB_QUERY_TEXT } = tracing::field::Empty
381+
);
382+
sqlx::query!(
383+
r#"
384+
UPDATE queue_schedules
385+
SET last_scheduled_at = $1,
386+
last_scheduled_job_id = $2
387+
WHERE last_scheduled_job_id = $3
388+
"#,
389+
scheduled_at,
390+
Uuid::from(new_id),
391+
Uuid::from(id),
392+
)
393+
.record(&span)
394+
.execute(&mut *self.conn)
395+
.instrument(span)
396+
.await?;
397+
341398
// Update the old job to point to the new attempt
399+
let span = tracing::info_span!(
400+
"db.queue_job.retry.update_old_job",
401+
{ DB_QUERY_TEXT } = tracing::field::Empty
402+
);
342403
let res = sqlx::query!(
343404
r#"
344405
UPDATE queue_jobs
@@ -348,8 +409,9 @@ impl QueueJobRepository for PgQueueJobRepository<'_> {
348409
Uuid::from(new_id),
349410
Uuid::from(id),
350411
)
351-
.traced()
412+
.record(&span)
352413
.execute(&mut *self.conn)
414+
.instrument(span)
353415
.await?;
354416

355417
DatabaseError::ensure_affected_rows(&res, 1)?;

crates/storage-pg/src/queue/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@
66
//! A module containing the PostgreSQL implementation of the job queue
77
88
pub mod job;
9+
pub mod schedule;
910
pub mod worker;
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2024 New Vector Ltd.
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-only
4+
// Please see LICENSE in the repository root for full details.
5+
6+
//! A module containing the PostgreSQL implementation of the
7+
//! [`QueueScheduleRepository`].
8+
9+
use async_trait::async_trait;
10+
use chrono::{DateTime, Utc};
11+
use mas_storage::queue::{QueueScheduleRepository, Schedule, ScheduleStatus};
12+
use sqlx::PgConnection;
13+
14+
use crate::{DatabaseError, ExecuteExt};
15+
16+
/// An implementation of [`QueueScheduleRepository`] for a PostgreSQL
17+
/// connection.
18+
pub struct PgQueueScheduleRepository<'c> {
19+
conn: &'c mut PgConnection,
20+
}
21+
22+
impl<'c> PgQueueScheduleRepository<'c> {
23+
/// Create a new [`PgQueueScheduleRepository`] from an active PostgreSQL
24+
/// connection.
25+
#[must_use]
26+
pub fn new(conn: &'c mut PgConnection) -> Self {
27+
Self { conn }
28+
}
29+
}
30+
31+
struct ScheduleLookup {
32+
schedule_name: String,
33+
last_scheduled_at: Option<DateTime<Utc>>,
34+
last_scheduled_job_completed: Option<bool>,
35+
}
36+
37+
impl From<ScheduleLookup> for ScheduleStatus {
38+
fn from(value: ScheduleLookup) -> Self {
39+
ScheduleStatus {
40+
schedule_name: value.schedule_name,
41+
last_scheduled_at: value.last_scheduled_at,
42+
last_scheduled_job_completed: value.last_scheduled_job_completed,
43+
}
44+
}
45+
}
46+
47+
#[async_trait]
48+
impl<'c> QueueScheduleRepository for PgQueueScheduleRepository<'c> {
49+
type Error = DatabaseError;
50+
51+
async fn setup(&mut self, schedules: &[(&'static str, Schedule)]) -> Result<(), Self::Error> {
52+
sqlx::query!(
53+
r#"
54+
INSERT INTO queue_schedules (schedule_name, schedule_expression)
55+
SELECT * FROM UNNEST($1::text[], $2::text[]) AS t (schedule_name, schedule_expression)
56+
ON CONFLICT (schedule_name) DO UPDATE
57+
SET schedule_expression = EXCLUDED.schedule_expression
58+
"#,
59+
&schedules.iter().map(|(name, _)| (*name).to_owned()).collect::<Vec<_>>(),
60+
&schedules
61+
.iter()
62+
.map(|(_, schedule)| schedule.source().to_owned())
63+
.collect::<Vec<_>>()
64+
)
65+
.traced()
66+
.execute(&mut *self.conn)
67+
.await?;
68+
69+
Ok(())
70+
}
71+
72+
async fn list(&mut self) -> Result<Vec<ScheduleStatus>, Self::Error> {
73+
let res = sqlx::query_as!(
74+
ScheduleLookup,
75+
r#"
76+
SELECT
77+
queue_schedules.schedule_name as "schedule_name!",
78+
queue_schedules.last_scheduled_at,
79+
queue_jobs.status IN ('completed', 'failed') as last_scheduled_job_completed
80+
FROM queue_schedules
81+
LEFT JOIN queue_jobs
82+
ON queue_jobs.queue_job_id = queue_schedules.last_scheduled_job_id
83+
"#
84+
)
85+
.traced()
86+
.fetch_all(&mut *self.conn)
87+
.await?;
88+
89+
Ok(res.into_iter().map(Into::into).collect())
90+
}
91+
}

crates/storage-pg/src/repository.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use mas_storage::{
1717
OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository, OAuth2ClientRepository,
1818
OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository,
1919
},
20+
queue::{QueueJobRepository, QueueScheduleRepository, QueueWorkerRepository},
2021
upstream_oauth2::{
2122
UpstreamOAuthLinkRepository, UpstreamOAuthProviderRepository,
2223
UpstreamOAuthSessionRepository,
@@ -38,7 +39,10 @@ use crate::{
3839
PgOAuth2ClientRepository, PgOAuth2DeviceCodeGrantRepository,
3940
PgOAuth2RefreshTokenRepository, PgOAuth2SessionRepository,
4041
},
41-
queue::{job::PgQueueJobRepository, worker::PgQueueWorkerRepository},
42+
queue::{
43+
job::PgQueueJobRepository, schedule::PgQueueScheduleRepository,
44+
worker::PgQueueWorkerRepository,
45+
},
4246
upstream_oauth2::{
4347
PgUpstreamOAuthLinkRepository, PgUpstreamOAuthProviderRepository,
4448
PgUpstreamOAuthSessionRepository,
@@ -259,15 +263,17 @@ where
259263
Box::new(PgCompatRefreshTokenRepository::new(self.conn.as_mut()))
260264
}
261265

262-
fn queue_worker<'c>(
263-
&'c mut self,
264-
) -> Box<dyn mas_storage::queue::QueueWorkerRepository<Error = Self::Error> + 'c> {
266+
fn queue_worker<'c>(&'c mut self) -> Box<dyn QueueWorkerRepository<Error = Self::Error> + 'c> {
265267
Box::new(PgQueueWorkerRepository::new(self.conn.as_mut()))
266268
}
267269

268-
fn queue_job<'c>(
269-
&'c mut self,
270-
) -> Box<dyn mas_storage::queue::QueueJobRepository<Error = Self::Error> + 'c> {
270+
fn queue_job<'c>(&'c mut self) -> Box<dyn QueueJobRepository<Error = Self::Error> + 'c> {
271271
Box::new(PgQueueJobRepository::new(self.conn.as_mut()))
272272
}
273+
274+
fn queue_schedule<'c>(
275+
&'c mut self,
276+
) -> Box<dyn QueueScheduleRepository<Error = Self::Error> + 'c> {
277+
Box::new(PgQueueScheduleRepository::new(self.conn.as_mut()))
278+
}
273279
}

crates/storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ workspace = true
1414
[dependencies]
1515
async-trait.workspace = true
1616
chrono.workspace = true
17+
cron.workspace = true
1718
futures-util.workspace = true
1819
opentelemetry.workspace = true
1920
rand_core = "0.6.4"

0 commit comments

Comments
 (0)