Skip to content

Commit c92692b

Browse files
committed
New job queue: worker registration and leader election
1 parent 1871b2c commit c92692b

17 files changed

+639
-4
lines changed

crates/storage-pg/.sqlx/query-12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62.json

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/storage-pg/.sqlx/query-5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35.json

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/storage-pg/.sqlx/query-6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379.json

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/storage-pg/.sqlx/query-966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d.json

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
-- Copyright 2024 New Vector Ltd.
2+
--
3+
-- SPDX-License-Identifier: AGPL-3.0-only
4+
-- Please see LICENSE in the repository root for full details.
5+
6+
-- This table stores information about workers, mostly to track their health
7+
CREATE TABLE queue_workers (
8+
-- Unique identifier of the worker (the PostgreSQL repository writes a ULID
-- converted to a UUID here)
queue_worker_id UUID NOT NULL PRIMARY KEY,
9+
10+
-- When the worker was registered
11+
registered_at TIMESTAMP WITH TIME ZONE NOT NULL,
12+
13+
-- When the worker was last seen; set at registration and refreshed by each
-- heartbeat
14+
last_seen_at TIMESTAMP WITH TIME ZONE NOT NULL,
15+
16+
-- When the worker was shut down; NULL for as long as the worker has not been
-- marked as shut down
17+
shutdown_at TIMESTAMP WITH TIME ZONE
18+
);
19+
20+
-- This single-row table stores the leader of the queue
21+
-- The leader is responsible for running maintenance tasks
22+
-- NOTE(review): UNLOGGED tables are not written to the WAL and are truncated
-- after a crash; presumably acceptable here because the lease is short-lived
-- and can simply be re-acquired -- confirm.
CREATE UNLOGGED TABLE queue_leader (
23+
-- This makes the row unique: the column can only ever be TRUE (see the CHECK
-- constraint below) and is UNIQUE, so it also serves as the conflict target
-- of the leader-election upsert
24+
active BOOLEAN NOT NULL DEFAULT TRUE UNIQUE,
25+
26+
-- When the leader was elected
27+
elected_at TIMESTAMP WITH TIME ZONE NOT NULL,
28+
29+
-- Until when the lease is valid; the current leader pushes this forward when
-- it renews its lease
30+
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
31+
32+
-- The worker ID of the leader
33+
queue_worker_id UUID NOT NULL REFERENCES queue_workers (queue_worker_id),
34+
35+
-- This, combined with the unique constraint, makes sure we only ever have a single row
36+
CONSTRAINT queue_leader_active CHECK (active IS TRUE)
37+
);

crates/storage-pg/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ pub mod app_session;
166166
pub mod compat;
167167
pub mod job;
168168
pub mod oauth2;
169+
pub mod queue;
169170
pub mod upstream_oauth2;
170171
pub mod user;
171172

