|
1 | | -use std::{collections::HashSet, env}; |
2 | | - |
| 1 | +use std::{collections::HashSet, env, thread, time}; |
3 | 2 | use anyhow::{anyhow, Result}; |
4 | 3 | use log::error; |
5 | 4 | use redis_work_queue::{Item, KeyPrefix, WorkQueue}; |
@@ -46,8 +45,7 @@ pub async fn main() -> Result<()> { |
46 | 45 | env::var("PINGS_REMOVE_ROUTE").expect("PINGS_REMOVE_ROUTE must be set"), |
47 | 46 | )?; |
48 | 47 |
|
49 | | - work_loop(queue, db_pool, pings).await?; |
50 | | - Ok(()) |
| 48 | + work_loop(queue, db_pool, pings).await |
51 | 49 | } |
52 | 50 |
|
53 | 51 | async fn get_simple_data( |
@@ -263,15 +261,24 @@ pub async fn work_loop( |
263 | 261 | db_pool: Pool<Postgres>, |
264 | 262 | pings: PingClient, |
265 | 263 | ) -> Result<()> { |
| 264 | + let mut queue_connect_failure = 0; |
| 265 | + let three_sec = time::Duration::from_secs(3); |
266 | 266 | loop { |
267 | 267 | // Wait for a job with no timeout and a lease time of 5 seconds. |
268 | 268 | let job: Item = match queue.get_job().await { |
269 | 269 | Ok(job) => job, |
270 | 270 | Err(err) => { |
271 | 271 | error!("Failed to Get Job: {}", err); |
| 272 | + queue_connect_failure += 1; |
| 273 | + thread::sleep(three_sec); |
| 274 | + if queue_connect_failure >= 3 { |
| 275 | + error!("Failed to Fetch Job 3+ Times! Failing..."); |
| 276 | + return Err(anyhow!("Fetch Job Failed 3 Times. Is Redis Running?")); |
| 277 | + } |
272 | 278 | continue; |
273 | 279 | } |
274 | 280 | }; |
| 281 | + queue_connect_failure = 0; |
275 | 282 | match work(&job, &db_pool, &pings, &mut queue).await { |
276 | 283 | // Mark successful jobs as complete |
277 | 284 | Ok(()) => { |
|
0 commit comments