@@ -84,17 +84,24 @@ pub async fn process_expired_epochs(
8484fn drain_chunk_messages_for_threshold (
8585 chunk : & mut MessageChunk ,
8686 threshold : usize ,
87- ) -> Result < Vec < NestedMessage > , AggregatorError > {
87+ total_error_count : & mut usize ,
88+ ) -> Vec < NestedMessage > {
8889 let mut msgs = Vec :: new ( ) ;
8990 if let Some ( new_msgs) = chunk. new_msgs . get_mut ( & threshold) {
9091 msgs. append ( new_msgs) ;
9192 }
9293 if let Some ( pending_msgs) = chunk. pending_msgs . get_mut ( & threshold) {
9394 for pending_msg in pending_msgs. drain ( ..) {
94- msgs. push ( pending_msg. try_into ( ) ?) ;
95+ match pending_msg. try_into ( ) {
96+ Ok ( msg) => msgs. push ( msg) ,
97+ Err ( e) => {
98+ debug ! ( "failed to parse pending message, will omit; error = {e}" ) ;
99+ * total_error_count += 1 ;
100+ }
101+ }
95102 }
96103 }
97- Ok ( msgs)
104+ msgs
98105}
99106
100107/// Returns None if key recovery process failed, which indicates
@@ -106,6 +113,7 @@ fn get_recovery_key(
106113 chunk : & mut MessageChunk ,
107114 recovery_threshold : Option < usize > ,
108115 existing_rec_msg : Option < & & mut RecoveredMessage > ,
116+ total_error_count : & mut usize ,
109117) -> Result < Option < ( Vec < u8 > , Option < Vec < NestedMessage > > ) > , AggregatorError > {
110118 let mut key_recovery_msgs: Option < Vec < _ > > = None ;
111119
@@ -122,7 +130,7 @@ fn get_recovery_key(
122130 . unwrap_or_default ( ) ;
123131
124132 // drain messages required for recovery into the vec
125- let mut msgs = drain_chunk_messages_for_threshold ( chunk, threshold) ? ;
133+ let mut msgs = drain_chunk_messages_for_threshold ( chunk, threshold, total_error_count ) ;
126134
127135 let key = match recover_key ( & msgs, epoch, threshold) {
128136 Err ( e) => {
@@ -170,15 +178,20 @@ fn process_one_layer(
170178
171179 let has_pending_msgs = chunk. pending_msgs . values ( ) . any ( |v| !v. is_empty ( ) ) ;
172180
173- let ( key, mut key_recovery_msgs) =
174- match get_recovery_key ( * epoch, chunk, recovery_threshold, existing_rec_msg. as_ref ( ) ) ? {
175- Some ( res) => res,
176- None => {
177- // key recovery failed. stop processing for the current message chunk/tag,
178- // save messages in db for later attempt
179- continue ;
180- }
181- } ;
181+ let ( key, mut key_recovery_msgs) = match get_recovery_key (
182+ * epoch,
183+ chunk,
184+ recovery_threshold,
185+ existing_rec_msg. as_ref ( ) ,
186+ & mut total_error_count,
187+ ) ? {
188+ Some ( res) => res,
189+ None => {
190+ // key recovery failed. stop processing for the current message chunk/tag,
191+ // save messages in db for later attempt
192+ continue ;
193+ }
194+ } ;
182195
183196 let mut msgs_len = 0i64 ;
184197 let mut metric_name: Option < String > = None ;
@@ -196,7 +209,7 @@ fn process_one_layer(
196209 // recovery step, so use this existing vec
197210 key_recovery_msgs. take ( ) . unwrap ( )
198211 } else {
199- drain_chunk_messages_for_threshold ( chunk, threshold) ?
212+ drain_chunk_messages_for_threshold ( chunk, threshold, & mut total_error_count )
200213 } ;
201214
202215 if msgs. is_empty ( ) {
0 commit comments