Skip to content

Commit 8a12d75

Browse files
authored
Add example of a streaming response (Azure#1862)
* Add example of a streaming response * Add model example * Resolve build / lint issues
1 parent d688a1a commit 8a12d75

File tree

4 files changed

+139
-11
lines changed

4 files changed

+139
-11
lines changed

sdk/typespec/typespec_client_core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,9 @@ reqwest_rustls = ["reqwest/rustls-tls"]
6161
tokio_sleep = ["tokio/time"]
6262
xml = ["dep:quick-xml"]
6363

64+
[[example]]
65+
name = "stream_response"
66+
required-features = ["derive"]
67+
6468
[package.metadata.docs.rs]
6569
all-features = true
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
use futures::StreamExt;
5+
6+
#[tokio::main]
7+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
8+
// Get a response from a service client.
9+
let response = client::get_binary_data_response()?;
10+
11+
// Normally you'd deserialize into a type or `collect()` the body,
12+
// but this better simulates fetching multiple chunks from a slow response.
13+
let mut body = response.into_body();
14+
while let Some(data) = body.next().await {
15+
// Assume bytes are a string in this example.
16+
let page = String::from_utf8(data?.into())?;
17+
println!("{page}");
18+
}
19+
20+
// You can also deserialize into a model from a slow response.
21+
let team = client::get_model_response()?.deserialize_body().await?;
22+
println!("{team:#?}");
23+
24+
Ok(())
25+
}
26+
27+
#[allow(dead_code)]
28+
mod client {
29+
use futures::Stream;
30+
use serde::Deserialize;
31+
use std::{cmp::min, task::Poll, time::Duration};
32+
use typespec_client_core::{
33+
http::{headers::Headers, Model, Response, StatusCode},
34+
Bytes,
35+
};
36+
37+
#[derive(Debug, Model, Deserialize)]
38+
pub struct Team {
39+
pub name: Option<String>,
40+
#[serde(default)]
41+
pub members: Vec<Person>,
42+
}
43+
44+
#[derive(Debug, Model, Deserialize)]
45+
pub struct Person {
46+
pub id: u32,
47+
pub name: Option<String>,
48+
}
49+
50+
pub fn get_binary_data_response() -> typespec_client_core::Result<Response<()>> {
51+
let bytes = Bytes::from_static(b"Hello, world!");
52+
let response = SlowResponse {
53+
bytes: bytes.repeat(5).into(),
54+
bytes_per_read: bytes.len(),
55+
bytes_read: 0,
56+
};
57+
58+
Ok(Response::new(
59+
StatusCode::Ok,
60+
Headers::new(),
61+
Box::pin(response),
62+
))
63+
}
64+
65+
pub fn get_model_response() -> typespec_client_core::Result<Response<Team>> {
66+
let bytes = br#"{
67+
"name": "Contoso Dev Team",
68+
"members": [
69+
{
70+
"id": 1234,
71+
"name": "Jan"
72+
},
73+
{
74+
"id": 5678,
75+
"name": "Bill"
76+
}
77+
]
78+
}"#;
79+
let response = SlowResponse {
80+
bytes: Bytes::from_static(bytes),
81+
bytes_per_read: 64,
82+
bytes_read: 0,
83+
};
84+
85+
Ok(Response::new(
86+
StatusCode::Ok,
87+
Headers::new(),
88+
Box::pin(response),
89+
))
90+
}
91+
92+
struct SlowResponse {
93+
bytes: Bytes,
94+
bytes_per_read: usize,
95+
bytes_read: usize,
96+
}
97+
98+
impl Stream for SlowResponse {
99+
type Item = typespec_client_core::Result<Bytes>;
100+
101+
fn poll_next(
102+
self: std::pin::Pin<&mut Self>,
103+
_cx: &mut std::task::Context<'_>,
104+
) -> Poll<Option<Self::Item>> {
105+
let self_mut = self.get_mut();
106+
if self_mut.bytes_read < self_mut.bytes.len() {
107+
eprintln!("getting partial response...");
108+
std::thread::sleep(Duration::from_millis(200));
109+
110+
let end = self_mut.bytes_read
111+
+ min(
112+
self_mut.bytes_per_read,
113+
self_mut.bytes.len() - self_mut.bytes_read,
114+
);
115+
let bytes = self_mut.bytes.slice(self_mut.bytes_read..end);
116+
self_mut.bytes_read += bytes.len();
117+
Poll::Ready(Some(Ok(bytes)))
118+
} else {
119+
eprintln!("done");
120+
Poll::Ready(None)
121+
}
122+
}
123+
}
124+
}

sdk/typespec/typespec_client_core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub mod stream;
1919
pub mod xml;
2020

2121
pub use crate::error::{Error, Result};
22+
pub use bytes::Bytes;
2223
pub use uuid::Uuid;
2324

2425
#[cfg(feature = "derive")]

sdk/typespec/typespec_client_core/src/stream/bytes_stream.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4-
use super::SeekableStream;
5-
use bytes::Bytes;
4+
use super::{Bytes, SeekableStream};
65
use futures::io::AsyncRead;
76
use futures::stream::Stream;
87
use std::pin::Pin;
@@ -111,14 +110,14 @@ mod tests {
111110
use futures::stream::StreamExt;
112111

113112
// Test BytesStream Stream
114-
#[test]
115-
fn test_bytes_stream() {
113+
#[tokio::test]
114+
async fn bytes_stream() {
116115
let bytes = Bytes::from("hello world");
117116
let mut stream = BytesStream::new(bytes.clone());
118117

119118
let mut buf = Vec::new();
120119
let mut bytes_read = 0;
121-
while let Some(Ok(bytes)) = futures::executor::block_on(stream.next()) {
120+
while let Some(Ok(bytes)) = stream.next().await {
122121
buf.extend_from_slice(&bytes);
123122
bytes_read += bytes.len();
124123
}
@@ -128,26 +127,26 @@ mod tests {
128127
}
129128

130129
// Test BytesStream AsyncRead, all bytes at once
131-
#[test]
132-
fn test_async_read_all_bytes_at_once() {
130+
#[tokio::test]
131+
async fn async_read_all_bytes_at_once() {
133132
let bytes = Bytes::from("hello world");
134133
let mut stream = BytesStream::new(bytes.clone());
135134

136135
let mut buf = [0; 11];
137-
let bytes_read = futures::executor::block_on(stream.read(&mut buf)).unwrap();
136+
let bytes_read = stream.read(&mut buf).await.unwrap();
138137
assert_eq!(bytes_read, 11);
139138
assert_eq!(&buf[..], &bytes);
140139
}
141140

142141
// Test BytesStream AsyncRead, one byte at a time
143-
#[test]
144-
fn test_async_read_one_byte_at_a_time() {
142+
#[tokio::test]
143+
async fn async_read_one_byte_at_a_time() {
145144
let bytes = Bytes::from("hello world");
146145
let mut stream = BytesStream::new(bytes.clone());
147146

148147
for i in 0..bytes.len() {
149148
let mut buf = [0; 1];
150-
let bytes_read = futures::executor::block_on(stream.read(&mut buf)).unwrap();
149+
let bytes_read = stream.read(&mut buf).await.unwrap();
151150
assert_eq!(bytes_read, 1);
152151
assert_eq!(buf[0], bytes[i]);
153152
}

0 commit comments

Comments
 (0)