Skip to content

Commit 9de2747

Browse files
committed
Lock jobs before starting processing
1 parent 8d8e0b6 commit 9de2747

File tree

8 files changed

+70
-25
lines changed

8 files changed

+70
-25
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE job DROP COLUMN started_at;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE job ADD COLUMN started_at TIMESTAMPTZ;

rfd-model/src/db.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub struct JobModel {
7272
pub committed_at: DateTime<Utc>,
7373
pub processed: bool,
7474
pub created_at: DateTime<Utc>,
75+
pub started_at: Option<DateTime<Utc>>,
7576
}
7677

7778
#[derive(Debug, Deserialize, Serialize, Queryable, Insertable)]

rfd-model/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ pub struct Job {
145145
pub processed: bool,
146146
#[partial(NewJob(skip))]
147147
pub created_at: DateTime<Utc>,
148+
#[partial(NewJob(skip))]
149+
pub started_at: Option<DateTime<Utc>>,
148150
}
149151

150152
impl From<JobModel> for Job {
@@ -160,6 +162,7 @@ impl From<JobModel> for Job {
160162
committed_at: value.committed_at,
161163
processed: value.processed,
162164
created_at: value.created_at,
165+
started_at: value.started_at,
163166
}
164167
}
165168
}

rfd-model/src/schema.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ diesel::table! {
8888
committed_at -> Timestamptz,
8989
processed -> Bool,
9090
created_at -> Timestamptz,
91+
started_at -> Nullable<Timestamptz>,
9192
}
9293
}
9394

rfd-model/src/storage/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ pub struct JobFilter {
221221
pub id: Option<Vec<i32>>,
222222
pub sha: Option<Vec<String>>,
223223
pub processed: Option<bool>,
224+
pub started: Option<bool>,
224225
}
225226

226227
impl JobFilter {
@@ -238,6 +239,11 @@ impl JobFilter {
238239
self.processed = processed;
239240
self
240241
}
242+
243+
pub fn started(mut self, started: Option<bool>) -> Self {
244+
self.started = started;
245+
self
246+
}
241247
}
242248

243249
#[cfg_attr(feature = "mock", automock)]
@@ -250,6 +256,7 @@ pub trait JobStore {
250256
pagination: &ListPagination,
251257
) -> Result<Vec<Job>, StoreError>;
252258
async fn upsert(&self, new_job: NewJob) -> Result<Job, StoreError>;
259+
async fn start(&self, id: i32) -> Result<Option<Job>, StoreError>;
253260
async fn complete(&self, id: i32) -> Result<Option<Job>, StoreError>;
254261
}
255262

rfd-model/src/storage/postgres.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ impl JobStore for PostgresStore {
430430
) -> Result<Vec<Job>, StoreError> {
431431
let mut query = job::dsl::job.into_boxed();
432432

433-
let JobFilter { id, sha, processed } = filter;
433+
let JobFilter { id, sha, processed, started } = filter;
434434

435435
if let Some(id) = id {
436436
query = query.filter(job::id.eq_any(id));
@@ -444,6 +444,14 @@ impl JobStore for PostgresStore {
444444
query = query.filter(job::processed.eq(processed));
445445
}
446446

447+
if let Some(started) = started {
448+
if started {
449+
query = query.filter(job::started_at.is_null());
450+
} else {
451+
query = query.filter(job::started_at.is_not_null());
452+
}
453+
}
454+
447455
let results = query
448456
.offset(pagination.offset)
449457
.limit(pagination.limit)
@@ -474,6 +482,17 @@ impl JobStore for PostgresStore {
474482
Ok(rfd.into())
475483
}
476484

485+
async fn start(&self, id: i32) -> Result<Option<Job>, StoreError> {
486+
let _ = update(job::dsl::job)
487+
.filter(job::id.eq(id))
488+
.filter(job::started_at.is_null())
489+
.set(job::started_at.eq(Utc::now()))
490+
.execute_async(&*self.pool.get().await?)
491+
.await?;
492+
493+
JobStore::get(self, id).await
494+
}
495+
477496
async fn complete(&self, id: i32) -> Result<Option<Job>, StoreError> {
478497
let _ = update(job::dsl::job)
479498
.filter(job::id.eq(id))

rfd-processor/src/processor.rs

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,42 +25,54 @@ pub async fn processor(ctx: Arc<Context>) -> Result<(), JobError> {
2525
loop {
2626
let jobs = JobStore::list(
2727
&ctx.db.storage,
28-
JobFilter::default().processed(Some(false)),
28+
JobFilter::default().processed(Some(false)).started(Some(false)),
2929
&pagination,
3030
)
3131
.await?;
3232

3333
for job in jobs {
3434
let ctx = ctx.clone();
3535
tokio::spawn(async move {
36-
let location = GitHubRfdLocation {
37-
owner: job.owner.clone(),
38-
repo: job.repository.clone(),
39-
branch: job.branch.clone(),
40-
commit: job.sha.clone(),
41-
default_branch: ctx.github.repository.default_branch.clone(),
42-
};
36+
// Make the job as started
37+
match JobStore::start(&ctx.db.storage, job.id).await {
38+
Ok(Some(job)) => {
4339

44-
let update = GitHubRfdUpdate {
45-
location,
46-
number: job.rfd.into(),
47-
committed_at: job.committed_at,
48-
};
40+
let location = GitHubRfdLocation {
41+
owner: job.owner.clone(),
42+
repo: job.repository.clone(),
43+
branch: job.branch.clone(),
44+
commit: job.sha.clone(),
45+
default_branch: ctx.github.repository.default_branch.clone(),
46+
};
4947

50-
let updater = RfdUpdater::new(&ctx.actions, ctx.processor.update_mode);
48+
let update = GitHubRfdUpdate {
49+
location,
50+
number: job.rfd.into(),
51+
committed_at: job.committed_at,
52+
};
5153

52-
match updater.handle(&ctx, &[update]).await {
53-
Ok(_) => {
54-
let _ = JobStore::complete(&ctx.db.storage, job.id)
55-
.await
56-
.tap_err(|err| {
57-
tracing::error!(?err, "Failed to mark job as completed")
58-
});
54+
let updater = RfdUpdater::new(&ctx.actions, ctx.processor.update_mode);
55+
56+
match updater.handle(&ctx, &[update]).await {
57+
Ok(_) => {
58+
let _ = JobStore::complete(&ctx.db.storage, job.id)
59+
.await
60+
.tap_err(|err| {
61+
tracing::error!(?err, "Failed to mark job as completed")
62+
});
63+
}
64+
Err(err) => {
65+
tracing::error!(?err, "RFD update failed");
66+
67+
// TODO: Mark job as failed or retry?
68+
}
69+
}
70+
}
71+
Ok(None) => {
72+
tracing::error!(?job, "Job that was scheduled to run has gone missing! Was it started by a different task?");
5973
}
6074
Err(err) => {
61-
tracing::error!(?err, "RFD update failed");
62-
63-
// TODO: Mark job as failed or retry?
75+
tracing::warn!(?job, ?err, "Failed to start job. Was it previously started?");
6476
}
6577
}
6678

0 commit comments

Comments
 (0)