Skip to content

Commit 9f9468f

Browse files
committed
Add a failing test_async_stream_chunked
Signed-off-by: blaginin <[email protected]>
1 parent 374882d commit 9f9468f

File tree

1 file changed

+46
-0
lines changed

1 file changed

+46
-0
lines changed

vortex-ipc/src/stream.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,11 @@ impl Stream for ArrayStreamIPCBytes {
200200

201201
#[cfg(test)]
202202
mod test {
203+
use std::io;
204+
use std::pin::Pin;
205+
use std::task::Context;
206+
use std::task::Poll;
207+
203208
use futures::io::Cursor;
204209
use vortex_array::IntoArray as _;
205210
use vortex_array::ToCanonical;
@@ -232,4 +237,45 @@ mod test {
232237
result.as_slice::<i32>()
233238
);
234239
}
240+
241+
/// Wrapper that limits reads to small chunks to simulate network behavior
242+
struct ChunkedReader<R> {
243+
inner: R,
244+
chunk_size: usize,
245+
}
246+
247+
impl<R: AsyncRead + Unpin> AsyncRead for ChunkedReader<R> {
248+
fn poll_read(
249+
mut self: Pin<&mut Self>,
250+
cx: &mut Context<'_>,
251+
buf: &mut [u8],
252+
) -> Poll<io::Result<usize>> {
253+
let chunk_size = self.chunk_size.min(buf.len());
254+
Pin::new(&mut self.inner).poll_read(cx, &mut buf[..chunk_size])
255+
}
256+
}
257+
258+
#[tokio::test]
259+
async fn test_async_stream_chunked() {
260+
let session = ArraySession::default();
261+
let array = buffer![1i32, 2, 3, 4, 5, 6, 7, 8, 9, 10].into_array();
262+
let ipc_buffer = array
263+
.to_array_stream()
264+
.into_ipc()
265+
.collect_to_buffer()
266+
.await
267+
.unwrap();
268+
269+
let chunked = ChunkedReader {
270+
inner: Cursor::new(ipc_buffer),
271+
chunk_size: 3,
272+
};
273+
274+
let reader = AsyncIPCReader::try_new(chunked, session.registry().clone())
275+
.await
276+
.unwrap();
277+
278+
let result = reader.read_all().await.unwrap();
279+
assert_eq!(result.len(), 10);
280+
}
235281
}

0 commit comments

Comments
 (0)