Skip to content

Commit f54a3b9

Browse files
committed
add retries
1 parent 02fe03c commit f54a3b9

File tree

1 file changed

+21
-7
lines changed

1 file changed

+21
-7
lines changed

src/store/inner_redis_activation_store.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use anyhow::Error;
66
use async_backtrace::framed;
77
use base64::{Engine as _, engine::general_purpose};
88
use chrono::{DateTime, Duration, Utc};
9-
use deadpool_redis::Pool;
9+
use deadpool_redis::{Pool, Timeouts};
1010
use futures::future::try_join_all;
1111
use redis::AsyncTypedCommands;
1212
use sentry_protos::taskbroker::v1::OnAttemptsExceeded;
@@ -99,10 +99,20 @@ impl InnerRedisActivationStore {
9999
#[framed]
100100
pub async fn get_conn(&self) -> Result<deadpool_redis::Connection, Error> {
101101
let start_time = Instant::now();
102-
let conn = self.pool.get().await?;
103-
let conn_duration = start_time.duration_since(start_time);
104-
metrics::histogram!("redis_store.conn_duration").record(conn_duration.as_millis() as f64);
105-
Ok(conn)
102+
let mut retries = 0;
103+
let timeouts = Timeouts::wait_millis(50);
104+
while retries < 3 {
105+
let conn = self.pool.timeout_get(&timeouts).await;
106+
if conn.is_ok() {
107+
return Ok(conn.unwrap());
108+
}
109+
retries += 1;
110+
}
111+
let end_time = Instant::now();
112+
let duration = end_time.duration_since(start_time);
113+
metrics::histogram!("redis_store.get_conn.duration").record(duration.as_millis() as f64);
114+
metrics::counter!("redis_store.get_conn.retries").increment(retries as u64);
115+
return Err(anyhow::anyhow!("Failed to get connection after 3 retries"));
106116
}
107117

108118
#[framed]
@@ -661,11 +671,15 @@ impl InnerRedisActivationStore {
661671
for field in fields.iter().skip(1) {
662672
pipe.arg(field);
663673
}
664-
let result: Vec<Vec<String>> = pipe.query_async(&mut *conn).await?;
674+
let result = pipe.query_async(&mut *conn).await;
675+
if result.is_err() {
676+
return Ok(HashMap::new());
677+
}
678+
let raw_fields: Vec<Vec<String>> = result.unwrap();
665679
// Returns an array of tuples with the values in the same order as the fields array.
666680
// These needs to be combined into a map.
667681
let mut fields_map = HashMap::new();
668-
for values in result.iter() {
682+
for values in raw_fields.iter() {
669683
for (idx, arg_name) in fields.iter().enumerate() {
670684
fields_map.insert(arg_name.to_string(), values[idx].clone());
671685
}

0 commit comments

Comments
 (0)