Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions examples/07-fullstack/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use dioxus::{
fullstack::{JsonEncoding, Streaming, TextStream},
prelude::*,
};
use futures::{StreamExt as _, TryStreamExt};

fn main() {
dioxus::launch(app)
Expand All @@ -29,6 +30,8 @@ fn main() {
fn app() -> Element {
let mut text_responses = use_signal(String::new);
let mut json_responses = use_signal(Vec::new);
let mut echo_responses = use_signal(Vec::new);
let mut transform_responses = use_signal(Vec::new);

let mut start_text_stream = use_action(move || async move {
text_responses.clear();
Expand All @@ -53,6 +56,64 @@ fn app() -> Element {
dioxus::Ok(())
});

let mut continue_echo_stream = use_signal_sync(|| false);
let mut start_echo_stream = use_action(move || async move {
continue_echo_stream.set(true);
echo_responses.clear();
let stream = echo_stream(Streaming::new(futures::stream::unfold(
0,
move |index| async move {
if !continue_echo_stream() {
return None;
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
let dog = Dog {
name: format!("Dog {}", index),
age: (index % 10) as u8,
};
Some((dog, index + 1))
},
)))
.await?;
stream
.into_inner()
.try_for_each(move |dog| async move {
echo_responses.push(dog);
Ok(())
})
.await?;
dioxus::Ok(())
});

let mut continue_transform_stream = use_signal_sync(|| false);
let mut start_transform_stream = use_action(move || async move {
continue_transform_stream.set(true);
transform_responses.clear();
let stream = transform_stream(Streaming::new(futures::stream::unfold(
0,
move |index| async move {
if !continue_transform_stream() {
return None;
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
let dog = Dog {
name: format!("Dog {}", index),
age: (index % 10) as u8,
};
Some((dog, index + 1))
},
)))
.await?;
stream
.into_inner()
.try_for_each(move |text| async move {
transform_responses.push(text);
Ok(())
})
.await?;
dioxus::Ok(())
});

rsx! {
div {
button { onclick: move |_| start_text_stream.call(), "Start text stream" }
Expand All @@ -66,6 +127,20 @@ fn app() -> Element {
pre { "{dog:?}" }
}
}
div {
button { onclick: move |_| start_echo_stream.call(), "Start echo stream" }
button { onclick: move |_| continue_echo_stream.set(false), "Stop echo stream" }
for dog in echo_responses.read().iter() {
pre { "{dog:?}" }
}
}
div {
button { onclick: move |_| start_transform_stream.call(), "Start transform stream" }
button { onclick: move |_| continue_transform_stream.set(false), "Stop transform stream" }
for text in transform_responses.read().iter() {
pre { "{text}" }
}
}
}
}

Expand Down Expand Up @@ -145,3 +220,20 @@ async fn byte_stream() -> Result<Streaming<Bytes>> {

Ok(Streaming::new(rx))
}

/// An example of echoing the stream back to the client.
#[post("/api/echo_stream")]
async fn echo_stream(stream: Streaming<Dog, JsonEncoding>) -> Result<Streaming<Dog, JsonEncoding>> {
Ok(stream)
}

/// An example of transforming the stream on the server.
#[post("/api/transform_stream")]
async fn transform_stream(stream: Streaming<Dog, JsonEncoding>) -> Result<TextStream> {
Ok(Streaming::new(stream.into_inner().filter_map(
|dog| async {
dog.ok()
.map(|dog| format!("name: {}, age: {}", dog.name, dog.age))
},
)))
}
46 changes: 18 additions & 28 deletions packages/fullstack/src/payloads/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ pub type ChunkedTextStream = Streaming<String, CborEncoding>;
///
/// Also note that not all browsers support streaming bodies to servers.
pub struct Streaming<T = String, E = ()> {
output_stream: Pin<Box<dyn Stream<Item = Result<T, StreamingError>> + Send>>,
input_stream: Pin<Box<dyn Stream<Item = Result<T, StreamingError>> + Send>>,
stream: Pin<Box<dyn Stream<Item = Result<T, StreamingError>> + Send>>,
encoding: PhantomData<E>,
}

Expand All @@ -123,9 +122,8 @@ impl<T: 'static + Send, E> Streaming<T, E> {
pub fn new(value: impl Stream<Item = T> + Send + 'static) -> Self {
// Box and pin the incoming stream and store as a trait object
Self {
input_stream: Box::pin(value.map(|item| Ok(item)))
stream: Box::pin(value.map(|item| Ok(item)))
as Pin<Box<dyn Stream<Item = Result<T, StreamingError>> + Send>>,
output_stream: Box::pin(futures::stream::empty()) as _,
encoding: PhantomData,
}
}
Expand All @@ -148,21 +146,20 @@ impl<T: 'static + Send, E> Streaming<T, E> {

/// Returns the next item in the stream, or `None` if the stream has ended.
pub async fn next(&mut self) -> Option<Result<T, StreamingError>> {
self.output_stream.as_mut().next().await
self.stream.as_mut().next().await
}

/// Consumes the wrapper, returning the inner stream.
pub fn into_inner(self) -> impl Stream<Item = Result<T, StreamingError>> + Send {
self.input_stream
self.stream
}

/// Creates a streaming payload from an existing stream of bytes.
///
/// This uses the internal framing mechanism to decode the stream into items of type `T`.
fn from_bytes(stream: impl Stream<Item = Result<T, StreamingError>> + Send + 'static) -> Self {
Self {
input_stream: Box::pin(stream),
output_stream: Box::pin(futures::stream::empty()) as _,
stream: Box::pin(stream),
encoding: PhantomData,
}
}
Expand All @@ -184,8 +181,7 @@ where
{
fn from(value: S) -> Self {
Self {
input_stream: Box::pin(value.map(|data| data.map_err(|_| StreamingError::Failed))),
output_stream: Box::pin(futures::stream::empty()) as _,
stream: Box::pin(value.map(|data| data.map_err(|_| StreamingError::Failed))),
encoding: PhantomData,
}
}
Expand All @@ -207,7 +203,7 @@ impl IntoResponse for Streaming<String> {
fn into_response(self) -> axum_core::response::Response {
axum::response::Response::builder()
.header("Content-Type", "text/plain; charset=utf-8")
.body(axum::body::Body::from_stream(self.input_stream))
.body(axum::body::Body::from_stream(self.stream))
.unwrap()
}
}
Expand All @@ -216,14 +212,14 @@ impl IntoResponse for Streaming<Bytes> {
fn into_response(self) -> axum_core::response::Response {
axum::response::Response::builder()
.header("Content-Type", "application/octet-stream")
.body(axum::body::Body::from_stream(self.input_stream))
.body(axum::body::Body::from_stream(self.stream))
.unwrap()
}
}

impl<T: DeserializeOwned + Serialize + 'static, E: Encoding> IntoResponse for Streaming<T, E> {
fn into_response(self) -> axum_core::response::Response {
let res = self.input_stream.map(|r| match r {
let res = self.stream.map(|r| match r {
Ok(res) => match encode_stream_frame::<T, E>(res) {
Some(bytes) => Ok(bytes),
None => Err(StreamingError::Failed),
Expand All @@ -250,8 +246,7 @@ impl FromResponse for Streaming<String> {
}));

Ok(Self {
output_stream: client_stream,
input_stream: Box::pin(futures::stream::empty()),
stream: client_stream,
encoding: PhantomData,
})
})
Expand All @@ -269,8 +264,7 @@ impl FromResponse for Streaming<Bytes> {
)));

Ok(Self {
output_stream: client_stream,
input_stream: Box::pin(futures::stream::empty()),
stream: client_stream,
encoding: PhantomData,
})
}
Expand All @@ -293,8 +287,7 @@ impl<T: DeserializeOwned + Serialize + 'static + Send, E: Encoding> FromResponse
)));

Ok(Self {
output_stream: client_stream,
input_stream: Box::pin(futures::stream::empty()),
stream: client_stream,
encoding: PhantomData,
})
})
Expand Down Expand Up @@ -323,8 +316,7 @@ impl<S> FromRequest<S> for Streaming<String> {
let stream = body.into_data_stream();

Ok(Self {
input_stream: Box::pin(futures::stream::empty()),
output_stream: Box::pin(stream.map(|byte| match byte {
stream: Box::pin(stream.map(|byte| match byte {
Ok(bytes) => match String::from_utf8(bytes.to_vec()) {
Ok(string) => Ok(string),
Err(_) => Err(StreamingError::Decoding),
Expand Down Expand Up @@ -359,8 +351,7 @@ impl<S> FromRequest<S> for ByteStream {
let stream = body.into_data_stream();

Ok(Self {
input_stream: Box::pin(futures::stream::empty()),
output_stream: Box::pin(stream.map(|byte| match byte {
stream: Box::pin(stream.map(|byte| match byte {
Ok(bytes) => Ok(bytes),
Err(_) => Err(StreamingError::Failed),
})),
Expand Down Expand Up @@ -394,8 +385,7 @@ impl<T: DeserializeOwned + Serialize + 'static + Send, E: Encoding, S> FromReque
let stream = body.into_data_stream();

Ok(Self {
input_stream: Box::pin(futures::stream::empty()),
output_stream: Box::pin(stream.map(|byte| match byte {
stream: Box::pin(stream.map(|byte| match byte {
Ok(bytes) => match decode_stream_frame::<T, E>(bytes) {
Some(res) => Ok(res),
None => Err(StreamingError::Decoding),
Expand All @@ -416,7 +406,7 @@ impl IntoRequest for Streaming<String> {
async move {
builder
.header("Content-Type", "text/plain; charset=utf-8")?
.send_body_stream(self.input_stream.map(|e| e.map(Bytes::from)))
.send_body_stream(self.stream.map(|e| e.map(Bytes::from)))
.await
}
}
Expand All @@ -430,7 +420,7 @@ impl IntoRequest for ByteStream {
async move {
builder
.header(ContentType::name(), "application/octet-stream")?
.send_body_stream(self.input_stream)
.send_body_stream(self.stream)
.await
}
}
Expand All @@ -446,7 +436,7 @@ impl<T: DeserializeOwned + Serialize + 'static + Send, E: Encoding> IntoRequest
async move {
builder
.header("Content-Type", E::stream_content_type())?
.send_body_stream(self.input_stream.map(|r| {
.send_body_stream(self.stream.map(|r| {
r.and_then(|item| {
encode_stream_frame::<T, E>(item).ok_or(StreamingError::Failed)
})
Expand Down