Skip to content

Commit ec3fbca

Browse files
authored
Merge pull request #263 from brave/revert-262-graceful-pending-decode-failure
Revert "Gracefully handle pending message decode failures"
2 parents fbfd4a2 + 40017d2 commit ec3fbca

File tree

1 file changed

+14
-27
lines changed

1 file changed

+14
-27
lines changed

src/aggregator/processing.rs

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -84,24 +84,17 @@ pub async fn process_expired_epochs(
8484
fn drain_chunk_messages_for_threshold(
8585
chunk: &mut MessageChunk,
8686
threshold: usize,
87-
total_error_count: &mut usize,
88-
) -> Vec<NestedMessage> {
87+
) -> Result<Vec<NestedMessage>, AggregatorError> {
8988
let mut msgs = Vec::new();
9089
if let Some(new_msgs) = chunk.new_msgs.get_mut(&threshold) {
9190
msgs.append(new_msgs);
9291
}
9392
if let Some(pending_msgs) = chunk.pending_msgs.get_mut(&threshold) {
9493
for pending_msg in pending_msgs.drain(..) {
95-
match pending_msg.try_into() {
96-
Ok(msg) => msgs.push(msg),
97-
Err(e) => {
98-
debug!("failed to parse pending message, will omit; error = {e}");
99-
*total_error_count += 1;
100-
}
101-
}
94+
msgs.push(pending_msg.try_into()?);
10295
}
10396
}
104-
msgs
97+
Ok(msgs)
10598
}
10699

107100
/// Returns None if key recovery process failed, which indicates
@@ -113,7 +106,6 @@ fn get_recovery_key(
113106
chunk: &mut MessageChunk,
114107
recovery_threshold: Option<usize>,
115108
existing_rec_msg: Option<&&mut RecoveredMessage>,
116-
total_error_count: &mut usize,
117109
) -> Result<Option<(Vec<u8>, Option<Vec<NestedMessage>>)>, AggregatorError> {
118110
let mut key_recovery_msgs: Option<Vec<_>> = None;
119111

@@ -130,7 +122,7 @@ fn get_recovery_key(
130122
.unwrap_or_default();
131123

132124
// drain messages required for recovery into the vec
133-
let mut msgs = drain_chunk_messages_for_threshold(chunk, threshold, total_error_count);
125+
let mut msgs = drain_chunk_messages_for_threshold(chunk, threshold)?;
134126

135127
let key = match recover_key(&msgs, epoch, threshold) {
136128
Err(e) => {
@@ -178,20 +170,15 @@ fn process_one_layer(
178170

179171
let has_pending_msgs = chunk.pending_msgs.values().any(|v| !v.is_empty());
180172

181-
let (key, mut key_recovery_msgs) = match get_recovery_key(
182-
*epoch,
183-
chunk,
184-
recovery_threshold,
185-
existing_rec_msg.as_ref(),
186-
&mut total_error_count,
187-
)? {
188-
Some(res) => res,
189-
None => {
190-
// key recovery failed. stop processing for the current message chunk/tag,
191-
// save messages in db for later attempt
192-
continue;
193-
}
194-
};
173+
let (key, mut key_recovery_msgs) =
174+
match get_recovery_key(*epoch, chunk, recovery_threshold, existing_rec_msg.as_ref())? {
175+
Some(res) => res,
176+
None => {
177+
// key recovery failed. stop processing for the current message chunk/tag,
178+
// save messages in db for later attempt
179+
continue;
180+
}
181+
};
195182

196183
let mut msgs_len = 0i64;
197184
let mut metric_name: Option<String> = None;
@@ -209,7 +196,7 @@ fn process_one_layer(
209196
// recovery step, so use this existing vec
210197
key_recovery_msgs.take().unwrap()
211198
} else {
212-
drain_chunk_messages_for_threshold(chunk, threshold, &mut total_error_count)
199+
drain_chunk_messages_for_threshold(chunk, threshold)?
213200
};
214201

215202
if msgs.is_empty() {

0 commit comments

Comments
 (0)