Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions concurrency/src/tasks/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ where
Box::pin(async {
loop {
match stream.next().await {
// Stream has a new valid Item
Some(Ok(i)) => match handle.cast(message_builder(i)).await {
Ok(_) => tracing::trace!("Message sent successfully"),
Err(e) => {
tracing::error!("Failed to send message: {e:?}");
break;
}
},
// Stream has new data, but failed to extract the Item,
// probably due to decoding problems.
Some(Err(e)) => {
tracing::trace!("Received Error in msg {e:?}");
break;
// log the error but keep listener alive for more valid Items
tracing::error!("Error processing stream: {e:?}");
}
None => {
tracing::trace!("Stream finished");
Expand Down
30 changes: 26 additions & 4 deletions concurrency/src/tasks/stream_tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::time::Duration;

use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream};

use crate::tasks::{
send_after, stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle,
};
use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream};
use std::{io::Error, time::Duration};

type SummatoryHandle = GenServerHandle<Summatory>;

Expand Down Expand Up @@ -189,3 +187,27 @@ pub fn test_stream_cancellation() {
assert!(Summatory::get_value(&mut summatory_handle).await.is_err());
})
}

#[test]
pub fn test_stream_skipping_decoding_error() {
let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
let mut summatory_handle = Summatory::new(0).start();
let stream = tokio_stream::iter(vec![
Ok(1),
Ok(2),
Ok(3),
Err(Error::other("oh no!")),
Ok(4),
Ok(5),
]);

spawn_listener(summatory_handle.clone(), message_builder, stream);

// Wait for 1 second so the whole stream is processed
rt::sleep(Duration::from_secs(1)).await;

let val = Summatory::get_value(&mut summatory_handle).await.unwrap();
assert_eq!(val, 15);
})
}