Skip to content

Commit 97a5296

Browse files
committed
Add env var for the total recovery grace threshold
1 parent afeb7f8 commit 97a5296

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed

src/aggregator/processing.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@ use crate::models::{
1212
use crate::profiler::{Profiler, ProfilerStat};
1313
use crate::record_stream::{DynRecordStream, RecordStreamArc};
1414
use crate::star::{recover_key, recover_msgs, AppSTARError, MsgRecoveryInfo};
15+
use crate::util::parse_env_var;
1516
use star_constellation::api::NestedMessage;
1617
use star_constellation::Error as ConstellationError;
1718
use std::collections::HashSet;
1819
use std::sync::{Arc, Mutex};
1920
use std::time::Instant;
2021
use tokio::task::JoinHandle;
2122

22-
const MAX_MSGS_TOTAL_RECOVERY_FAIL: usize = 75;
23+
const TOTAL_RECOVERY_FAIL_MAX_MSGS_ENV: &str = "TOTAL_RECOVERY_FAIL_MAX_MSGS";
24+
const TOTAL_RECOVERY_FAIL_MAX_MSGS_DEFAULT: &str = "75";
2325

2426
pub async fn process_expired_epoch(
2527
conn: Arc<Mutex<DBConnection>>,
@@ -215,7 +217,12 @@ fn process_one_layer(
215217
Ok(info) => info,
216218
Err(e) => {
217219
debug!("failed to recover {threshold_msgs_len} messages for threshold {threshold} on id {id} for tag {}: {e}", hex::encode(msg_tag));
218-
if threshold_msgs_len <= MAX_MSGS_TOTAL_RECOVERY_FAIL {
220+
if threshold_msgs_len
221+
<= parse_env_var::<usize>(
222+
TOTAL_RECOVERY_FAIL_MAX_MSGS_ENV,
223+
TOTAL_RECOVERY_FAIL_MAX_MSGS_DEFAULT,
224+
)
225+
{
219226
total_error_count += threshold_msgs_len;
220227
continue;
221228
}

0 commit comments

Comments
 (0)