crates/storage-pg/src/queue/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
// Copyright 2024 New Vector Ltd.
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-only
4+
// Please see LICENSE in the repository root for full details.
5+
6+
//! A module containing the PostgreSQL implementation of the job queue
7+
8+
pub mod worker;
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
// Copyright 2024 New Vector Ltd.
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-only
4+
// Please see LICENSE in the repository root for full details.
5+
6+
//! A module containing the PostgreSQL implementation of the
7+
//! [`QueueWorkerRepository`].
8+
9+
use async_trait::async_trait;
10+
use chrono::Duration;
11+
use mas_storage::{
12+
queue::{QueueWorkerRepository, Worker},
13+
Clock,
14+
};
15+
use rand::RngCore;
16+
use sqlx::PgConnection;
17+
use ulid::Ulid;
18+
use uuid::Uuid;
19+
20+
use crate::{DatabaseError, ExecuteExt};
21+
22+
/// An implementation of [`QueueWorkerRepository`] for a PostgreSQL connection.
///
/// The repository does not own the connection: it holds a mutable borrow of
/// it for the lifetime `'c`, and every query it issues is executed on that
/// connection.
23+
pub struct PgQueueWorkerRepository<'c> {
24+
// Connection on which all the queries of this repository are executed
conn: &'c mut PgConnection,
25+
}
26+
27+
impl<'c> PgQueueWorkerRepository<'c> {
28+
/// Create a new [`PgQueueWorkerRepository`] from an active PostgreSQL
29+
/// connection.
///
/// The connection stays mutably borrowed for as long as the returned
/// repository is alive. No query is issued by the constructor itself.
30+
#[must_use]
31+
pub fn new(conn: &'c mut PgConnection) -> Self {
32+
Self { conn }
33+
}
34+
}
35+
36+
#[async_trait]
37+
impl<'c> QueueWorkerRepository for PgQueueWorkerRepository<'c> {
38+
type Error = DatabaseError;
39+
40+
/// Register a new worker in the `queue_workers` table.
///
/// The worker ID is a ULID generated from the clock's current time and the
/// given random source; it is stored converted to a UUID. `registered_at`
/// and `last_seen_at` are both set to that same current time (the `$2`
/// parameter is used for both columns).
///
/// Returns the newly registered [`Worker`].
///
/// # Errors
///
/// Returns an error if executing the `INSERT` fails.
#[tracing::instrument(
41+
name = "db.queue_worker.register",
42+
skip_all,
43+
fields(
44+
worker.id,
45+
db.query.text,
46+
),
47+
err,
48+
)]
49+
async fn register(
50+
&mut self,
51+
rng: &mut (dyn RngCore + Send),
52+
clock: &dyn Clock,
53+
) -> Result<Worker, Self::Error> {
54+
let now = clock.now();
55+
let worker_id = Ulid::from_datetime_with_source(now.into(), rng);
// The `worker.id` span field is declared empty in the attribute above,
// since the ID only exists from this point on; fill it in now
56+
tracing::Span::current().record("worker.id", tracing::field::display(worker_id));
57+
58+
sqlx::query!(
59+
r#"
60+
INSERT INTO queue_workers (queue_worker_id, registered_at, last_seen_at)
61+
VALUES ($1, $2, $2)
62+
"#,
63+
Uuid::from(worker_id),
64+
now,
65+
)
// NOTE(review): `traced()` presumably records the `db.query.text` span
// field -- confirm against `ExecuteExt`
66+
.traced()
67+
.execute(&mut *self.conn)
68+
.await?;
69+
70+
Ok(Worker { id: worker_id })
71+
}
72+
73+
/// Record a heartbeat for the given worker by setting its `last_seen_at` to
/// the clock's current time.
///
/// Only rows whose `shutdown_at` is still `NULL` are updated, so a worker
/// that has already been marked as shut down cannot be refreshed.
///
/// Returns the same [`Worker`] that was passed in.
///
/// # Errors
///
/// Returns an error if executing the `UPDATE` fails, or if the
/// affected-rows check fails (unknown worker ID, or worker already shut
/// down).
#[tracing::instrument(
74+
name = "db.queue_worker.heartbeat",
75+
skip_all,
76+
fields(
77+
%worker.id,
78+
db.query.text,
79+
),
80+
err,
81+
)]
82+
async fn heartbeat(
83+
&mut self,
84+
clock: &dyn Clock,
85+
worker: Worker,
86+
) -> Result<Worker, Self::Error> {
87+
let now = clock.now();
88+
let res = sqlx::query!(
89+
r#"
90+
UPDATE queue_workers
91+
SET last_seen_at = $2
92+
WHERE queue_worker_id = $1 AND shutdown_at IS NULL
93+
"#,
94+
Uuid::from(worker.id),
95+
now,
96+
)
97+
.traced()
98+
.execute(&mut *self.conn)
99+
.await?;
100+
101+
// If no row was updated, the worker was shut down (or never registered), so
// we return an error
// NOTE(review): `ensure_affected_rows` presumably fails unless exactly one
// row was affected -- confirm against `DatabaseError`
102+
DatabaseError::ensure_affected_rows(&res, 1)?;
103+
104+
Ok(worker)
105+
}
106+
107+
/// Mark the given worker as shut down by setting its `shutdown_at` to the
/// clock's current time.
///
/// # Errors
///
/// Returns an error if executing the `UPDATE` fails, or if the
/// affected-rows check fails (no row with this worker ID).
#[tracing::instrument(
108+
name = "db.queue_worker.shutdown",
109+
skip_all,
110+
fields(
111+
%worker.id,
112+
db.query.text,
113+
),
114+
err,
115+
)]
116+
async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error> {
117+
let now = clock.now();
// NOTE(review): unlike the heartbeat query, this one does not filter on
// `shutdown_at IS NULL`, so shutting down a worker that was already marked
// as shut down overwrites its previous `shutdown_at` -- confirm this is
// intended
118+
let res = sqlx::query!(
119+
r#"
120+
UPDATE queue_workers
121+
SET shutdown_at = $2
122+
WHERE queue_worker_id = $1
123+
"#,
124+
Uuid::from(worker.id),
125+
now,
126+
)
127+
.traced()
128+
.execute(&mut *self.conn)
129+
.await?;
130+
131+
// NOTE(review): presumably fails unless exactly one row was affected, i.e.
// when the worker ID is unknown -- confirm against `DatabaseError`
DatabaseError::ensure_affected_rows(&res, 1)?;
132+
// NOTE(review): the `queue_leader` table is not touched here; if this worker
// holds the leader lease, the lease stays in place until it expires and is
// removed -- confirm this is intended
133+
Ok(())
134+
}
135+
136+
/// Mark as shut down every worker that has not been seen for longer than
/// `threshold`.
///
/// A worker is selected when its `shutdown_at` is still `NULL` and its
/// `last_seen_at` is strictly earlier than `now - threshold`, where `now` is
/// the clock's current time. The selected rows get `shutdown_at = now`.
/// The number of affected rows is not checked, so matching no worker is not
/// an error.
///
/// # Errors
///
/// Returns an error if executing the `UPDATE` fails.
#[tracing::instrument(
137+
name = "db.queue_worker.shutdown_dead_workers",
138+
skip_all,
139+
fields(
140+
db.query.text,
141+
),
142+
err,
143+
)]
144+
async fn shutdown_dead_workers(
145+
&mut self,
146+
clock: &dyn Clock,
147+
threshold: Duration,
148+
) -> Result<(), Self::Error> {
149+
let now = clock.now();
// $1 is the new `shutdown_at` value, $2 is the liveness cut-off
150+
sqlx::query!(
151+
r#"
152+
UPDATE queue_workers
153+
SET shutdown_at = $1
154+
WHERE shutdown_at IS NULL
155+
AND last_seen_at < $2
156+
"#,
157+
now,
158+
now - threshold,
159+
)
160+
.traced()
161+
.execute(&mut *self.conn)
162+
.await?;
163+
164+
Ok(())
165+
}
166+
167+
/// Delete the leader lease if it has expired.
///
/// The row of `queue_leader` is removed when its `expires_at` is strictly
/// earlier than the clock's current time; a lease expiring exactly now is
/// kept. The number of affected rows is not checked, so having no lease, or
/// a still-valid one, is not an error.
///
/// # Errors
///
/// Returns an error if executing the `DELETE` fails.
#[tracing::instrument(
168+
name = "db.queue_worker.remove_leader_lease_if_expired",
169+
skip_all,
170+
fields(
171+
db.query.text,
172+
),
173+
err,
174+
)]
175+
async fn remove_leader_lease_if_expired(
176+
&mut self,
177+
clock: &dyn Clock,
178+
) -> Result<(), Self::Error> {
179+
let now = clock.now();
180+
sqlx::query!(
181+
r#"
182+
DELETE FROM queue_leader
183+
WHERE expires_at < $1
184+
"#,
185+
now,
186+
)
187+
.traced()
188+
.execute(&mut *self.conn)
189+
.await?;
190+
191+
Ok(())
192+
}
193+
194+
/// Try to acquire the leader lease for the given worker, or renew it if the
/// worker already holds it.
///
/// This is a single upsert on the `queue_leader` table:
///
/// - if the table is empty, a row is inserted and the worker becomes the
///   leader, with a lease valid until `now + 5 seconds`;
/// - if a row exists and belongs to this worker, only its `expires_at` is
///   pushed forward (`elected_at` is left untouched);
/// - if a row exists and belongs to another worker, nothing is changed. This
///   query does not look at `expires_at`, so an expired lease held by
///   another worker is not taken over here; it has to be deleted first.
///
/// Returns `true` if the worker holds the lease after the call, `false`
/// otherwise.
///
/// # Errors
///
/// Returns an error if executing the upsert fails.
#[tracing::instrument(
195+
name = "db.queue_worker.try_get_leader_lease",
196+
skip_all,
197+
fields(
198+
%worker.id,
199+
db.query.text,
200+
),
201+
err,
202+
)]
203+
async fn try_get_leader_lease(
204+
&mut self,
205+
clock: &dyn Clock,
206+
worker: &Worker,
207+
) -> Result<bool, Self::Error> {
208+
let now = clock.now();
// Duration of the lease; hard-coded, callers cannot configure it
209+
let ttl = Duration::seconds(5);
210+
// The queue_leader table is meant to only have a single row, which conflicts on
211+
// the `active` column
212+
213+
// If there is a conflict, we update the `expires_at` column ONLY IF the current
214+
// leader is ourselves.
215+
let res = sqlx::query!(
216+
r#"
217+
INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id)
218+
VALUES ($1, $2, $3)
219+
ON CONFLICT (active)
220+
DO UPDATE SET expires_at = EXCLUDED.expires_at
221+
WHERE queue_leader.queue_worker_id = $3
222+
"#,
223+
now,
224+
now + ttl,
225+
Uuid::from(worker.id)
226+
)
227+
.traced()
228+
.execute(&mut *self.conn)
229+
.await?;
230+
231+
// We can then detect whether we are the leader or not by checking how many rows
232+
// were affected by the upsert
// (one row when we inserted or renewed the lease, none when the conflicting
// row belongs to another worker)
233+
let am_i_the_leader = res.rows_affected() == 1;
234+
235+
Ok(am_i_the_leader)
236+
}
237+
}

0 commit comments

Comments
 (0)