|
1 | | -use crate::threads::{GenServer, GenServerHandle}; |
| 1 | +use std::thread::JoinHandle; |
2 | 2 |
|
3 | | -use futures::Stream; |
| 3 | +use crate::threads::{GenServer, GenServerHandle}; |
4 | 4 |
|
5 | 5 | /// Spawns a listener that listens to a stream and sends messages to a GenServer. |
6 | 6 | /// |
7 | 7 | /// Items sent through the stream are required to be wrapped in a Result type. |
8 | | -pub fn spawn_listener<T, F, S, I, E>(_handle: GenServerHandle<T>, _message_builder: F, _stream: S) |
| 8 | +pub fn spawn_listener<T, I>(mut handle: GenServerHandle<T>, stream: I) -> JoinHandle<()> |
9 | 9 | where |
10 | | - T: GenServer + 'static, |
11 | | - F: Fn(I) -> T::CastMsg + Send + 'static, |
12 | | - I: Send + 'static, |
13 | | - E: std::fmt::Debug + Send + 'static, |
14 | | - S: Unpin + Send + Stream<Item = Result<I, E>> + 'static, |
| 10 | + T: GenServer, |
| 11 | + I: IntoIterator<Item = T::CastMsg>, |
| 12 | + <I as IntoIterator>::IntoIter: std::marker::Send + 'static, |
15 | 13 | { |
16 | | - unimplemented!("Unsupported function in threads mode") |
| 14 | + let mut iter = stream.into_iter(); |
| 15 | + let mut cancelation_token = handle.cancellation_token(); |
| 16 | + let join_handle = spawned_rt::threads::spawn(move || loop { |
| 17 | + match iter.next() { |
| 18 | + Some(msg) => match handle.cast(msg) { |
| 19 | + Ok(_) => tracing::trace!("Message sent successfully"), |
| 20 | + Err(e) => { |
| 21 | + tracing::error!("Failed to send message: {e:?}"); |
| 22 | + break; |
| 23 | + } |
| 24 | + }, |
| 25 | + None => { |
| 26 | + tracing::trace!("Stream finished"); |
| 27 | + break; |
| 28 | + } |
| 29 | + } |
| 30 | + if cancelation_token.is_cancelled() { |
| 31 | + tracing::trace!("GenServer stopped"); |
| 32 | + break; |
| 33 | + } |
| 34 | + }); |
| 35 | + join_handle |
17 | 36 | } |
0 commit comments