Skip to content

Commit 1f5ed78

Browse files
authored
Add binary request example (Azure#1865)
* Add binary request example Also refactors `FileStream` from `azure_core` to `typespec_client_core`, plus some trait implementations on necessary request structures to make it all work. * Add FileStream into RequestContent * Resolve PR feedback * Resolve PR feedback
1 parent 107cffd commit 1f5ed78

File tree

9 files changed

+146
-21
lines changed

9 files changed

+146
-21
lines changed

sdk/core/azure_core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ hmac_openssl = ["dep:openssl"]
5050
test_e2e = []
5151
azurite_workaround = []
5252
xml = ["typespec_client_core/xml"]
53-
tokio_fs = ["tokio/fs", "tokio/sync", "tokio/io-util"]
53+
tokio_fs = ["typespec_client_core/tokio_fs"]
5454
tokio_sleep = ["typespec_client_core/tokio_sleep"]
5555

5656
[package.metadata.docs.rs]

sdk/core/azure_core/src/tokio/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
// Licensed under the MIT License.
33

44
#[cfg(feature = "tokio_fs")]
5-
pub mod fs;
5+
pub use typespec_client_core::fs::*;

sdk/typespec/typespec_client_core/Cargo.toml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,26 +41,26 @@ tokio = { workspace = true, features = ["macros", "rt", "time"] }
4141
[dev-dependencies]
4242
once_cell.workspace = true
4343
tokio.workspace = true
44+
tracing.workspace = true
45+
tracing-subscriber.workspace = true
4446
typespec_derive.workspace = true
4547

4648
[features]
47-
default = [
48-
"http",
49-
"json",
50-
"reqwest",
51-
"reqwest_gzip",
52-
"reqwest_rustls",
53-
"tokio_sleep",
54-
]
49+
default = ["http", "json", "reqwest", "reqwest_gzip", "reqwest_rustls"]
5550
derive = ["dep:typespec_derive"]
5651
http = ["dep:http-types", "typespec/http"]
5752
json = ["typespec/json"]
5853
reqwest = ["dep:reqwest", "reqwest/default-tls"]
5954
reqwest_gzip = ["reqwest/gzip"]
6055
reqwest_rustls = ["reqwest/rustls-tls"]
56+
tokio_fs = ["tokio/fs", "tokio/sync", "tokio/io-util"]
6157
tokio_sleep = ["tokio/time"]
6258
xml = ["dep:quick-xml"]
6359

60+
[[example]]
61+
name = "binary_data_request"
62+
required-features = ["tokio_fs", "tokio_sleep"]
63+
6464
[[example]]
6565
name = "stream_response"
6666
required-features = ["derive"]
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
use tokio::fs;
5+
use tracing::level_filters::LevelFilter;
6+
use tracing_subscriber::fmt::format::FmtSpan;
7+
use typespec_client_core::{fs::FileStreamBuilder, http::RequestContent};
8+
9+
#[tokio::main]
10+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
11+
// Log traces to stdout.
12+
let _log = tracing_subscriber::fmt()
13+
.with_span_events(FmtSpan::ENTER | FmtSpan::EXIT)
14+
.with_max_level(LevelFilter::DEBUG)
15+
.init();
16+
17+
// Asynchronously read the whole file into memory.
18+
let body = RequestContent::from(fs::read(file!()).await?);
19+
client::put_binary_data(body).await?;
20+
21+
// Asynchronously stream the file with the service client request.
22+
#[cfg(not(target_arch = "wasm32"))]
23+
{
24+
let file = fs::File::open(file!()).await?;
25+
let file = FileStreamBuilder::new(file)
26+
// Simulate a slow, chunky request.
27+
.buffer_size(512usize)
28+
.build()
29+
.await?;
30+
client::put_binary_data(file.into()).await?;
31+
}
32+
33+
Ok(())
34+
}
35+
36+
mod client {
37+
use futures::StreamExt;
38+
use tracing::debug;
39+
use typespec_client_core::{
40+
http::{headers::Headers, Body, RequestContent, Response, StatusCode},
41+
stream::BytesStream,
42+
};
43+
44+
#[tracing::instrument(skip(body))]
45+
pub async fn put_binary_data(
46+
body: RequestContent<()>,
47+
) -> typespec_client_core::Result<Response<()>> {
48+
let body: RequestContent<()> = body.into();
49+
let body: Body = body.into();
50+
51+
let content = match body {
52+
Body::Bytes(ref bytes) => {
53+
debug!("received {} bytes", bytes.len());
54+
bytes.to_owned()
55+
}
56+
Body::SeekableStream(mut stream) => {
57+
debug!("received stream");
58+
let stream = stream.as_mut();
59+
60+
let mut bytes = Vec::new();
61+
while let Some(Ok(buf)) = stream.next().await {
62+
debug!("read {} bytes from stream", buf.len());
63+
bytes.extend(buf);
64+
}
65+
66+
bytes.into()
67+
}
68+
};
69+
70+
// Assume bytes are a string in this example.
71+
let content = String::from_utf8(content.into())?;
72+
println!("{content}");
73+
74+
Ok(Response::new(
75+
StatusCode::NoContent,
76+
Headers::new(),
77+
Box::pin(BytesStream::new_empty()),
78+
))
79+
}
80+
}

sdk/typespec/typespec_client_core/examples/stream_response.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,19 @@
22
// Licensed under the MIT License.
33

44
use futures::StreamExt;
5+
use tracing::level_filters::LevelFilter;
6+
use tracing_subscriber::fmt::format::FmtSpan;
57

