|
1 | | -use std::{collections::HashSet, env}; |
2 | | - |
3 | 1 | use anyhow::{anyhow, Result}; |
4 | 2 | use log::error; |
5 | 3 | use redis_work_queue::{Item, KeyPrefix, WorkQueue}; |
6 | 4 | use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; |
| 5 | +use std::{collections::HashSet, env, thread, time}; |
7 | 6 |
|
8 | 7 | use crate::{ |
9 | 8 | app::{RedisJob, SimpleRiderChange}, |
@@ -46,8 +45,7 @@ pub async fn main() -> Result<()> { |
46 | 45 | env::var("PINGS_REMOVE_ROUTE").expect("PINGS_REMOVE_ROUTE must be set"), |
47 | 46 | )?; |
48 | 47 |
|
49 | | - work_loop(queue, db_pool, pings).await?; |
50 | | - Ok(()) |
| 48 | + work_loop(queue, db_pool, pings).await |
51 | 49 | } |
52 | 50 |
|
53 | 51 | async fn get_simple_data( |
@@ -263,27 +261,36 @@ pub async fn work_loop( |
263 | 261 | db_pool: Pool<Postgres>, |
264 | 262 | pings: PingClient, |
265 | 263 | ) -> Result<()> { |
| 264 | + let mut queue_connect_failure = 0; |
| 265 | + let three_sec = time::Duration::from_secs(3); |
266 | 266 | loop { |
267 | 267 | // Wait for a job with no timeout and a lease time of 5 seconds. |
268 | 268 | let job: Item = match queue.get_job().await { |
269 | 269 | Ok(job) => job, |
270 | 270 | Err(err) => { |
271 | | - error!("{}", err); |
| 271 | + error!("Failed to Get Job: {}", err); |
| 272 | + queue_connect_failure += 1; |
| 273 | + thread::sleep(three_sec); |
| 274 | + if queue_connect_failure >= 3 { |
| 275 | + error!("Failed to Fetch Job 3+ Times! Failing..."); |
| 276 | + return Err(anyhow!("Fetch Job Failed 3 Times. Is Redis Running?")); |
| 277 | + } |
272 | 278 | continue; |
273 | 279 | } |
274 | 280 | }; |
| 281 | + queue_connect_failure = 0; |
275 | 282 | match work(&job, &db_pool, &pings, &mut queue).await { |
276 | 283 | // Mark successful jobs as complete |
277 | 284 | Ok(()) => { |
278 | 285 | queue.complete(&job).await?; |
279 | 286 | } |
280 | 287 | // Drop a job that should be retried - it will be returned to the work queue after |
281 | 288 | // the (5 second) lease expires. |
282 | | - Err(err) if err.should_retry => error!("{}", err.msg), |
| 289 | + Err(err) if err.should_retry => error!("Job Failed: {}, Retrying", err.msg), |
283 | 290 | // Errors that shouldn't cause a retry should mark the job as complete so it isn't |
284 | 291 | // tried again. |
285 | 292 | Err(err) => { |
286 | | - error!("{}", err.msg); |
| 293 | + error!("Job Failed: {}, Not Retrying", err.msg); |
287 | 294 | queue.complete(&job).await?; |
288 | 295 | } |
289 | 296 | } |
|
0 commit comments