Skip to content

Commit da6d6b5

Browse files
authored
Fix error on spawn_listener (#47)
* Prevent listener to drop if stream cannot retrieve element * Added comments and test * Making clippy happy * Removing unnecessary explicit type specification
1 parent aa47f82 commit da6d6b5

File tree

2 files changed

+31
-6
lines changed

2 files changed

+31
-6
lines changed

concurrency/src/tasks/stream.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,19 @@ where
2727
Box::pin(async {
2828
loop {
2929
match stream.next().await {
30+
// Stream has a new valid Item
3031
Some(Ok(i)) => match handle.cast(message_builder(i)).await {
3132
Ok(_) => tracing::trace!("Message sent successfully"),
3233
Err(e) => {
3334
tracing::error!("Failed to send message: {e:?}");
3435
break;
3536
}
3637
},
38+
// Stream has new data, but failed to extract the Item,
39+
// probably due to decoding problems.
3740
Some(Err(e)) => {
38-
tracing::trace!("Received Error in msg {e:?}");
39-
break;
41+
// log the error but keep listener alive for more valid Items
42+
tracing::error!("Error processing stream: {e:?}");
4043
}
4144
None => {
4245
tracing::trace!("Stream finished");

concurrency/src/tasks/stream_tests.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
1-
use std::time::Duration;
2-
3-
use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream};
4-
51
use crate::tasks::{
62
send_after, stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle,
73
};
4+
use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream};
5+
use std::{io::Error, time::Duration};
86

97
type SummatoryHandle = GenServerHandle<Summatory>;
108

@@ -189,3 +187,27 @@ pub fn test_stream_cancellation() {
189187
assert!(Summatory::get_value(&mut summatory_handle).await.is_err());
190188
})
191189
}
190+
191+
#[test]
192+
pub fn test_stream_skipping_decoding_error() {
193+
let runtime = rt::Runtime::new().unwrap();
194+
runtime.block_on(async move {
195+
let mut summatory_handle = Summatory::new(0).start();
196+
let stream = tokio_stream::iter(vec![
197+
Ok(1),
198+
Ok(2),
199+
Ok(3),
200+
Err(Error::other("oh no!")),
201+
Ok(4),
202+
Ok(5),
203+
]);
204+
205+
spawn_listener(summatory_handle.clone(), message_builder, stream);
206+
207+
// Wait for 1 second so the whole stream is processed
208+
rt::sleep(Duration::from_secs(1)).await;
209+
210+
let val = Summatory::get_value(&mut summatory_handle).await.unwrap();
211+
assert_eq!(val, 15);
212+
})
213+
}

0 commit comments

Comments
 (0)