diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index 4c4e844..26a8da3 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -27,6 +27,7 @@ where Box::pin(async { loop { match stream.next().await { + // Stream has a new valid Item Some(Ok(i)) => match handle.cast(message_builder(i)).await { Ok(_) => tracing::trace!("Message sent successfully"), Err(e) => { @@ -34,9 +35,11 @@ where break; } }, + // Stream has new data, but failed to extract the Item, + // probably due to decoding problems. Some(Err(e)) => { - tracing::trace!("Received Error in msg {e:?}"); - break; + // log the error but keep listener alive for more valid Items + tracing::error!("Error processing stream: {e:?}"); } None => { tracing::trace!("Stream finished"); diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index ecb2f36..14d89d7 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -1,10 +1,8 @@ -use std::time::Duration; - -use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream}; - use crate::tasks::{ send_after, stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle, }; +use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream}; +use std::{io::Error, time::Duration}; type SummatoryHandle = GenServerHandle; @@ -189,3 +187,27 @@ pub fn test_stream_cancellation() { assert!(Summatory::get_value(&mut summatory_handle).await.is_err()); }) } + +#[test] +pub fn test_stream_skipping_decoding_error() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut summatory_handle = Summatory::new(0).start(); + let stream = tokio_stream::iter(vec![ + Ok(1), + Ok(2), + Ok(3), + Err(Error::other("oh no!")), + Ok(4), + Ok(5), + ]); + + spawn_listener(summatory_handle.clone(), message_builder, stream); + + // Wait for 1 second so the whole stream is processed + rt::sleep(Duration::from_secs(1)).await; + + let val = Summatory::get_value(&mut summatory_handle).await.unwrap(); + assert_eq!(val, 15); + }) +}