Skip to content

Commit 1963e93

Browse files
authored
Fetch the entire model payload in the pipeline (#3071)
Resolves #2073 by splitting methods on the `Pipeline`: * `send()` returns a `RawResponse` which is fully buffered in the `TransportPolicy`. * `stream()` returns a `BufResponse` which only the initial send goes through the pipeline e.g., on HTTP 429. The `stream()` method is basically what `send()` was before, but because we expect nearly all pipeline requests to fetch the entire response, we retain `send()` as the primary function.
1 parent 204dd4e commit 1963e93

File tree

91 files changed

+1188
-623
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+1188
-623
lines changed

sdk/core/azure_core/CHANGELOG.md

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,38 @@
55
### Features Added
66

77
- Added `Error::with_error_fn()`.
8+
- Added `AsyncResponse<T>` for responses that may stream the body outside the HTTP pipeline. This replaces `Response<T, F>` requiring an async read of the body that occurred outside the HTTP pipeline.
9+
- Added `http::response::BufResponseBody`, which also implements `Stream`.
10+
- Added a `Pipeline::stream()` to return a `Result<BufResponse>`.
11+
- Added `RawResponse::deconstruct()`.
12+
- Added `ResponseBody::collect_string()`.
13+
- Added `ResponseBody::from_bytes()`.
14+
- Implemented `AsRef<[u8]>` and `Deref<Target = [u8]>` for `ResponseBody`.
815

916
### Breaking Changes
1017

18+
1119
- Changed `ClientOptions::retry` from `Option<RetryOptions>` to `RetryOptions`.
12-
- Changed `RawResponse::json()` from `async` to synchronous function. The body was already buffered.
13-
- Changed `RawResponse::xml()` from `async` to synchronous function. The body was already buffered.
20+
- Changed `DeserializeWith::deserialize_with()` to be sync.
21+
- Changed `Pipeline::send()` to return a `Result<RawResponse>`.
22+
- Changed `RawResponse::body()` to return a `&ResponseBody` instead of `&Bytes`. `ResponseBody` wraps `&Bytes`, and implements `AsRef<[u8]>` and `Deref<Target = [u8]>`.
23+
- Changed `RawResponse::into_body()` to return a `ResponseBody` instead of `Bytes`. `ResponseBody` wraps `&Bytes`, and implements `AsRef<[u8]>` and `Deref<Target = [u8]>`.
24+
- Changed `RawResponse::json()` from `async` to a sync function. The body was already buffered.
25+
- Changed `RawResponse::xml()` from `async` to a sync function. The body was already buffered.
26+
- Changed `Response<T, F>` to fully sync; it holds a `RawResponse` that was already buffered entirely from the service so no longer needs or defines async functions.
27+
- Removed `create_extensible_enum` and `create_enum` macros.
28+
- Removed `BufResponse::json()`.
29+
- Removed `BufResponse::xml()`.
30+
- Removed `CustomHeadersPolicy` from public API.
1431
- Removed `ErrorKind::http_response()`. Construct an `ErrorResponse::HttpResponse` variant instead.
32+
- Removed `ExponentialRetryPolicy` from public API.
33+
- Removed `FixedRetryPolicy` from public API.
34+
- Removed `LoggingPolicy` from public API.
35+
- Removed `NoRetryPolicy` from public API.
36+
- Removed implementation of `Stream` for `ResponseBody`.
1537
- Removed several unreferenced HTTP headers and accessor structures for those headers.
16-
- Renamed `TransportOptions` to `Transport`.
1738
- Renamed `TransportOptions::new_custom_policy()` to `Transport::with_policy()`.
39+
- Renamed `TransportOptions` to `Transport`.
1840
- Renamed a number of construction functions for `Error` to align with [guidelines](https://azure.github.io/azure-sdk/rust_introduction.html)"
1941
- Renamed `Error::full()` to `Error::with_error()`.
2042
- Renamed `Error::with_message()` to `Error::with_message_fn()`.
@@ -24,7 +46,8 @@
2446
- Renamed `ResultExt::map_kind()` to `ResultExt::with_kind()`.
2547
- Renamed `ResultExt::with_context()` to `ResultExt::with_context_fn()`.
2648
- Renamed `ResultExt::context()` to `ResultExt::with_context()`.
27-
- Removed `create_extensible_enum` and `create_enum` macros.
49+
- Replaced implementation of `From<BufResponse>` for `Response<T, F>` to `From<RawResponse>`.
50+
- Replaced implementation of `From<Response<T, F>>` for `BufResponse` to `From<AsyncResponse<T>>`.
2851

2952
### Bugs Fixed
3053

sdk/core/azure_core/README.md

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
159159

160160
// Response<T> has two main accessors:
161161
// 1. The `into_body()` function consumes self to deserialize into a model type
162-
let secret = response.into_body().await?;
162+
let secret = response.into_body()?;
163163

164164
// get response again because it was moved in above statement
165165
let response: Response<Secret> = client.get_secret("secret-name", None).await?;
@@ -199,7 +199,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
199199
)?;
200200

201201
match client.get_secret("secret-name", None).await {
202-
Ok(secret) => println!("Secret: {:?}", secret.into_body().await?.value),
202+
Ok(secret) => println!("Secret: {:?}", secret.into_body()?.value),
203203
Err(e) => match e.kind() {
204204
ErrorKind::HttpResponse { status, error_code, .. } if *status == StatusCode::NotFound => {
205205
// handle not found error
@@ -272,7 +272,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
272272

273273
// poll the pager until there are no more SecretListResults
274274
while let Some(secrets) = pager.try_next().await? {
275-
let secrets = secrets.into_body().await?.value;
275+
let secrets = secrets.into_body()?.value;
276276
// loop through secrets in SecretsListResults
277277
for secret in secrets {
278278
// get the secret name from the ID
@@ -328,7 +328,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
328328
// The Poller implements futures::Stream and automatically waits between polls.
329329
let mut poller = client.begin_create_certificate("certificate-name", body.try_into()?, None)?;
330330
while let Some(operation) = poller.try_next().await? {
331-
let operation = operation.into_body().await?;
331+
let operation = operation.into_body()?;
332332
match operation.status.as_deref().unwrap_or("unknown") {
333333
"inProgress" => continue,
334334
"completed" => {
@@ -385,8 +385,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
385385
.wait()
386386
.await?
387387
// Deserialize the CertificateOperation:
388-
.into_body()
389-
.await?;
388+
.into_body()?;
390389

391390
if matches!(operation.status, Some(status) if status == "completed") {
392391
let target = operation.target.ok_or("expected target")?;
@@ -577,8 +576,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
577576
// get a secret
578577
let secret = client.get_secret("secret-name", None)
579578
.await?
580-
.into_body()
581-
.await?;
579+
.into_body()?;
582580

583581
println!("{secret:#?}");
584582

sdk/core/azure_core/benches/http_transport_benchmarks.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use azure_core::{
55
credentials::TokenCredential,
66
fmt::SafeDebug,
77
http::{
8-
BufResponse, ClientMethodOptions, ClientOptions, HttpClient, Method, Pipeline, Request,
8+
ClientMethodOptions, ClientOptions, HttpClient, Method, Pipeline, RawResponse, Request,
99
Transport, Url,
1010
},
1111
Result,
@@ -85,7 +85,7 @@ impl TestServiceClient {
8585
&self,
8686
path: &str,
8787
options: Option<TestServiceClientGetMethodOptions<'_>>,
88-
) -> Result<BufResponse> {
88+
) -> Result<RawResponse> {
8989
let options = options.unwrap_or_default();
9090
let mut url = self.endpoint.clone();
9191
url.set_path(path);

sdk/core/azure_core/examples/core_poller.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ async fn test_poller() -> Result<(), Box<dyn std::error::Error>> {
4242
.begin_create_certificate("my-cert", params.try_into()?, None)?
4343
.wait()
4444
.await?
45-
.into_body()
46-
.await?;
45+
.into_body()?;
4746
assert_eq!(operation.status.as_deref(), Some("completed"));
4847
assert!(operation.target.is_some());
4948

sdk/core/azure_core/examples/core_remove_user_agent.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,7 @@ async fn test_remove_user_agent() -> Result<(), Box<dyn std::error::Error>> {
6262
)?;
6363

6464
// We'll fetch a secret and let the mock client assert the User-Agent header was removed.
65-
let secret = client
66-
.get_secret("my-secret", None)
67-
.await?
68-
.into_body()
69-
.await?;
65+
let secret = client.get_secret("my-secret", None).await?.into_body()?;
7066
assert_eq!(secret.value.as_deref(), Some("secret-value"));
7167

7268
Ok(())

sdk/core/azure_core/src/error/error_response.rs

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55

66
use crate::{
77
error::{Error, ErrorKind},
8-
http::{headers::ERROR_CODE, BufResponse},
8+
http::{headers::ERROR_CODE, BufResponse, RawResponse, StatusCode},
99
};
1010
use serde::Deserialize;
11-
use std::{collections::HashMap, str};
11+
use std::{collections::HashMap, future::Future, str};
1212

1313
/// An HTTP error response.
1414
///
@@ -42,7 +42,7 @@ impl TryFrom<Error> for ErrorResponse {
4242
ErrorKind::HttpResponse { raw_response, .. } => {
4343
let error_response: Option<crate::Result<ErrorResponse>> = raw_response
4444
.as_ref()
45-
.map(|raw| serde_json::from_slice(raw.body()).map_err(Error::from));
45+
.map(|raw| serde_json::from_slice(raw.body().as_ref()).map_err(Error::from));
4646
match error_response {
4747
Some(result) => Ok(result?),
4848
None => Err(value),
@@ -108,6 +108,41 @@ struct ErrorDetailsInternal<'a> {
108108
message: Option<&'a str>,
109109
}
110110

111+
/// Represents a response from which we can get a [`StatusCode`] and collect into a [`RawResponse`].
112+
///
113+
/// This is intended for internal use only and implemented only by [`BufResponse`] and [`RawResponse`].
114+
pub trait Response: crate::private::Sealed {
115+
/// Get the [`StatusCode`] from the response.
116+
fn status(&self) -> StatusCode;
117+
118+
/// Collect into a [`RawResponse`].
119+
fn try_into_raw_response(self) -> impl Future<Output = crate::Result<RawResponse>>;
120+
}
121+
122+
impl crate::private::Sealed for BufResponse {}
123+
impl crate::private::Sealed for RawResponse {}
124+
125+
impl Response for BufResponse {
126+
fn status(&self) -> StatusCode {
127+
self.status()
128+
}
129+
130+
fn try_into_raw_response(self) -> impl Future<Output = crate::Result<RawResponse>> {
131+
self.try_into_raw_response()
132+
}
133+
}
134+
135+
impl Response for RawResponse {
136+
fn status(&self) -> StatusCode {
137+
self.status()
138+
}
139+
140+
#[inline]
141+
fn try_into_raw_response(self) -> impl Future<Output = crate::Result<RawResponse>> {
142+
std::future::ready(Ok(self))
143+
}
144+
}
145+
111146
/// Options for customizing the behavior of `check_success`.
112147
#[derive(Debug, Default)]
113148
pub struct CheckSuccessOptions {
@@ -128,10 +163,10 @@ pub struct CheckSuccessOptions {
128163
/// * `Err(Error)` if the response is an error, with details extracted from the response
129164
/// body if possible.
130165
///
131-
pub async fn check_success(
132-
response: BufResponse,
166+
pub async fn check_success<T: Response>(
167+
response: T,
133168
options: Option<CheckSuccessOptions>,
134-
) -> crate::Result<BufResponse> {
169+
) -> crate::Result<T> {
135170
let status = response.status();
136171

137172
if options
@@ -438,9 +473,10 @@ mod tests {
438473
Bytes::from_static(br#"{"error":{"code":"InvalidRequest","message":"The request object is not recognized.","innererror":{"code":"InvalidKey"},"key":"foo"}}"#),
439474
);
440475
let error_response: ErrorResponse = buf_response
476+
.try_into_raw_response()
477+
.await?
441478
.into_body()
442479
.json()
443-
.await
444480
.expect("expected an ErrorResponse");
445481
error_response.error.as_ref().expect("error should be set");
446482
println!("{:?}", &error_response);

sdk/core/azure_core/src/http/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub use pager::{ItemIterator, PageIterator, Pager};
1818
pub use pipeline::*;
1919
pub use poller::Poller;
2020
pub use request::{Body, Request, RequestContent};
21-
pub use response::{BufResponse, RawResponse, Response};
21+
pub use response::{AsyncResponse, BufResponse, RawResponse, Response};
2222

2323
pub use typespec_client_core::http::response;
2424
pub use typespec_client_core::http::{

0 commit comments

Comments
 (0)