Skip to content

Commit e716b51

Browse files
author
Mauricio Cassola
committed
Run scheduled events once at the beginning
1 parent e08fc2b commit e716b51

File tree

5 files changed

+122
-14
lines changed

5 files changed

+122
-14
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@ hyper = { version = "0.14.4", features = ["server", "stream"]}
2323
tokio = { version = "1.7.1", features = ["macros", "time", "rt"] }
2424
futures = { version = "0.3", default-features = false, features = ["std"] }
2525
async-trait = "0.1.31"
26-
uuid = { version = "0.8", features = ["v4"] }
26+
uuid = { version = "0.8", features = ["v4", "serde"] }
2727
tracing = "0.1"
2828
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
2929
url = "2.1.0"
3030
once_cell = "1"
3131
chrono = { version = "0.4", features = ["serde"] }
32-
tokio-postgres = { version = "0.7.2", features = ["with-chrono-0_4", "with-serde_json-1"] }
32+
tokio-postgres = { version = "0.7.2", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-0_8"] }
3333
postgres-native-tls = "0.5.0"
3434
native-tls = "0.2"
3535
serde_path_to_error = "0.1.2"

src/db.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use postgres_native_tls::MakeTlsConnector;
44
use std::sync::{Arc, Mutex};
55
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
66
use tokio_postgres::Client as DbClient;
7+
use crate::db::events::*;
78

89
pub mod events;
910
pub mod issue_data;
@@ -180,6 +181,29 @@ pub async fn run_migrations(client: &DbClient) -> anyhow::Result<()> {
180181
Ok(())
181182
}
182183

184+
pub async fn run_scheduled_events(db: &DbClient) -> anyhow::Result<()> {
185+
// table lock ????
186+
187+
let events = get_events_to_execute(&db).await;
188+
println!("events to execute: {:#?}", events);
189+
190+
for event in events.unwrap().iter() {
191+
update_event_executed_at(&db, &event.event_id).await;
192+
match call_event_handler_based_on_event_name {
193+
Ok(r) => {
194+
tracing::trace!("event succesfully executed (id={})", event.event_id);
195+
delete_event(&db, &event.event_id).await;
196+
},
197+
Err(e) => {
198+
tracing::trace!("event failed on execution (id={:?}, error={:?})", event.event_id, e);
199+
update_event_failed_message(&db, &event.event_id, &e).await;
200+
},
201+
}
202+
}
203+
204+
Ok(())
205+
}
206+
183207
static MIGRATIONS: &[&str] = &[
184208
"
185209
CREATE TABLE notifications (

src/db/events.rs

Lines changed: 78 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,103 @@
11
//! The `events` table provides a way to have scheduled events
2-
3-
use anyhow::{Result};
2+
use anyhow::{Result, Context as _};
43
use chrono::{DateTime, FixedOffset};
54
use tokio_postgres::{Client as DbClient};
65
use uuid::Uuid;
6+
use serde::{Deserialize, Serialize};
77

8-
#[derive(Debug)]
8+
#[derive(Serialize, Deserialize, Debug)]
99
pub struct Event {
1010
pub event_id: Uuid,
1111
pub event_name: String,
1212
pub expected_event_time: DateTime<FixedOffset>,
13-
pub event_metadata: String,
13+
// pub event_metadata: String,
1414
pub executed_at: DateTime<FixedOffset>,
1515
pub failed: Option<String>,
1616
}
1717

18-
pub async fn insert_failed(db: &DbClient) -> Result<()> {
19-
unimplemented!();
18+
pub async fn insert_event(db: &DbClient, event: &Event) -> Result<()> {
19+
tracing::trace!("insert_event(id={})", event.event_id);
20+
21+
db.execute(
22+
"INSERT INTO events (event_id, event_name, expected_event_time, event_metadata, executed_at, failed) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING",
23+
&[&event.event_id, &event.event_name, &event.expected_event_time, &"", &event.executed_at, &event.failed],
24+
)
25+
.await
26+
.context("inserting event")?;
27+
28+
Ok(())
29+
}
30+
31+
pub async fn delete_event(db: &DbClient, event_id: &Uuid) -> Result<()> {
32+
tracing::trace!("delete_event(id={})", event_id);
33+
34+
db.execute(
35+
"DELETE FROM events WHERE event_id = $1",
36+
&[&event_id],
37+
)
38+
.await
39+
.context("deleting event")?;
40+
41+
Ok(())
2042
}
2143

22-
pub async fn delete_event(db: &DbClient) -> Result<()> {
23-
unimplemented!();
44+
pub async fn update_event_failed_message(db: &DbClient, event_id: &Uuid, message: &String) -> Result<()> {
45+
tracing::trace!("update_event_failed_message(id={})", event_id);
46+
47+
db.execute(
48+
"UPDATE events SET failed = $2 WHERE event_id = $1",
49+
&[&event_id, &message],
50+
)
51+
.await
52+
.context("updating event failed message")?;
53+
54+
Ok(())
2455
}
2556

26-
pub async fn get_events_to_execute(db: &DbClient) -> Result<Vec<Event>> {
57+
pub async fn update_event_executed_at(db: &DbClient, event_id: &Uuid) -> Result<()> {
58+
tracing::trace!("update_event_executed_at(id={})", event_id);
59+
60+
db.execute(
61+
"UPDATE events SET executed_at = now() WHERE event_id = $1",
62+
&[&event_id],
63+
)
64+
.await
65+
.context("updating event executed at")?;
66+
67+
Ok(())
68+
}
69+
70+
// Selects all events with:
71+
// - event_time's in the past
72+
// - failed is null or executed_at is at least 60 minutes ago (intended to make repeat executions rare enough)
73+
pub async fn get_events_to_execute(db: &DbClient) -> Result<Vec<Event>> {
2774
let events = db
2875
.query(
2976
"
30-
SELECT * FROM events",
77+
SELECT * FROM events WHERE expected_event_time <= now() AND (failed IS NULL OR executed_at <= now() - INTERVAL '60 minutes')",
3178
&[],
3279
)
3380
.await
34-
.unwrap();
81+
.context("Getting events data")?;
82+
83+
let mut data = Vec::with_capacity(events.len());
84+
for event in events {
85+
let event_id: Uuid = event.get(0);
86+
let event_name: String = event.get(1);
87+
let expected_event_time: DateTime<FixedOffset> = event.get(2);
88+
// let event_metadata: String = event.get(3);
89+
let executed_at: DateTime<FixedOffset> = event.get(4);
90+
let failed: Option<String> = event.get(5);
91+
92+
data.push(Event {
93+
event_id,
94+
event_name,
95+
expected_event_time,
96+
// event_metadata,
97+
executed_at,
98+
failed
99+
});
100+
}
35101

36-
Ok(vec![])
102+
Ok(data)
37103
}

src/main.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use tower::{Service, ServiceExt};
1111
use tracing as log;
1212
use tracing::Instrument;
1313
use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
14+
// use std::{time::Duration, thread};
15+
// use tokio::task;
1416

1517
async fn handle_agenda_request(req: String) -> anyhow::Result<String> {
1618
if req == "/agenda/lang/triage" {
@@ -237,6 +239,20 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
237239
.await
238240
.context("database migrations")?;
239241

242+
db::run_scheduled_events(&*pool.get().await)
243+
.await
244+
.context("database scheduled_events")?;
245+
246+
// task::spawn(async move {
247+
// loop {
248+
// thread::sleep(Duration::from_secs(60)); // every one minute
249+
250+
// db::run_scheduled_events(&*pool.get().await)
251+
// .await
252+
// .context("database scheduled_events")?;
253+
// }
254+
// });
255+
240256
let client = Client::new();
241257
let gh = github::GithubClient::new_with_default_token(client.clone());
242258
let oc = octocrab::OctocrabBuilder::new()

0 commit comments

Comments
 (0)