@@ -310,12 +310,34 @@ mod tests {
310310 // consuming messages as normal
311311 for _ in 0 ..10 {
312312 router. submit ( make_message ( Utc :: now ( ) ) ) . unwrap ( ) ;
313+ _ = router. poll ( ) ;
313314 }
314315 assert_eq ! ( router. state, State :: Forwarding ) ;
315316 // now theres a stale message, consumer should crash
316317 _ = router. submit ( make_message ( Utc :: now ( ) - TimeDelta :: seconds ( 20 ) ) ) ;
317318 }
318319
320+ fn submit_with_retry (
321+ router : & mut BLQRouter < MockStrategy , MockStrategy > ,
322+ message : Message < KafkaPayload > ,
323+ max_retries : usize ,
324+ ) -> Result < ( ) , SubmitError < KafkaPayload > > {
325+ let mut msg = message;
326+ for _ in 0 ..max_retries {
327+ match router. submit ( msg) {
328+ Ok ( ( ) ) => return Ok ( ( ) ) ,
329+ Err ( SubmitError :: MessageRejected ( rejected) ) => {
330+ _ = router. poll ( ) ;
331+ msg = rejected. message ;
332+ }
333+ Err ( e) => return Err ( e) ,
334+ }
335+ }
336+ Err ( SubmitError :: MessageRejected (
337+ sentry_arroyo:: processing:: strategies:: MessageRejected { message : msg } ,
338+ ) )
339+ }
340+
319341 #[ test]
320342 fn test_stale_to_fresh ( ) {
321343 /*
@@ -334,12 +356,14 @@ mod tests {
334356 router
335357 . submit ( make_message ( Utc :: now ( ) - TimeDelta :: minutes ( 1 ) ) )
336358 . unwrap ( ) ;
359+ _ = router. poll ( ) ;
337360 }
338361 assert_eq ! ( router. state, State :: RoutingStale ) ;
339362 assert ! ( !router. producer. join_called) ;
340363 // now we are back to fresh messages
341364 for _ in 0 ..5 {
342- router. submit ( make_message ( Utc :: now ( ) ) ) . unwrap ( ) ;
365+ submit_with_retry ( & mut router, make_message ( Utc :: now ( ) ) , 3 ) . unwrap ( ) ;
366+ _ = router. poll ( ) ;
343367 }
344368 assert_eq ! ( router. state, State :: Forwarding ) ;
345369 assert ! ( router. producer. join_called) ;
0 commit comments