Skip to content

Commit b930bce

Browse files
authored
Verify request body streaming (#2719)
* Verify request body streaming Resolves #2425 by adding a live-only example to `azure_storage_blob` so we can trace it (`RUST_LOG=trace`) and verify with `tcpdump` we are indeed tracing request and response bodies as expected. * Simplify API to convert SeekableStream into RequestContent * Update `time` crate for `to_utc` * Remove unnecessary blobService configuration
1 parent 9648593 commit b930bce

File tree

9 files changed

+478
-3
lines changed

9 files changed

+478
-3
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ sha2 = { version = "0.10" }
107107
syn = { version = "2.0.87", features = ["full"] }
108108
tar = { version = "0.4.44", default-features = false }
109109
thiserror = "1.0"
110-
time = { version = "0.3.10", features = [
110+
time = { version = "0.3.41", features = [
111111
"serde-well-known",
112112
"macros",
113113
"wasm-bindgen",

sdk/core/azure_core_test/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub mod http;
88
pub mod proxy;
99
pub mod recorded;
1010
mod recording;
11+
pub mod stream;
1112

1213
use azure_core::Error;
1314
pub use azure_core::{error::ErrorKind, test::TestMode};
Lines changed: 347 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,347 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
//! Streams for testing purposes.
5+
6+
use azure_core::stream::SeekableStream;
7+
#[cfg(not(target_arch = "wasm32"))]
8+
use azure_core::{
9+
http::{Body, RequestContent},
10+
Bytes,
11+
};
12+
use futures::{io::AsyncRead, stream::Stream};
13+
use std::{fmt, iter::Cycle, ops::Range, pin::Pin, task::Poll};
14+
15+
/// Implements a [`Stream`] over an endless cycle of bytes.
16+
#[derive(Clone)]
17+
pub struct GeneratedStream<I, const LENGTH: usize, const CHUNK: usize = 1024> {
18+
generator: Cycle<I>,
19+
bytes_read: usize,
20+
}
21+
22+
impl<const LENGTH: usize, const CHUNK: usize> GeneratedStream<Range<u8>, LENGTH, CHUNK> {
23+
/// Creates a `GeneratedStream` over a series of bytes from `0..255`.
24+
///
25+
/// # Examples
26+
///
27+
/// ```
28+
/// use azure_core_test::stream::GeneratedStream;
29+
/// use futures::io::AsyncReadExt;
30+
///
31+
/// # #[tokio::main] async fn main() {
32+
/// let mut stream = GeneratedStream::<_, 4>::new();
33+
/// let mut buf = Vec::new();
34+
/// stream.read_to_end(&mut buf).await.unwrap();
35+
/// assert_eq!(buf, vec![0u8, 1, 2, 3]);
36+
/// # }
37+
/// ```
38+
pub fn new() -> GeneratedStream<Range<u8>, LENGTH, CHUNK> {
39+
GeneratedStream {
40+
generator: (0..u8::MAX).cycle(),
41+
bytes_read: 0,
42+
}
43+
}
44+
}
45+
46+
impl<I, const LENGTH: usize, const CHUNK: usize> GeneratedStream<I, LENGTH, CHUNK>
47+
where
48+
I: Iterator<Item = u8> + Clone,
49+
{
50+
/// Creates a `GeneratedStream` over a custom iterator of bytes.
51+
///
52+
/// # Examples
53+
///
54+
/// ```
55+
/// use azure_core_test::stream::GeneratedStream;
56+
/// use futures::io::AsyncReadExt;
57+
///
58+
/// # #[tokio::main] async fn main() {
59+
/// let iter = b"hello, world!".iter().copied();
60+
/// let mut stream = GeneratedStream::<_, 18>::from_iter(iter);
61+
/// let mut buf = Vec::new();
62+
/// stream.read_to_end(&mut buf).await.unwrap();
63+
/// let s = String::from_utf8(buf).unwrap();
64+
/// assert_eq!(s, "hello, world!hello");
65+
/// # }
66+
/// ```
67+
#[allow(clippy::should_implement_trait)]
68+
pub fn from_iter(iter: I) -> Self {
69+
GeneratedStream {
70+
generator: iter.cycle(),
71+
bytes_read: 0,
72+
}
73+
}
74+
}
75+
76+
impl<I, const LENGTH: usize, const CHUNK: usize> fmt::Debug for GeneratedStream<I, LENGTH, CHUNK> {
77+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78+
f.debug_struct("GeneratedStream")
79+
.field("bytes_read", &self.bytes_read)
80+
.finish_non_exhaustive()
81+
}
82+
}
83+
84+
impl<const LENGTH: usize, const CHUNK: usize> Default
85+
for GeneratedStream<Range<u8>, LENGTH, CHUNK>
86+
{
87+
/// Creates a `GeneratedStream` over a series of bytes from `0..255`.
88+
///
89+
/// # Examples
90+
///
91+
/// ```
92+
/// use azure_core_test::stream::GeneratedStream;
93+
/// use futures::io::AsyncReadExt;
94+
///
95+
/// # #[tokio::main] async fn main() {
96+
/// let mut stream = GeneratedStream::<_, 4>::default();
97+
/// let mut buf = Vec::new();
98+
/// stream.read_to_end(&mut buf).await.unwrap();
99+
/// assert_eq!(buf, vec![0u8, 1, 2, 3]);
100+
/// # }
101+
/// ```
102+
fn default() -> Self {
103+
Self::new()
104+
}
105+
}
106+
107+
impl<I, const LENGTH: usize, const CHUNK: usize> AsyncRead for GeneratedStream<I, LENGTH, CHUNK>
108+
where
109+
I: Clone,
110+
Cycle<I>: Iterator<Item = u8> + Unpin,
111+
{
112+
fn poll_read(
113+
self: Pin<&mut Self>,
114+
_cx: &mut std::task::Context<'_>,
115+
buf: &mut [u8],
116+
) -> Poll<std::io::Result<usize>> {
117+
let self_mut = self.get_mut();
118+
119+
if self_mut.bytes_read >= LENGTH {
120+
return Poll::Ready(Ok(0));
121+
}
122+
123+
let remaining_bytes = LENGTH - self_mut.bytes_read;
124+
let bytes_to_read = std::cmp::min(remaining_bytes, buf.len());
125+
126+
for byte_slot in buf.iter_mut().take(bytes_to_read) {
127+
*byte_slot = self_mut.generator.next().unwrap();
128+
self_mut.bytes_read += 1;
129+
}
130+
131+
tracing::debug!("read {bytes_to_read} bytes");
132+
Poll::Ready(Ok(bytes_to_read))
133+
}
134+
}
135+
136+
impl<I, const LENGTH: usize, const CHUNK: usize> Stream for GeneratedStream<I, LENGTH, CHUNK>
137+
where
138+
I: Clone,
139+
Cycle<I>: Iterator<Item = u8> + Unpin,
140+
{
141+
type Item = std::io::Result<Vec<u8>>;
142+
143+
fn poll_next(
144+
self: Pin<&mut Self>,
145+
_cx: &mut std::task::Context<'_>,
146+
) -> Poll<Option<Self::Item>> {
147+
let self_mut = self.get_mut();
148+
149+
if self_mut.bytes_read >= LENGTH {
150+
return Poll::Ready(None);
151+
}
152+
153+
let remaining_bytes = LENGTH - self_mut.bytes_read;
154+
let bytes_to_read = std::cmp::min(remaining_bytes, CHUNK);
155+
156+
let chunk: Vec<u8> = (0..bytes_to_read)
157+
.map(|_| {
158+
self_mut.bytes_read += 1;
159+
self_mut.generator.next().unwrap()
160+
})
161+
.collect();
162+
163+
tracing::debug!("read {} bytes", chunk.len());
164+
Poll::Ready(Some(Ok(chunk)))
165+
}
166+
}
167+
168+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
169+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
170+
impl<I, const LENGTH: usize, const CHUNK: usize> SeekableStream
171+
for GeneratedStream<I, LENGTH, CHUNK>
172+
where
173+
I: Clone + Send + Sync,
174+
Cycle<I>: Iterator<Item = u8> + Unpin,
175+
{
176+
async fn reset(&mut self) -> azure_core::Result<()> {
177+
self.bytes_read = 0;
178+
tracing::trace!("reset");
179+
Ok(())
180+
}
181+
182+
fn len(&self) -> usize {
183+
LENGTH
184+
}
185+
}
186+
187+
#[cfg(not(target_arch = "wasm32"))]
188+
impl<I, const LENGTH: usize, const CHUNK: usize> From<&GeneratedStream<I, LENGTH, CHUNK>> for Body
189+
where
190+
for<'a> I: Clone + Send + Sync + 'a,
191+
Cycle<I>: Iterator<Item = u8> + Unpin,
192+
{
193+
fn from(stream: &GeneratedStream<I, LENGTH, CHUNK>) -> Self {
194+
Body::SeekableStream(Box::new(stream.clone()))
195+
}
196+
}
197+
198+
#[cfg(not(target_arch = "wasm32"))]
199+
impl<I, const LENGTH: usize, const CHUNK: usize> From<GeneratedStream<I, LENGTH, CHUNK>> for Body
200+
where
201+
for<'a> I: Clone + Send + Sync + 'a,
202+
Cycle<I>: Iterator<Item = u8> + Unpin,
203+
{
204+
fn from(stream: GeneratedStream<I, LENGTH, CHUNK>) -> Self {
205+
Body::SeekableStream(Box::new(stream))
206+
}
207+
}
208+
209+
#[cfg(not(target_arch = "wasm32"))]
210+
impl<I, const LENGTH: usize, const CHUNK: usize> From<&GeneratedStream<I, LENGTH, CHUNK>>
211+
for RequestContent<Bytes>
212+
where
213+
for<'a> I: Clone + Send + Sync + 'a,
214+
Cycle<I>: Iterator<Item = u8> + Unpin,
215+
{
216+
fn from(stream: &GeneratedStream<I, LENGTH, CHUNK>) -> Self {
217+
Body::from(stream).into()
218+
}
219+
}
220+
221+
#[cfg(not(target_arch = "wasm32"))]
222+
impl<I, const LENGTH: usize, const CHUNK: usize> From<GeneratedStream<I, LENGTH, CHUNK>>
223+
for RequestContent<Bytes>
224+
where
225+
for<'a> I: Clone + Send + Sync + 'a,
226+
Cycle<I>: Iterator<Item = u8> + Unpin,
227+
{
228+
fn from(stream: GeneratedStream<I, LENGTH, CHUNK>) -> Self {
229+
Body::from(stream).into()
230+
}
231+
}
232+
233+
#[cfg(test)]
234+
mod tests {
235+
use super::GeneratedStream;
236+
use futures::{io::AsyncReadExt as _, stream::StreamExt as _};
237+
238+
#[tokio::test]
239+
async fn async_read_all_bytes_at_once() {
240+
let mut stream = GeneratedStream::<_, 100>::default();
241+
let mut buf = vec![0u8; 100];
242+
243+
stream.read_exact(&mut buf).await.unwrap();
244+
245+
// Verify the pattern matches the cycle (0..255)
246+
for (i, &byte) in buf.iter().enumerate() {
247+
assert_eq!(byte, (i % 255) as u8);
248+
}
249+
}
250+
251+
#[tokio::test]
252+
async fn async_read_partial_chunks() {
253+
let mut stream = GeneratedStream::<_, 50>::default();
254+
let mut total_read = 0;
255+
let mut all_bytes = Vec::new();
256+
257+
// Read in chunks of 10 bytes
258+
loop {
259+
let mut buf = [0u8; 10];
260+
let bytes_read = stream.read(&mut buf).await.unwrap();
261+
if bytes_read == 0 {
262+
break;
263+
}
264+
all_bytes.extend_from_slice(&buf[..bytes_read]);
265+
total_read += bytes_read;
266+
}
267+
268+
assert_eq!(total_read, 50);
269+
assert_eq!(all_bytes.len(), 50);
270+
271+
// Verify the pattern
272+
for (i, &byte) in all_bytes.iter().enumerate() {
273+
assert_eq!(byte, (i % 255) as u8);
274+
}
275+
}
276+
277+
#[tokio::test]
278+
async fn stream_1024_byte_chunks() {
279+
let mut stream = GeneratedStream::<_, 3000>::default();
280+
let mut total_bytes = 0;
281+
let mut chunk_count = 0;
282+
283+
while let Some(Ok(chunk)) = stream.next().await {
284+
chunk_count += 1;
285+
total_bytes += chunk.len();
286+
287+
// First two chunks should be 1024 bytes, last chunk should be smaller
288+
if chunk_count <= 2 {
289+
assert_eq!(chunk.len(), 1024);
290+
} else {
291+
assert!(chunk.len() <= 1024);
292+
}
293+
}
294+
295+
assert_eq!(total_bytes, 3000);
296+
assert_eq!(chunk_count, 3); // 1024 + 1024 + 952 = 3000
297+
}
298+
299+
#[tokio::test]
300+
async fn stream_respects_max_limit() {
301+
let mut stream = GeneratedStream::<_, 10>::default();
302+
let mut total_bytes = 0;
303+
304+
while let Some(Ok(chunk)) = stream.next().await {
305+
total_bytes += chunk.len();
306+
}
307+
308+
assert_eq!(total_bytes, 10);
309+
}
310+
311+
#[tokio::test]
312+
async fn custom_chunk_size() {
313+
let mut stream = GeneratedStream::<_, 100, 32>::default();
314+
let mut total_bytes = 0;
315+
let mut chunk_count = 0;
316+
317+
while let Some(Ok(chunk)) = stream.next().await {
318+
chunk_count += 1;
319+
total_bytes += chunk.len();
320+
321+
// All chunks except possibly the last should be 32 bytes
322+
if total_bytes < 100 {
323+
assert_eq!(chunk.len(), 32);
324+
} else {
325+
assert!(chunk.len() <= 32);
326+
}
327+
}
328+
329+
assert_eq!(total_bytes, 100);
330+
assert_eq!(chunk_count, 4); // 32 + 32 + 32 + 4 = 100
331+
}
332+
333+
#[tokio::test]
334+
async fn from_iter_hello_world() {
335+
let hello_world = b"Hello, world!";
336+
let iter = hello_world.iter().copied();
337+
let mut stream = GeneratedStream::<_, 16>::from_iter(iter);
338+
339+
let mut buf = Vec::new();
340+
stream.read_to_end(&mut buf).await.unwrap();
341+
342+
assert_eq!(buf.len(), 16);
343+
let result_str = std::str::from_utf8(&buf).unwrap();
344+
// Should be "Hello, world!" (13 bytes) + "Hel" (3 more bytes to reach 16 total)
345+
assert_eq!(result_str, "Hello, world!Hel");
346+
}
347+
}

sdk/storage/azure_storage_blob/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@ uuid.workspace = true
2727
workspace = true
2828

2929
[dev-dependencies]
30-
azure_core_test.workspace = true
30+
azure_core_test = { workspace = true, features = [
31+
"tracing",
32+
] }
3133
azure_identity.workspace = true
3234
azure_storage_blob_test.path = "../azure_storage_blob_test"
3335
futures.workspace = true
3436
tokio = { workspace = true, features = ["macros"] }
37+
tracing.workspace = true

0 commit comments

Comments
 (0)