Skip to content

Commit ac7b0ff

Browse files
committed
Pr feedback - more rubust error handling
1 parent 28ad8cc commit ac7b0ff

File tree

4 files changed

+42
-28
lines changed

4 files changed

+42
-28
lines changed

database/src/pool.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,14 +190,14 @@ pub trait Connection: Send + Sync {
190190
&self,
191191
statuses: &[BenchmarkRequestStatus],
192192
days: Option<i32>,
193-
) -> Vec<BenchmarkRequest>;
193+
) -> anyhow::Result<Vec<BenchmarkRequest>>;
194194

195195
/// Update the status of a `benchmark_request`
196196
async fn update_benchmark_request_status(
197197
&mut self,
198198
benchmark_request: &BenchmarkRequest,
199199
benchmark_request_status: BenchmarkRequestStatus,
200-
);
200+
) -> anyhow::Result<()>;
201201
}
202202

203203
#[async_trait::async_trait]

database/src/pool/postgres.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1419,7 +1419,7 @@ where
14191419
&self,
14201420
statuses: &[BenchmarkRequestStatus],
14211421
days: Option<i32>,
1422-
) -> Vec<BenchmarkRequest> {
1422+
) -> anyhow::Result<Vec<BenchmarkRequest>> {
14231423
let mut query = "
14241424
SELECT
14251425
tag,
@@ -1441,15 +1441,14 @@ where
14411441
// `::INT`, we can't do; `INTERVAL '$2 day'` which, while looking
14421442
// natural is invalid.
14431443
query += "AND created_at > current_date - $2::INT * INTERVAL '1 day'";
1444-
self.conn()
1445-
.query(&query, &[&statuses, &days])
1446-
.await
1447-
.unwrap()
1444+
self.conn().query(&query, &[&statuses, &days]).await
14481445
} else {
1449-
self.conn().query(&query, &[&statuses]).await.unwrap()
1450-
};
1446+
self.conn().query(&query, &[&statuses]).await
1447+
}
1448+
.context("Failed to get benchmark requests")?;
14511449

1452-
rows.iter()
1450+
let benchmark_requests = rows
1451+
.iter()
14531452
.map(|row| {
14541453
let tag = row.get::<_, String>(0);
14551454
let parent_sha = row.get::<_, Option<String>>(1);
@@ -1501,22 +1500,31 @@ where
15011500
),
15021501
}
15031502
})
1504-
.collect()
1503+
.collect();
1504+
Ok(benchmark_requests)
15051505
}
15061506

15071507
async fn update_benchmark_request_status(
15081508
&mut self,
15091509
benchmark_request: &BenchmarkRequest,
15101510
benchmark_request_status: BenchmarkRequestStatus,
1511-
) {
1512-
let tx = self.conn_mut().transaction().await.unwrap();
1511+
) -> anyhow::Result<()> {
1512+
let tx = self
1513+
.conn_mut()
1514+
.transaction()
1515+
.await
1516+
.context("failed to start transaction")?;
1517+
15131518
tx.execute(
15141519
"UPDATE benchmark_request SET status = $1 WHERE tag = $2;",
15151520
&[&benchmark_request_status, &benchmark_request.tag()],
15161521
)
15171522
.await
1518-
.unwrap();
1519-
tx.commit().await.unwrap();
1523+
.context("failed to execute UPDATE benchmark_request")?;
1524+
1525+
tx.commit().await.context("failed to commit transaction")?;
1526+
1527+
Ok(())
15201528
}
15211529
}
15221530

database/src/pool/sqlite.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,15 +1272,15 @@ impl Connection for SqliteConnection {
12721272
&self,
12731273
_statuses: &[BenchmarkRequestStatus],
12741274
_days: Option<i32>,
1275-
) -> Vec<BenchmarkRequest> {
1275+
) -> anyhow::Result<Vec<BenchmarkRequest>> {
12761276
no_queue_implementation_abort!()
12771277
}
12781278

