Skip to content

Commit ac1d005

Browse files
author
Mauricio Cassola
committed
Cron jobs support in main runner
1 parent 0593e07 commit ac1d005

File tree

4 files changed

+71
-12
lines changed

4 files changed

+71
-12
lines changed

Cargo.lock

Lines changed: 17 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ tower = { version = "0.4.13", features = ["util", "limit", "buffer", "load-shed"
4242
github-graphql = { path = "github-graphql" }
4343
rand = "0.8.5"
4444
ignore = "0.4.18"
45+
postgres-types = { version = "0.2.4", features = ["derive"] }
4546

4647
[dependencies.serde]
4748
version = "1"

src/db.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,15 @@ pub async fn run_scheduled_jobs(db: &DbClient) -> anyhow::Result<()> {
195195
println!("job succesfully executed (id={})", job.id);
196196
tracing::trace!("job succesfully executed (id={})", job.id);
197197

198+
if let Some(frequency) = job.frequency {
199+
let duration = get_duration_from_cron(frequency, job.frequency_unit.as_ref().unwrap());
200+
let new_expected_time = job.expected_time.checked_add_signed(duration).unwrap();
201+
202+
insert_job(&db, &job.name, &new_expected_time, &Some(frequency), &job.frequency_unit, &job.metadata).await?;
203+
println!("job succesfully reinserted (name={})", job.name);
204+
tracing::trace!("job succesfully reinserted (name={})", job.name);
205+
}
206+
198207
delete_job(&db, &job.id).await?;
199208
},
200209
Err(e) => {
@@ -245,12 +254,17 @@ CREATE TABLE issue_data (
245254
data JSONB,
246255
PRIMARY KEY (repo, issue_number, key)
247256
);
257+
",
258+
"
259+
CREATE TYPE frequency_unit AS ENUM ('days', 'hours', 'minutes', 'seconds');
248260
",
249261
"
250262
CREATE TABLE jobs (
251263
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
252264
name TEXT NOT NULL,
253265
expected_time TIMESTAMP WITH TIME ZONE NOT NULL,
266+
frequency INTEGER,
267+
frequency_unit frequency_unit,
254268
metadata JSONB,
255269
executed_at TIMESTAMP WITH TIME ZONE,
256270
error_message TEXT

src/db/jobs.rs

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,50 @@
11
//! The `jobs` table provides a way to have scheduled jobs
22
use anyhow::{Result, Context as _};
3-
use chrono::{DateTime, FixedOffset};
3+
use chrono::{DateTime, FixedOffset, Duration};
44
use tokio_postgres::{Client as DbClient};
55
use uuid::Uuid;
66
use serde::{Deserialize, Serialize};
7+
use postgres_types::{ToSql, FromSql};
78

89
#[derive(Serialize, Deserialize, Debug)]
910
pub struct Job {
1011
pub id: Uuid,
1112
pub name: String,
1213
pub expected_time: DateTime<FixedOffset>,
14+
pub frequency: Option<i32>,
15+
pub frequency_unit: Option<FrequencyUnit>,
1316
pub metadata: serde_json::Value,
1417
pub executed_at: Option<DateTime<FixedOffset>>,
1518
pub error_message: Option<String>,
1619
}
1720

21+
#[derive(Serialize, Deserialize, Debug, ToSql, FromSql)]
22+
#[postgres(name = "frequency_unit")]
23+
pub enum FrequencyUnit {
24+
#[postgres(name = "days")]
25+
Days,
26+
#[postgres(name = "hours")]
27+
Hours,
28+
#[postgres(name = "minutes")]
29+
Minutes,
30+
#[postgres(name = "seconds")]
31+
Seconds,
32+
}
33+
1834
pub async fn insert_job(
1935
db: &DbClient,
2036
name: &String,
2137
expected_time: &DateTime<FixedOffset>,
38+
frequency: &Option<i32>,
39+
frequency_unit: &Option<FrequencyUnit>,
2240
metadata: &serde_json::Value
2341
) -> Result<()> {
2442
tracing::trace!("insert_job(name={})", name);
2543

2644
db.execute(
27-
"INSERT INTO jobs (name, expected_time, metadata) VALUES ($1, $2, $3)
45+
"INSERT INTO jobs (name, expected_time, frequency, frequency_unit, metadata) VALUES ($1, $2, $3, $4, $5)
2846
ON CONFLICT (name, expected_time) DO UPDATE SET metadata = EXCLUDED.metadata",
29-
&[&name, &expected_time, &metadata],
47+
&[&name, &expected_time, &frequency, &frequency_unit, &metadata],
3048
)
3149
.await
3250
.context("Inserting job")?;
@@ -91,19 +109,32 @@ pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>> {
91109
let id: Uuid = job.get(0);
92110
let name: String = job.get(1);
93111
let expected_time: DateTime<FixedOffset> = job.get(2);
94-
let metadata: serde_json::Value = job.get(3);
95-
let executed_at: Option<DateTime<FixedOffset>> = job.get(4);
96-
let error_message: Option<String> = job.get(5);
112+
let frequency: Option<i32> = job.get(3);
113+
let frequency_unit: Option<FrequencyUnit> = job.get(4);
114+
let metadata: serde_json::Value = job.get(5);
115+
let executed_at: Option<DateTime<FixedOffset>> = job.get(6);
116+
let error_message: Option<String> = job.get(7);
97117

98118
data.push(Job {
99119
id,
100120
name,
101121
expected_time,
102-
metadata: metadata,
103-
executed_at: executed_at,
122+
frequency,
123+
frequency_unit,
124+
metadata,
125+
executed_at,
104126
error_message
105127
});
106128
}
107129

108130
Ok(data)
109131
}
132+
133+
pub fn get_duration_from_cron(cron_period: i32, cron_unit: &FrequencyUnit) -> Duration {
134+
match cron_unit {
135+
FrequencyUnit::Days => Duration::days(cron_period as i64),
136+
FrequencyUnit::Hours => Duration::hours(cron_period as i64),
137+
FrequencyUnit::Minutes => Duration::minutes(cron_period as i64),
138+
FrequencyUnit::Seconds => Duration::seconds(cron_period as i64),
139+
}
140+
}

0 commit comments

Comments
 (0)