From 0e0ed7bd73fe87c59b14e296fe53f4adfed8555a Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 3 Sep 2025 12:07:13 -0300 Subject: [PATCH 1/4] Prevent listener to drop if stream cannot retrieve element --- concurrency/src/tasks/stream.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index 4c4e844..e359052 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -35,8 +35,7 @@ where } }, Some(Err(e)) => { - tracing::trace!("Received Error in msg {e:?}"); - break; + tracing::trace!("Error in stream: {e:?}"); } None => { tracing::trace!("Stream finished"); From 2df4d0d12cd002ace0ada319b03608d7246c993b Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 3 Sep 2025 12:25:46 -0300 Subject: [PATCH 2/4] Added comments and test --- concurrency/src/tasks/stream.rs | 6 ++++- concurrency/src/tasks/stream_tests.rs | 32 ++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index e359052..03e2a21 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -27,6 +27,7 @@ where Box::pin(async { loop { match stream.next().await { + // Stream has a new valid Item Some(Ok(i)) => match handle.cast(message_builder(i)).await { Ok(_) => tracing::trace!("Message sent successfully"), Err(e) => { @@ -34,8 +35,11 @@ where break; } }, + // Stream has new data, but failed to extract the Item, + // probably due to decoding problems. Some(Err(e)) => { - tracing::trace!("Error in stream: {e:?}"); + // log the error but keep listening for more valid Items + tracing::error!("Error processing stream: {e:?}"); } None => { tracing::trace!("Stream finished"); diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index ecb2f36..de2628d 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -1,4 +1,7 @@ -use std::time::Duration; +use std::{ + io::{Error, ErrorKind}, + time::Duration, +}; use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream}; @@ -189,3 +192,30 @@ pub fn test_stream_cancellation() { assert!(Summatory::get_value(&mut summatory_handle).await.is_err()); }) } + +#[test] +pub fn test_stream_skipping_decoding_error() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut summatory_handle = Summatory::new(0).start(); + let stream = tokio_stream::iter( + vec![ + Ok::(1u8), + Ok(2), + Ok(3), + Err(Error::new(ErrorKind::Other, "Oh no")), + Ok(4), + Ok(5), + ] + .into_iter(), + ); + + spawn_listener(summatory_handle.clone(), message_builder, stream); + + // Wait for 1 second so the whole stream is processed + rt::sleep(Duration::from_secs(1)).await; + + let val = Summatory::get_value(&mut summatory_handle).await.unwrap(); + assert_eq!(val, 15); + }) +} From f35f075da0080ac049dbd7cc0434b48a4a8da34f Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 3 Sep 2025 12:51:13 -0300 Subject: [PATCH 3/4] Making clippy happy --- concurrency/src/tasks/stream.rs | 2 +- concurrency/src/tasks/stream_tests.rs | 28 ++++++++++----------------- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index 03e2a21..26a8da3 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -38,7 +38,7 @@ where // Stream has new data, but failed to extract the Item, // probably due to decoding problems. Some(Err(e)) => { - // log the error but keep listening for more valid Items + // log the error but keep listener alive for more valid Items tracing::error!("Error processing stream: {e:?}"); } None => { diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index de2628d..9f0e653 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -1,13 +1,8 @@ -use std::{ - io::{Error, ErrorKind}, - time::Duration, -}; - -use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream}; - use crate::tasks::{ send_after, stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle, }; +use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream}; +use std::{io::Error, time::Duration}; type SummatoryHandle = GenServerHandle; @@ -198,17 +193,14 @@ pub fn test_stream_skipping_decoding_error() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { let mut summatory_handle = Summatory::new(0).start(); - let stream = tokio_stream::iter( - vec![ - Ok::(1u8), - Ok(2), - Ok(3), - Err(Error::new(ErrorKind::Other, "Oh no")), - Ok(4), - Ok(5), - ] - .into_iter(), - ); + let stream = tokio_stream::iter(vec![ + Ok::(1u8), + Ok(2), + Ok(3), + Err(Error::other("oh no!")), + Ok(4), + Ok(5), + ]); spawn_listener(summatory_handle.clone(), message_builder, stream); From c8481dc70ac31b188eebc4c22ef3e15501fca641 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 3 Sep 2025 12:56:41 -0300 Subject: [PATCH 4/4] Removing unnecessary explicit type specification --- concurrency/src/tasks/stream_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index 9f0e653..14d89d7 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -194,7 +194,7 @@ pub fn test_stream_skipping_decoding_error() { runtime.block_on(async move { let mut summatory_handle = Summatory::new(0).start(); let stream = tokio_stream::iter(vec![ - Ok::(1u8), + Ok(1), Ok(2), Ok(3), Err(Error::other("oh no!")),