68
#[tokio::main]
79
async fn main() -> Result<(), Box<dyn std::error::Error>> {
10+
// Log traces to stdout.
11+
let _log = tracing_subscriber::fmt()
12+
.with_span_events(FmtSpan::ENTER | FmtSpan::EXIT)
13+
.with_max_level(LevelFilter::DEBUG)
14+
.init();
15+
816
// Get a response from a service client.
9-
let response = client::get_binary_data_response()?;
17+
let response = client::get_binary_data()?;
1018

1119
// Normally you'd deserialize into a type or `collect()` the body,
1220
// but this better simulates fetching multiple chunks from a slow response.
@@ -18,7 +26,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1826
}
1927

2028
// You can also deserialize into a model from a slow response.
21-
let team = client::get_model_response()?.deserialize_body().await?;
29+
let team = client::get_model()?.deserialize_body().await?;
2230
println!("{team:#?}");
2331

2432
Ok(())
@@ -29,6 +37,7 @@ mod client {
2937
use futures::Stream;
3038
use serde::Deserialize;
3139
use std::{cmp::min, task::Poll, time::Duration};
40+
use tracing::debug;
3241
use typespec_client_core::{
3342
http::{headers::Headers, Model, Response, StatusCode},
3443
Bytes,
@@ -47,7 +56,8 @@ mod client {
4756
pub name: Option<String>,
4857
}
4958

50-
pub fn get_binary_data_response() -> typespec_client_core::Result<Response<()>> {
59+
#[tracing::instrument]
60+
pub fn get_binary_data() -> typespec_client_core::Result<Response<()>> {
5161
let bytes = Bytes::from_static(b"Hello, world!");
5262
let response = SlowResponse {
5363
bytes: bytes.repeat(5).into(),
@@ -62,7 +72,8 @@ mod client {
6272
))
6373
}
6474

65-
pub fn get_model_response() -> typespec_client_core::Result<Response<Team>> {
75+
#[tracing::instrument]
76+
pub fn get_model() -> typespec_client_core::Result<Response<Team>> {
6677
let bytes = br#"{
6778
"name": "Contoso Dev Team",
6879
"members": [
@@ -104,7 +115,9 @@ mod client {
104115
) -> Poll<Option<Self::Item>> {
105116
let self_mut = self.get_mut();
106117
if self_mut.bytes_read < self_mut.bytes.len() {
107-
eprintln!("getting partial response...");
118+
debug!("writing partial response...");
119+
120+
// Simulate a slow response.
108121
std::thread::sleep(Duration::from_millis(200));
109122

110123
let end = self_mut.bytes_read
@@ -116,7 +129,7 @@ mod client {
116129
self_mut.bytes_read += bytes.len();
117130
Poll::Ready(Some(Ok(bytes)))
118131
} else {
119-
eprintln!("done");
132+
debug!("done");
120133
Poll::Ready(None)
121134
}
122135
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
#[cfg(feature = "tokio_fs")]
5+
mod tokio;
6+
7+
#[cfg(feature = "tokio_fs")]
8+
pub use tokio::*;

sdk/core/azure_core/src/tokio/fs.rs renamed to sdk/typespec/typespec_client_core/src/fs/tokio.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
use crate::{
5+
http::{Body, RequestContent},
6+
setters,
7+
stream::{SeekableStream, DEFAULT_BUFFER_SIZE},
8+
};
49
use futures::{task::Poll, Future};
510
use std::{cmp::min, io::SeekFrom, pin::Pin, sync::Arc, task::Context};
611
use tokio::{
@@ -9,11 +14,6 @@ use tokio::{
914
sync::Mutex,
1015
};
1116
use tracing::debug;
12-
use typespec_client_core::{
13-
http::Body,
14-
setters,
15-
stream::{SeekableStream, DEFAULT_BUFFER_SIZE},
16-
};
1717

1818
#[derive(Debug)]
1919
pub struct FileStreamBuilder {
@@ -168,3 +168,17 @@ impl From<FileStream> for Body {
168168
Body::SeekableStream(Box::new(stream))
169169
}
170170
}
171+
172+
#[cfg(not(target_arch = "wasm32"))]
173+
impl<T> From<&FileStream> for RequestContent<T> {
174+
fn from(stream: &FileStream) -> Self {
175+
Body::from(stream).into()
176+
}
177+
}
178+
179+
#[cfg(not(target_arch = "wasm32"))]
180+
impl<T> From<FileStream> for RequestContent<T> {
181+
fn from(stream: FileStream) -> Self {
182+
Body::from(stream).into()
183+
}
184+
}

sdk/typespec/typespec_client_core/src/http/request/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,15 @@ impl<T> From<RequestContent<T>> for Body {
208208
}
209209
}
210210

211+
impl<T> From<Body> for RequestContent<T> {
212+
fn from(body: Body) -> Self {
213+
Self {
214+
body,
215+
phantom: PhantomData,
216+
}
217+
}
218+
}
219+
211220
impl<T> TryFrom<Bytes> for RequestContent<T> {
212221
type Error = crate::Error;
213222
fn try_from(body: Bytes) -> Result<Self, Self::Error> {

sdk/typespec/typespec_client_core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod macros;
88
pub mod base64;
99
pub mod date;
1010
pub mod error;
11+
pub mod fs;
1112
#[cfg(feature = "http")]
1213
pub mod http;
1314
#[cfg(feature = "json")]

0 commit comments

Comments
 (0)