diff --git a/examples/07-fullstack/streaming.rs b/examples/07-fullstack/streaming.rs index e11d04f12d..f9b3072dc0 100644 --- a/examples/07-fullstack/streaming.rs +++ b/examples/07-fullstack/streaming.rs @@ -21,6 +21,7 @@ use dioxus::{ fullstack::{JsonEncoding, Streaming, TextStream}, prelude::*, }; +use futures::{StreamExt as _, TryStreamExt}; fn main() { dioxus::launch(app) @@ -29,6 +30,8 @@ fn main() { fn app() -> Element { let mut text_responses = use_signal(String::new); let mut json_responses = use_signal(Vec::new); + let mut echo_responses = use_signal(Vec::new); + let mut transform_responses = use_signal(Vec::new); let mut start_text_stream = use_action(move || async move { text_responses.clear(); @@ -53,6 +56,64 @@ fn app() -> Element { dioxus::Ok(()) }); + let mut continue_echo_stream = use_signal_sync(|| false); + let mut start_echo_stream = use_action(move || async move { + continue_echo_stream.set(true); + echo_responses.clear(); + let stream = echo_stream(Streaming::new(futures::stream::unfold( + 0, + move |index| async move { + if !continue_echo_stream() { + return None; + } + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + let dog = Dog { + name: format!("Dog {}", index), + age: (index % 10) as u8, + }; + Some((dog, index + 1)) + }, + ))) + .await?; + stream + .into_inner() + .try_for_each(move |dog| async move { + echo_responses.push(dog); + Ok(()) + }) + .await?; + dioxus::Ok(()) + }); + + let mut continue_transform_stream = use_signal_sync(|| false); + let mut start_transform_stream = use_action(move || async move { + continue_transform_stream.set(true); + transform_responses.clear(); + let stream = transform_stream(Streaming::new(futures::stream::unfold( + 0, + move |index| async move { + if !continue_transform_stream() { + return None; + } + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + let dog = Dog { + name: format!("Dog {}", index), + age: (index % 10) as u8, + }; + Some((dog, index + 1)) + }, + ))) + .await?; + stream + .into_inner() + .try_for_each(move |text| async move { + transform_responses.push(text); + Ok(()) + }) + .await?; + dioxus::Ok(()) + }); + rsx! { div { button { onclick: move |_| start_text_stream.call(), "Start text stream" } @@ -66,6 +127,20 @@ fn app() -> Element { pre { "{dog:?}" } } } + div { + button { onclick: move |_| start_echo_stream.call(), "Start echo stream" } + button { onclick: move |_| continue_echo_stream.set(false), "Stop echo stream" } + for dog in echo_responses.read().iter() { + pre { "{dog:?}" } + } + } + div { + button { onclick: move |_| start_transform_stream.call(), "Start transform stream" } + button { onclick: move |_| continue_transform_stream.set(false), "Stop transform stream" } + for text in transform_responses.read().iter() { + pre { "{text}" } + } + } } } @@ -145,3 +220,20 @@ async fn byte_stream() -> Result> { Ok(Streaming::new(rx)) } + +/// An example of echoing the stream back to the client. +#[post("/api/echo_stream")] +async fn echo_stream(stream: Streaming) -> Result> { + Ok(stream) +} + +/// An example of transforming the stream on the server. +#[post("/api/transform_stream")] +async fn transform_stream(stream: Streaming) -> Result { + Ok(Streaming::new(stream.into_inner().filter_map( + |dog| async { + dog.ok() + .map(|dog| format!("name: {}, age: {}", dog.name, dog.age)) + }, + ))) +} diff --git a/packages/fullstack/src/payloads/stream.rs b/packages/fullstack/src/payloads/stream.rs index 7ab59ffeb6..fc6467cfd4 100644 --- a/packages/fullstack/src/payloads/stream.rs +++ b/packages/fullstack/src/payloads/stream.rs @@ -98,8 +98,7 @@ pub type ChunkedTextStream = Streaming; /// /// Also note that not all browsers support streaming bodies to servers. pub struct Streaming { - output_stream: Pin> + Send>>, - input_stream: Pin> + Send>>, + stream: Pin> + Send>>, encoding: PhantomData, } @@ -123,9 +122,8 @@ impl Streaming { pub fn new(value: impl Stream + Send + 'static) -> Self { // Box and pin the incoming stream and store as a trait object Self { - input_stream: Box::pin(value.map(|item| Ok(item))) + stream: Box::pin(value.map(|item| Ok(item))) as Pin> + Send>>, - output_stream: Box::pin(futures::stream::empty()) as _, encoding: PhantomData, } } @@ -148,12 +146,12 @@ impl Streaming { /// Returns the next item in the stream, or `None` if the stream has ended. pub async fn next(&mut self) -> Option> { - self.output_stream.as_mut().next().await + self.stream.as_mut().next().await } /// Consumes the wrapper, returning the inner stream. pub fn into_inner(self) -> impl Stream> + Send { - self.input_stream + self.stream } /// Creates a streaming payload from an existing stream of bytes. @@ -161,8 +159,7 @@ impl Streaming { /// This uses the internal framing mechanism to decode the stream into items of type `T`. fn from_bytes(stream: impl Stream> + Send + 'static) -> Self { Self { - input_stream: Box::pin(stream), - output_stream: Box::pin(futures::stream::empty()) as _, + stream: Box::pin(stream), encoding: PhantomData, } } @@ -184,8 +181,7 @@ where { fn from(value: S) -> Self { Self { - input_stream: Box::pin(value.map(|data| data.map_err(|_| StreamingError::Failed))), - output_stream: Box::pin(futures::stream::empty()) as _, + stream: Box::pin(value.map(|data| data.map_err(|_| StreamingError::Failed))), encoding: PhantomData, } } @@ -207,7 +203,7 @@ impl IntoResponse for Streaming { fn into_response(self) -> axum_core::response::Response { axum::response::Response::builder() .header("Content-Type", "text/plain; charset=utf-8") - .body(axum::body::Body::from_stream(self.input_stream)) + .body(axum::body::Body::from_stream(self.stream)) .unwrap() } } @@ -216,14 +212,14 @@ impl IntoResponse for Streaming { fn into_response(self) -> axum_core::response::Response { axum::response::Response::builder() .header("Content-Type", "application/octet-stream") - .body(axum::body::Body::from_stream(self.input_stream)) + .body(axum::body::Body::from_stream(self.stream)) .unwrap() } } impl IntoResponse for Streaming { fn into_response(self) -> axum_core::response::Response { - let res = self.input_stream.map(|r| match r { + let res = self.stream.map(|r| match r { Ok(res) => match encode_stream_frame::(res) { Some(bytes) => Ok(bytes), None => Err(StreamingError::Failed), @@ -250,8 +246,7 @@ impl FromResponse for Streaming { })); Ok(Self { - output_stream: client_stream, - input_stream: Box::pin(futures::stream::empty()), + stream: client_stream, encoding: PhantomData, }) }) @@ -269,8 +264,7 @@ impl FromResponse for Streaming { ))); Ok(Self { - output_stream: client_stream, - input_stream: Box::pin(futures::stream::empty()), + stream: client_stream, encoding: PhantomData, }) } @@ -293,8 +287,7 @@ impl FromResponse ))); Ok(Self { - output_stream: client_stream, - input_stream: Box::pin(futures::stream::empty()), + stream: client_stream, encoding: PhantomData, }) }) @@ -323,8 +316,7 @@ impl FromRequest for Streaming { let stream = body.into_data_stream(); Ok(Self { - input_stream: Box::pin(futures::stream::empty()), - output_stream: Box::pin(stream.map(|byte| match byte { + stream: Box::pin(stream.map(|byte| match byte { Ok(bytes) => match String::from_utf8(bytes.to_vec()) { Ok(string) => Ok(string), Err(_) => Err(StreamingError::Decoding), @@ -359,8 +351,7 @@ impl FromRequest for ByteStream { let stream = body.into_data_stream(); Ok(Self { - input_stream: Box::pin(futures::stream::empty()), - output_stream: Box::pin(stream.map(|byte| match byte { + stream: Box::pin(stream.map(|byte| match byte { Ok(bytes) => Ok(bytes), Err(_) => Err(StreamingError::Failed), })), @@ -394,8 +385,7 @@ impl FromReque let stream = body.into_data_stream(); Ok(Self { - input_stream: Box::pin(futures::stream::empty()), - output_stream: Box::pin(stream.map(|byte| match byte { + stream: Box::pin(stream.map(|byte| match byte { Ok(bytes) => match decode_stream_frame::(bytes) { Some(res) => Ok(res), None => Err(StreamingError::Decoding), @@ -416,7 +406,7 @@ impl IntoRequest for Streaming { async move { builder .header("Content-Type", "text/plain; charset=utf-8")? - .send_body_stream(self.input_stream.map(|e| e.map(Bytes::from))) + .send_body_stream(self.stream.map(|e| e.map(Bytes::from))) .await } } @@ -430,7 +420,7 @@ impl IntoRequest for ByteStream { async move { builder .header(ContentType::name(), "application/octet-stream")? - .send_body_stream(self.input_stream) + .send_body_stream(self.stream) .await } } @@ -446,7 +436,7 @@ impl IntoRequest async move { builder .header("Content-Type", E::stream_content_type())? - .send_body_stream(self.input_stream.map(|r| { + .send_body_stream(self.stream.map(|r| { r.and_then(|item| { encode_stream_frame::(item).ok_or(StreamingError::Failed) })