12791279
async fn update_benchmark_request_status(
12801280
&mut self,
12811281
_benchmark_request: &BenchmarkRequest,
12821282
_benchmark_request_status: BenchmarkRequestStatus,
1283-
) {
1283+
) -> anyhow::Result<()> {
12841284
no_queue_implementation_abort!()
12851285
}
12861286
}

site/src/job_queue.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tokio::time::{self, Duration};
99

1010
/// Store the latest master commits or do nothing if all of them are
1111
/// already in the database
12-
async fn enqueue_master_commits(ctxt: &Arc<SiteCtxt>) {
12+
async fn enqueue_master_commits(ctxt: &Arc<SiteCtxt>) -> anyhow::Result<()> {
1313
let conn = ctxt.conn().await;
1414
let master_commits = &ctxt.get_master_commits().commits;
1515
// TODO; delete at some point in the future
@@ -32,6 +32,7 @@ async fn enqueue_master_commits(ctxt: &Arc<SiteCtxt>) {
3232
conn.insert_benchmark_request(&benchmark).await;
3333
}
3434
}
35+
Ok(())
3536
}
3637

3738
// This function is split for testing purposes as mocking out `SiteCtxt` is non
@@ -86,7 +87,7 @@ fn get_next_benchmark_request(
8687
}
8788

8889
/// Enqueue the job into the job_queue
89-
async fn enqueue_next_job(site_ctxt: &Arc<SiteCtxt>) {
90+
async fn enqueue_next_job(site_ctxt: &Arc<SiteCtxt>) -> anyhow::Result<()> {
9091
let mut conn = site_ctxt.conn().await;
9192
let mut pending = conn
9293
.get_benchmark_requests_by_status(
@@ -96,35 +97,38 @@ async fn enqueue_next_job(site_ctxt: &Arc<SiteCtxt>) {
9697
],
9798
None,
9899
)
99-
.await;
100+
.await?;
100101

101102
// No requests to process or we have something currently in progress
102103
if pending
103104
.iter()
104105
.any(|r| r.status == BenchmarkRequestStatus::InProgress)
105106
{
106-
return;
107+
return Ok(());
107108
}
108109

109110
// We draw back the last 30 days of completed requests
110111
let completed = conn
111112
.get_benchmark_requests_by_status(&[BenchmarkRequestStatus::Completed], Some(30))
112-
.await;
113+
.await?;
113114

114115
// And we now see if we have another request that can be processed
115116
if let Some(next_request) = get_next_benchmark_request(&mut pending, &completed) {
116117
// TODO; we simply flip the status for now however this should also
117118
// create the relevant jobs in the `job_queue`
118119
conn.update_benchmark_request_status(&next_request, BenchmarkRequestStatus::InProgress)
119-
.await;
120+
.await?
120121
}
122+
123+
Ok(())
121124
}
122125

123126
/// For queueing jobs, add the jobs you want to queue to this function
124-
async fn cron_enqueue_jobs(site_ctxt: &Arc<SiteCtxt>) {
127+
async fn cron_enqueue_jobs(site_ctxt: &Arc<SiteCtxt>) -> anyhow::Result<()> {
125128
// Put the master commits into the `benchmark_requests` queue
126-
enqueue_master_commits(site_ctxt).await;
127-
enqueue_next_job(site_ctxt).await;
129+
enqueue_master_commits(site_ctxt).await?;
130+
enqueue_next_job(site_ctxt).await?;
131+
Ok(())
128132
}
129133

130134
/// Entry point for the cron
@@ -139,8 +143,10 @@ pub async fn cron_main(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, seconds: u
139143
let guard = ctxt.read();
140144
guard.as_ref().cloned()
141145
} {
142-
cron_enqueue_jobs(&ctxt_clone).await;
143-
log::info!("Cron job executed at: {:?}", std::time::SystemTime::now());
146+
match cron_enqueue_jobs(&ctxt_clone).await {
147+
Ok(_) => log::info!("Cron job executed at: {:?}", std::time::SystemTime::now()),
148+
Err(e) => log::error!("Cron job failed to execute {}", e),
149+
}
144150
}
145151
}
146152
}

0 commit comments

Comments
 (0)