Skip to content

Commit 8f76f01

Browse files
committed
refactor
Signed-off-by: Connor Tsui <[email protected]>
1 parent dbd3d4a commit 8f76f01

File tree

1 file changed

+71
-35
lines changed

1 file changed

+71
-35
lines changed

vortex-ipc/src/messages/reader_async.rs

Lines changed: 71 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,12 @@ use futures::Stream;
1212
use pin_project_lite::pin_project;
1313
use vortex_array::session::ArrayRegistry;
1414
use vortex_error::VortexResult;
15+
use vortex_error::vortex_err;
1516

1617
use crate::messages::DecoderMessage;
1718
use crate::messages::MessageDecoder;
1819
use crate::messages::PollRead;
1920

20-
/// The state of an in-progress read operation.
21-
#[derive(Default)]
22-
enum ReadState {
23-
/// Ready to consult the decoder for the next operation.
24-
#[default]
25-
AwaitingDecoder,
26-
/// Filling the buffer with data from the underlying reader.
27-
///
28-
/// Async readers may return fewer bytes than requested (partial reads), especially over network
29-
/// connections. This state persists across multiple `poll_next` calls until the buffer is
30-
/// completely filled, at which point we transition back to [`Self::AwaitingDecoder`].
31-
Filling {
32-
/// The number of bytes read into the buffer so far.
33-
total_bytes_read: usize,
34-
},
35-
}
36-
3721
pin_project! {
3822
/// An IPC message reader backed by an `AsyncRead` stream.
3923
pub struct AsyncMessageReader<R> {
@@ -56,6 +40,65 @@ impl<R> AsyncMessageReader<R> {
5640
}
5741
}
5842

43+
/// The state of an in-progress read operation.
44+
#[derive(Default)]
45+
enum ReadState {
46+
/// Ready to consult the decoder for the next operation.
47+
#[default]
48+
AwaitingDecoder,
49+
/// Filling the buffer with data from the underlying reader.
50+
///
51+
/// Async readers may return fewer bytes than requested (partial reads), especially over network
52+
/// connections. This state persists across multiple `poll_next` calls until the buffer is
53+
/// completely filled, at which point we transition back to [`Self::AwaitingDecoder`].
54+
Filling {
55+
/// The number of bytes read into the buffer so far.
56+
total_bytes_read: usize,
57+
},
58+
}
59+
60+
/// Result of polling the reader to fill the buffer.
61+
enum FillResult {
62+
/// The buffer has been completely filled.
63+
Filled,
64+
/// Need more data (partial read occurred).
65+
Pending,
66+
/// Clean EOF at a message boundary.
67+
Eof,
68+
}
69+
70+
/// Polls the reader to fill the buffer, handling partial reads.
71+
fn poll_fill_buffer<R: AsyncRead>(
72+
read: Pin<&mut R>,
73+
buffer: &mut [u8],
74+
total_bytes_read: &mut usize,
75+
cx: &mut Context<'_>,
76+
) -> Poll<VortexResult<FillResult>> {
77+
let unfilled = &mut buffer[*total_bytes_read..];
78+
79+
let bytes_read = ready!(read.poll_read(cx, unfilled))?; // Poll until ready.
80+
81+
// `0` bytes read indicates an EOF.
82+
Poll::Ready(if bytes_read == 0 {
83+
if *total_bytes_read > 0 {
84+
Err(vortex_err!(
85+
"unexpected EOF during partial read: read {total_bytes_read} of {} expected bytes",
86+
buffer.len()
87+
))
88+
} else {
89+
Ok(FillResult::Eof)
90+
}
91+
} else {
92+
*total_bytes_read += bytes_read;
93+
if *total_bytes_read == buffer.len() {
94+
Ok(FillResult::Filled)
95+
} else {
96+
debug_assert!(*total_bytes_read < buffer.len());
97+
Ok(FillResult::Pending)
98+
}
99+
})
100+
}
101+
59102
impl<R: AsyncRead> Stream for AsyncMessageReader<R> {
60103
type Item = VortexResult<DecoderMessage>;
61104

@@ -65,8 +108,6 @@ impl<R: AsyncRead> Stream for AsyncMessageReader<R> {
65108
match this.state {
66109
ReadState::AwaitingDecoder => match this.decoder.read_next(this.buffer)? {
67110
PollRead::Some(msg) => return Poll::Ready(Some(Ok(msg))),
68-
// The decoder needs more data to be read into the buffer, so resize for more
69-
// space to read from `this.read`.
70111
PollRead::NeedMore(new_len) => {
71112
this.buffer.resize(new_len, 0x00);
72113
*this.state = ReadState::Filling {
@@ -75,22 +116,17 @@ impl<R: AsyncRead> Stream for AsyncMessageReader<R> {
75116
}
76117
},
77118
ReadState::Filling { total_bytes_read } => {
78-
let unfilled = &mut this.buffer.as_mut()[*total_bytes_read..];
79-
80-
match ready!(this.read.as_mut().poll_read(cx, unfilled)) {
81-
Err(e) => return Poll::Ready(Some(Err(e.into()))),
82-
Ok(0) => return Poll::Ready(None),
83-
// Continue to read until we fill the buffer. We need to do this in a loop
84-
// to handle partial reads.
85-
Ok(bytes_read) => {
86-
*total_bytes_read += bytes_read;
87-
88-
if *total_bytes_read == this.buffer.len() {
89-
*this.state = ReadState::AwaitingDecoder;
90-
} else {
91-
debug_assert!(*total_bytes_read < this.buffer.len());
92-
}
93-
}
119+
// Poll until ready.
120+
match ready!(poll_fill_buffer(
121+
this.read.as_mut(),
122+
this.buffer,
123+
total_bytes_read,
124+
cx
125+
)) {
126+
Err(e) => return Poll::Ready(Some(Err(e))),
127+
Ok(FillResult::Eof) => return Poll::Ready(None),
128+
Ok(FillResult::Filled) => *this.state = ReadState::AwaitingDecoder,
129+
Ok(FillResult::Pending) => {}
94130
}
95131
}
96132
}

0 commit comments

Comments
 (0)