Skip to content

Commit 0593e07

Browse files
author
Mauricio Cassola
committed
Rename to jobs and some refactors
1 parent b4eb9dd commit 0593e07

File tree

7 files changed

+170
-156
lines changed

7 files changed

+170
-156
lines changed

src/db.rs

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ use postgres_native_tls::MakeTlsConnector;
44
use std::sync::{Arc, Mutex};
55
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
66
use tokio_postgres::Client as DbClient;
7-
use crate::db::events::*;
8-
use crate::handlers::events::handle_event;
7+
use crate::db::jobs::*;
8+
use crate::handlers::jobs::handle_job;
99

10-
pub mod events;
10+
pub mod jobs;
1111
pub mod issue_data;
1212
pub mod notifications;
1313
pub mod rustc_commits;
@@ -182,26 +182,26 @@ pub async fn run_migrations(client: &DbClient) -> anyhow::Result<()> {
182182
Ok(())
183183
}
184184

185-
pub async fn run_scheduled_events(db: &DbClient) -> anyhow::Result<()> {
186-
let events = get_events_to_execute(&db).await.unwrap();
187-
println!("events to execute: {:#?}", events);
188-
tracing::trace!("events to execute: {:#?}", events);
185+
pub async fn run_scheduled_jobs(db: &DbClient) -> anyhow::Result<()> {
186+
let jobs = get_jobs_to_execute(&db).await.unwrap();
187+
println!("jobs to execute: {:#?}", jobs);
188+
tracing::trace!("jobs to execute: {:#?}", jobs);
189189

190-
for event in events.iter() {
191-
update_event_executed_at(&db, &event.event_id).await?;
190+
for job in jobs.iter() {
191+
update_job_executed_at(&db, &job.id).await?;
192192

193-
match handle_event(&event.event_name, &event.event_metadata).await {
193+
match handle_job(&job.name, &job.metadata).await {
194194
Ok(_) => {
195-
println!("event succesfully executed (id={})", event.event_id);
196-
tracing::trace!("event succesfully executed (id={})", event.event_id);
195+
println!("job succesfully executed (id={})", job.id);
196+
tracing::trace!("job succesfully executed (id={})", job.id);
197197

198-
delete_event(&db, &event.event_id).await?;
198+
delete_job(&db, &job.id).await?;
199199
},
200200
Err(e) => {
201-
println!("event failed on execution (id={:?}, error={:?})", event.event_id, e);
202-
tracing::trace!("event failed on execution (id={:?}, error={:?})", event.event_id, e);
201+
println!("job failed on execution (id={:?}, error={:?})", job.id, e);
202+
tracing::trace!("job failed on execution (id={:?}, error={:?})", job.id, e);
203203

204-
update_event_failed_message(&db, &event.event_id, &e.to_string()).await?;
204+
update_job_error_message(&db, &job.id, &e.to_string()).await?;
205205
},
206206
}
207207
}
@@ -247,13 +247,19 @@ CREATE TABLE issue_data (
247247
);
248248
",
249249
"
250-
CREATE TABLE events (
251-
event_id UUID PRIMARY KEY,
252-
event_name TEXT NOT NULL,
253-
expected_event_time TIMESTAMP WITH TIME ZONE NOT NULL,
254-
event_metadata JSONB,
255-
executed_at TIMESTAMP WITH TIME ZONE NOT NULL,
256-
failed TEXT
250+
CREATE TABLE jobs (
251+
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
252+
name TEXT NOT NULL,
253+
expected_time TIMESTAMP WITH TIME ZONE NOT NULL,
254+
metadata JSONB,
255+
executed_at TIMESTAMP WITH TIME ZONE,
256+
error_message TEXT
257257
);
258258
",
259+
"
260+
CREATE UNIQUE INDEX jobs_name_expected_time_unique_index
261+
ON jobs (
262+
name, expected_time
263+
);
264+
"
259265
];

src/db/events.rs

Lines changed: 0 additions & 103 deletions
This file was deleted.

src/db/jobs.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
//! The `jobs` table provides a way to have scheduled jobs
2+
use anyhow::{Result, Context as _};
3+
use chrono::{DateTime, FixedOffset};
4+
use tokio_postgres::{Client as DbClient};
5+
use uuid::Uuid;
6+
use serde::{Deserialize, Serialize};
7+
8+
/// A row of the `jobs` table: a unit of work scheduled for a given time.
#[derive(Serialize, Deserialize, Debug)]
pub struct Job {
    // Primary key; the database generates it via gen_random_uuid().
    pub id: Uuid,
    // Job name; used by the handler side to dispatch to the right function.
    pub name: String,
    // When the job is expected to run; jobs are picked up once this is in the past.
    pub expected_time: DateTime<FixedOffset>,
    // Arbitrary JSON payload forwarded to the job's handler.
    pub metadata: serde_json::Value,
    // Set when the job is picked up for execution; None until then
    // (the column has no NOT NULL constraint).
    pub executed_at: Option<DateTime<FixedOffset>>,
    // Error from the last failed execution, if any; used to gate retries.
    pub error_message: Option<String>,
}
17+
18+
/// Inserts a job into the `jobs` table, or — if a job with the same
/// `(name, expected_time)` pair already exists — updates its metadata.
///
/// The upsert relies on the unique index over `(name, expected_time)`,
/// so scheduling the same job twice for the same time is idempotent.
pub async fn insert_job(
    db: &DbClient,
    // `&str` instead of `&String`: more general, and `&String` arguments
    // still coerce, so existing callers are unaffected.
    name: &str,
    expected_time: &DateTime<FixedOffset>,
    metadata: &serde_json::Value,
) -> Result<()> {
    tracing::trace!("insert_job(name={})", name);

    db.execute(
        "INSERT INTO jobs (name, expected_time, metadata) VALUES ($1, $2, $3)
            ON CONFLICT (name, expected_time) DO UPDATE SET metadata = EXCLUDED.metadata",
        &[&name, &expected_time, &metadata],
    )
    .await
    .context("Inserting job")?;

    Ok(())
}
36+
37+
/// Removes the job with the given `id` from the `jobs` table.
pub async fn delete_job(db: &DbClient, id: &Uuid) -> Result<()> {
    tracing::trace!("delete_job(id={})", id);

    db.execute("DELETE FROM jobs WHERE id = $1", &[&id])
        .await
        .context("Deleting job")?;

    Ok(())
}
49+
50+
/// Records the error message produced by a failed execution of the job.
///
/// A non-null `error_message` (together with `executed_at`) is what
/// `get_jobs_to_execute` uses to decide when a failed job may be retried.
// `message: &str` instead of `&String`: the caller's `&e.to_string()`
// still coerces, and the signature no longer forces an owned String.
pub async fn update_job_error_message(db: &DbClient, id: &Uuid, message: &str) -> Result<()> {
    tracing::trace!("update_job_error_message(id={})", id);

    db.execute(
        "UPDATE jobs SET error_message = $2 WHERE id = $1",
        &[&id, &message],
    )
    .await
    .context("Updating job error message")?;

    Ok(())
}
62+
63+
/// Stamps the job as picked up by setting `executed_at` to the current time.
pub async fn update_job_executed_at(db: &DbClient, id: &Uuid) -> Result<()> {
    tracing::trace!("update_job_executed_at(id={})", id);

    db.execute("UPDATE jobs SET executed_at = now() WHERE id = $1", &[&id])
        .await
        .context("Updating job executed at")?;

    Ok(())
}
75+
76+
// Selects all jobs with:
77+
// - expected_time in the past
78+
// - error_message is null or executed_at is at least 60 minutes ago (intended to make repeat executions rare enough)
79+
pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>> {
80+
let jobs = db
81+
.query(
82+
"
83+
SELECT * FROM jobs WHERE expected_time <= now() AND (error_message IS NULL OR executed_at <= now() - INTERVAL '60 minutes')",
84+
&[],
85+
)
86+
.await
87+
.context("Getting jobs data")?;
88+
89+
let mut data = Vec::with_capacity(jobs.len());
90+
for job in jobs {
91+
let id: Uuid = job.get(0);
92+
let name: String = job.get(1);
93+
let expected_time: DateTime<FixedOffset> = job.get(2);
94+
let metadata: serde_json::Value = job.get(3);
95+
let executed_at: Option<DateTime<FixedOffset>> = job.get(4);
96+
let error_message: Option<String> = job.get(5);
97+
98+
data.push(Job {
99+
id,
100+
name,
101+
expected_time,
102+
metadata: metadata,
103+
executed_at: executed_at,
104+
error_message
105+
});
106+
}
107+
108+
Ok(data)
109+
}

src/handlers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ mod review_submitted;
4343
mod rfc_helper;
4444
mod rustc_commits;
4545
mod shortcut;
46-
pub mod events;
46+
pub mod jobs;
4747

4848
pub async fn handle(ctx: &Context, event: &Event) -> Vec<HandlerError> {
4949
let config = config::get(&ctx.github, event.repo()).await;

src/handlers/events.rs

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/handlers/jobs.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Function to match the scheduled job function with its corresponding handler.
2+
// In case you want to add a new one, just add a new clause to the match with
3+
// the job name and the corresponding function.
4+
5+
// The metadata is a serde_json::Value
6+
// Please refer to https://docs.rs/serde_json/latest/serde_json/value/fn.from_value.html
7+
// on how to interpret it as an instance of type T, implementing Deserialize.
8+
9+
pub async fn handle_job(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
10+
match name {
11+
_ => default(&name, &metadata)
12+
}
13+
}
14+
15+
fn default(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
16+
println!("handle_job fell into default case: (name={:?}, metadata={:?})", name, metadata);
17+
tracing::trace!("handle_job fell into default case: (name={:?}, metadata={:?})", name, metadata);
18+
19+
Ok(())
20+
}

src/main.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ use futures::StreamExt;
66
use hyper::{header, Body, Request, Response, Server, StatusCode};
77
use reqwest::Client;
88
use route_recognizer::Router;
9-
use std::{env, net::SocketAddr, sync::Arc};
9+
use std::{env, net::SocketAddr, sync::Arc, time::Duration};
1010
use tower::{Service, ServiceExt};
1111
use tracing as log;
1212
use tracing::Instrument;
1313
use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
14-
use std::{time::Duration, thread};
15-
use tokio::task;
14+
use tokio::{task, time::sleep};
15+
16+
const JOB_PROCESSING_CADENCE_IN_SECS: u64 = 60;
1617

1718
async fn handle_agenda_request(req: String) -> anyhow::Result<String> {
1819
if req == "/agenda/lang/triage" {
@@ -239,15 +240,17 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
239240
.await
240241
.context("database migrations")?;
241242

243+
// spawning a background task that will run the scheduled jobs
244+
// every JOB_PROCESSING_CADENCE_IN_SECS
242245
task::spawn(async move {
243246
let pool = db::ClientPool::new();
244247

245-
loop {
246-
thread::sleep(Duration::from_secs(60)); // every one minute
247-
248-
db::run_scheduled_events(&*pool.get().await)
248+
loop {
249+
db::run_scheduled_jobs(&*pool.get().await)
249250
.await
250-
.context("database scheduled_events").unwrap();
251+
.context("run database scheduled jobs").unwrap();
252+
253+
sleep(Duration::from_secs(JOB_PROCESSING_CADENCE_IN_SECS)).await;
251254
}
252255
});
253256

0 commit comments

Comments
 (0)