Skip to content

Commit dbd3d4a

Browse files
committed
fix AsyncMessageReader to handle partial reads
Signed-off-by: Connor Tsui <[email protected]>
1 parent 9f9468f commit dbd3d4a

File tree

2 files changed

+58
-24
lines changed

2 files changed

+58
-24
lines changed

vortex-ipc/src/messages/decoder.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,20 @@ enum State {
4646

4747
#[derive(Debug)]
4848
pub enum PollRead {
49+
/// A complete message was decoded.
4950
Some(DecoderMessage),
50-
/// Returns the _total_ number of bytes needed to make progress.
51-
/// Note this is _not_ the incremental number of bytes needed to make progress.
51+
/// The decoder needs more data to make progress.
52+
///
53+
/// The inner value is the *total* number of bytes the buffer should contain, not the
54+
/// incremental amount needed. Callers should:
55+
/// 1. Resize the buffer to this length.
56+
/// 2. Fill the buffer completely (handling partial reads as needed).
57+
/// 3. Only then call [`MessageDecoder::read_next`] again.
58+
///
59+
/// The decoder checks [`bytes::Buf::remaining`] to determine available data, which for
60+
/// [`bytes::BytesMut`] returns the buffer length regardless of how many bytes were actually
61+
/// written. Calling `read_next` before the buffer is fully populated will cause the decoder
62+
/// to read uninitialized or stale data.
5263
NeedMore(usize),
5364
}
5465

vortex-ipc/src/messages/reader_async.rs

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,31 @@ use crate::messages::DecoderMessage;
1717
use crate::messages::MessageDecoder;
1818
use crate::messages::PollRead;
1919

20+
/// The state of an in-progress read operation.
21+
#[derive(Default)]
22+
enum ReadState {
23+
/// Ready to consult the decoder for the next operation.
24+
#[default]
25+
AwaitingDecoder,
26+
/// Filling the buffer with data from the underlying reader.
27+
///
28+
/// Async readers may return fewer bytes than requested (partial reads), especially over network
29+
/// connections. This state persists across multiple `poll_next` calls until the buffer is
30+
/// completely filled, at which point we transition back to [`Self::AwaitingDecoder`].
31+
Filling {
32+
/// The number of bytes read into the buffer so far.
33+
total_bytes_read: usize,
34+
},
35+
}
36+
2037
pin_project! {
2138
/// An IPC message reader backed by an `AsyncRead` stream.
2239
pub struct AsyncMessageReader<R> {
2340
#[pin]
2441
read: R,
2542
buffer: BytesMut,
2643
decoder: MessageDecoder,
27-
bytes_read: usize,
44+
state: ReadState,
2845
}
2946
}
3047

@@ -34,7 +51,7 @@ impl<R> AsyncMessageReader<R> {
3451
read,
3552
buffer: BytesMut::new(),
3653
decoder: MessageDecoder::new(registry),
37-
bytes_read: 0,
54+
state: ReadState::default(),
3855
}
3956
}
4057
}
@@ -45,29 +62,35 @@ impl<R: AsyncRead> Stream for AsyncMessageReader<R> {
4562
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4663
let mut this = self.project();
4764
loop {
48-
match this.decoder.read_next(this.buffer)? {
49-
PollRead::Some(msg) => return Poll::Ready(Some(Ok(msg))),
50-
PollRead::NeedMore(nbytes) => {
51-
this.buffer.resize(nbytes, 0x00);
65+
match this.state {
66+
ReadState::AwaitingDecoder => match this.decoder.read_next(this.buffer)? {
67+
PollRead::Some(msg) => return Poll::Ready(Some(Ok(msg))),
68+
// The decoder needs more data to be read into the buffer, so resize for more
69+
// space to read from `this.read`.
70+
PollRead::NeedMore(new_len) => {
71+
this.buffer.resize(new_len, 0x00);
72+
*this.state = ReadState::Filling {
73+
total_bytes_read: 0,
74+
};
75+
}
76+
},
77+
ReadState::Filling { total_bytes_read } => {
78+
let unfilled = &mut this.buffer.as_mut()[*total_bytes_read..];
5279

53-
match ready!(
54-
this.read
55-
.as_mut()
56-
.poll_read(cx, &mut this.buffer.as_mut()[*this.bytes_read..])
57-
) {
58-
Ok(0) => {
59-
// End of file
60-
return Poll::Ready(None);
61-
}
62-
Ok(nbytes) => {
63-
*this.bytes_read += nbytes;
64-
// If we've finished the read operation, then we continue the loop
65-
// and the decoder should present us with a new response.
66-
if *this.bytes_read == nbytes {
67-
*this.bytes_read = 0;
80+
match ready!(this.read.as_mut().poll_read(cx, unfilled)) {
81+
Err(e) => return Poll::Ready(Some(Err(e.into()))),
82+
Ok(0) => return Poll::Ready(None),
83+
// Continue to read until we fill the buffer. We need to do this in a loop
84+
// to handle partial reads.
85+
Ok(bytes_read) => {
86+
*total_bytes_read += bytes_read;
87+
88+
if *total_bytes_read == this.buffer.len() {
89+
*this.state = ReadState::AwaitingDecoder;
90+
} else {
91+
debug_assert!(*total_bytes_read < this.buffer.len());
6892
}
6993
}
70-
Err(e) => return Poll::Ready(Some(Err(e.into()))),
7194
}
7295
}
7396
}

0 commit comments

Comments
 (0)