Skip to content

Commit 19d2104

Browse files
authored
fix(io): Framed won't stop reading if underlying IO returns Ok(0) (#497)
* feat(io): framed integration test * fix(io): `Framed` never stops reading * refactor(io): remove redundant where clause
1 parent 1edf00f commit 19d2104

File tree

7 files changed

+197
-152
lines changed

7 files changed

+197
-152
lines changed

compio-io/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ serde_json = { version = "1.0.140", optional = true }
2222
[dev-dependencies]
2323
compio-runtime = { workspace = true }
2424
compio-macros = { workspace = true }
25+
compio-driver = { workspace = true, features = ["polling"] }
2526
tokio = { workspace = true, features = ["macros", "rt"] }
2627
serde = { version = "1.0.219", features = ["derive"] }
2728
futures-executor = "0.3.30"
@@ -38,3 +39,11 @@ codec-serde-json = ["dep:serde", "dep:serde_json", "dep:thiserror"]
3839
allocator_api = ["compio-buf/allocator_api"]
3940
read_buf = ["compio-buf/read_buf"]
4041
nightly = ["allocator_api", "read_buf"]
42+
43+
[[test]]
44+
name = "compat"
45+
required-features = ["compat"]
46+
47+
[[test]]
48+
name = "framed"
49+
required-features = ["codec-serde-json"]

compio-io/src/framed/codec/serde_json.rs

Lines changed: 0 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -112,102 +112,3 @@ impl<T: DeserializeOwned> Decoder<T> for SerdeJsonCodec {
112112
serde_json::from_slice(buf).map_err(SerdeJsonCodecError::SerdeJsonError)
113113
}
114114
}
115-
116-
#[cfg(test)]
117-
mod test {
118-
use std::{
119-
io::{self, Cursor},
120-
rc::Rc,
121-
};
122-
123-
use compio_buf::{BufResult, IoBuf, IoBufMut};
124-
use futures_util::{SinkExt, StreamExt, lock::Mutex};
125-
use serde::{Deserialize, Serialize};
126-
127-
use crate::{
128-
AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt,
129-
framed::{Framed, codec::serde_json::SerdeJsonCodec, frame::LengthDelimited},
130-
};
131-
132-
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
133-
struct Test {
134-
foo: String,
135-
bar: usize,
136-
}
137-
138-
struct InMemoryPipe(Cursor<Rc<Mutex<Vec<u8>>>>);
139-
140-
impl AsyncRead for InMemoryPipe {
141-
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
142-
let BufResult(res, buf) = self
143-
.0
144-
.get_ref()
145-
.lock()
146-
.await
147-
.read_at(buf, self.0.position())
148-
.await;
149-
match res {
150-
Ok(len) => {
151-
self.0.set_position(self.0.position() + len as u64);
152-
BufResult(Ok(len), buf)
153-
}
154-
Err(_) => BufResult(res, buf),
155-
}
156-
}
157-
}
158-
159-
impl AsyncWrite for InMemoryPipe {
160-
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
161-
let BufResult(res, buf) = self
162-
.0
163-
.get_ref()
164-
.lock()
165-
.await
166-
.write_at(buf, self.0.position())
167-
.await;
168-
match res {
169-
Ok(len) => {
170-
self.0.set_position(self.0.position() + len as u64);
171-
BufResult(Ok(len), buf)
172-
}
173-
Err(_) => BufResult(res, buf),
174-
}
175-
}
176-
177-
async fn flush(&mut self) -> io::Result<()> {
178-
self.0.get_ref().lock().await.flush().await
179-
}
180-
181-
async fn shutdown(&mut self) -> io::Result<()> {
182-
self.0.get_ref().lock().await.shutdown().await
183-
}
184-
}
185-
186-
#[compio_macros::test]
187-
async fn test_framed() {
188-
let codec = SerdeJsonCodec::new();
189-
let framer = LengthDelimited::new();
190-
let buf = Rc::new(Mutex::new(vec![]));
191-
let r = InMemoryPipe(Cursor::new(buf.clone()));
192-
let w = InMemoryPipe(Cursor::new(buf));
193-
let mut framed = Framed::symmetric::<Test>(codec, framer)
194-
.with_reader(r)
195-
.with_writer(w);
196-
197-
let origin = Test {
198-
foo: "hello, world!".to_owned(),
199-
bar: 114514,
200-
};
201-
framed.send(origin.clone()).await.unwrap();
202-
framed.send(origin.clone()).await.unwrap();
203-
204-
let des = framed.next().await.unwrap().unwrap();
205-
println!("{des:?}");
206-
207-
assert_eq!(origin, des);
208-
let des = framed.next().await.unwrap().unwrap();
209-
println!("{des:?}");
210-
211-
assert_eq!(origin, des);
212-
}
213-
}

compio-io/src/framed/mod.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl<R, W, C, F, In, Out> Framed<R, W, C, F, In, Out> {
3636
/// Change the reader of the `Framed` object.
3737
pub fn with_reader<Io>(self, reader: Io) -> Framed<Io, W, C, F, In, Out> {
3838
Framed {
39-
read_state: read::State::Idle(Some((reader, Buffer::with_capacity(64)))),
39+
read_state: read::State::new(reader, Buffer::with_capacity(64)),
4040
write_state: self.write_state,
4141
codec: self.codec,
4242
framer: self.framer,
@@ -48,7 +48,7 @@ impl<R, W, C, F, In, Out> Framed<R, W, C, F, In, Out> {
4848
pub fn with_writer<Io>(self, writer: Io) -> Framed<R, Io, C, F, In, Out> {
4949
Framed {
5050
read_state: self.read_state,
51-
write_state: write::State::Idle(Some((writer, Vec::new()))),
51+
write_state: write::State::new(writer, Vec::new()),
5252
codec: self.codec,
5353
framer: self.framer,
5454
types: PhantomData,
@@ -57,18 +57,27 @@ impl<R, W, C, F, In, Out> Framed<R, W, C, F, In, Out> {
5757

5858
/// Change the codec of the `Framed` object.
5959
///
60-
/// This is useful when you have a duplex I/O type, e.g., a `TcpStream` or
61-
/// `File`, and you want [`Framed`] to implement both
62-
/// [`Sink`](futures_util::Sink) and [`Stream`](futures_util::Stream).
60+
/// This is useful when you have a duplex I/O type, e.g., a
61+
/// `compio::net::TcpStream` or `compio::fs::File`, and you want
62+
/// [`Framed`] to implement both [`Sink`](futures_util::Sink) and
63+
/// [`Stream`](futures_util::Stream).
64+
///
65+
/// Some types like the ones mentioned above are multiplexed by nature, so
66+
/// they implement the [`Splittable`] trait by themselves. For other types,
67+
/// you may want to wrap them in [`Split`] or [`UnsyncSplit`] first, which
68+
/// uses lock or `RefCell` under the hood.
69+
///
70+
/// [`Split`]: crate::util::split::Split
71+
/// [`UnsyncSplit`]: crate::util::split::UnsyncSplit
6372
pub fn with_duplex<Io: Splittable>(
6473
self,
6574
io: Io,
6675
) -> Framed<Io::ReadHalf, Io::WriteHalf, C, F, In, Out> {
6776
let (read_half, write_half) = io.split();
6877

6978
Framed {
70-
read_state: read::State::Idle(Some((read_half, Buffer::with_capacity(64)))),
71-
write_state: write::State::Idle(Some((write_half, Vec::new()))),
79+
read_state: read::State::new(read_half, Buffer::with_capacity(64)),
80+
write_state: write::State::new(write_half, Vec::new()),
7281
codec: self.codec,
7382
framer: self.framer,
7483
types: PhantomData,
@@ -81,8 +90,8 @@ impl<C, F> Framed<(), (), C, F, (), ()> {
8190
/// different input and output type.
8291
pub fn new<In, Out>(codec: C, framer: F) -> Framed<(), (), C, F, In, Out> {
8392
Framed {
84-
read_state: read::State::Idle(None),
85-
write_state: write::State::Idle(None),
93+
read_state: read::State::empty(),
94+
write_state: write::State::empty(),
8695
codec,
8796
framer,
8897
types: PhantomData,
@@ -93,8 +102,8 @@ impl<C, F> Framed<(), (), C, F, (), ()> {
93102
/// the same input and output type.
94103
pub fn symmetric<T>(codec: C, framer: F) -> Framed<(), (), C, F, T, T> {
95104
Framed {
96-
read_state: read::State::Idle(None),
97-
write_state: write::State::Idle(None),
105+
read_state: read::State::empty(),
106+
write_state: write::State::empty(),
98107
codec,
99108
framer,
100109
types: PhantomData,

compio-io/src/framed/read.rs

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,32 @@ use compio_buf::BufResult;
77
use futures_util::Stream;
88

99
use super::*;
10-
use crate::{AsyncReadExt, PinBoxFuture, buffer::Buffer};
10+
use crate::{AsyncReadExt, PinBoxFuture, buffer::Buffer, framed::frame::Framer};
1111

1212
type ReadResult = BufResult<usize, Buffer>;
1313

14-
pub enum State<Io> {
14+
pub struct State<Io> {
15+
inner: StateInner<Io>,
16+
eof: bool,
17+
}
18+
19+
impl<Io> State<Io> {
20+
pub fn new(io: Io, buf: Buffer) -> Self {
21+
State {
22+
inner: StateInner::Idle(Some((io, buf))),
23+
eof: false,
24+
}
25+
}
26+
27+
pub fn empty() -> Self {
28+
State {
29+
inner: StateInner::Idle(None),
30+
eof: false,
31+
}
32+
}
33+
}
34+
35+
enum StateInner<Io> {
1536
Idle(Option<(Io, Buffer)>),
1637
Reading(PinBoxFuture<(Io, ReadResult)>),
1738
}
@@ -20,7 +41,7 @@ impl<R, W, C, F, In, Out> Stream for Framed<R, W, C, F, In, Out>
2041
where
2142
R: AsyncRead + 'static,
2243
C: Decoder<Out>,
23-
F: frame::Framer,
44+
F: Framer,
2445
Self: Unpin,
2546
{
2647
type Item = Result<Out, C::Error>;
@@ -29,20 +50,21 @@ where
2950
let this = self.get_mut();
3051

3152
loop {
32-
match &mut this.read_state {
33-
State::Idle(idle) => {
53+
match &mut this.read_state.inner {
54+
StateInner::Idle(idle) => {
3455
let (mut io, mut buf) = idle.take().expect("Inconsistent state");
56+
let slice = buf.slice();
3557

3658
// First try decode from the buffer
37-
if let Some(frame) = this.framer.extract(buf.slice()) {
38-
let decoded = this.codec.decode(frame.payload(buf.slice()))?;
59+
if let Some(frame) = this.framer.extract(slice) {
60+
let decoded = this.codec.decode(frame.payload(slice))?;
3961
buf.advance(frame.len());
4062

4163
if buf.all_done() {
4264
buf.reset();
4365
}
4466

45-
this.read_state = State::Idle(Some((io, buf)));
67+
this.read_state.inner = StateInner::Idle(Some((io, buf)));
4668

4769
return Poll::Ready(Some(Ok(decoded)));
4870
}
@@ -54,12 +76,19 @@ where
5476
(io, BufResult(res, buf))
5577
});
5678

57-
this.read_state = State::Reading(fut)
79+
this.read_state.inner = StateInner::Reading(fut)
5880
}
59-
State::Reading(fut) => {
81+
StateInner::Reading(fut) => {
6082
let (io, BufResult(res, buf)) = ready!(fut.poll_unpin(cx));
61-
this.read_state = State::Idle(Some((io, buf)));
62-
res?;
83+
this.read_state.inner = StateInner::Idle(Some((io, buf)));
84+
if res? == 0 {
85+
// It's the second time EOF is reached, return None
86+
if this.read_state.eof {
87+
return Poll::Ready(None);
88+
}
89+
90+
this.read_state.eof = true;
91+
}
6392
}
6493
};
6594
}

compio-io/src/framed/write.rs

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,46 @@ pub enum State<Io> {
1919
}
2020

2121
impl<Io> State<Io> {
22+
pub fn new(io: Io, buf: Vec<u8>) -> Self {
23+
State::Idle(Some((io, buf)))
24+
}
25+
26+
pub fn empty() -> Self {
27+
State::Idle(None)
28+
}
29+
2230
fn take_idle(&mut self) -> (Io, Vec<u8>) {
2331
match self {
2432
State::Idle(idle) => idle.take().expect("Inconsistent state"),
2533
_ => unreachable!("`Framed` not in idle state"),
2634
}
2735
}
2836

29-
pub fn buf(&mut self) -> Option<&mut Vec<u8>> {
37+
fn buf(&mut self) -> Option<&mut Vec<u8>> {
3038
match self {
3139
State::Idle(Some((_, buf))) => Some(buf),
3240
_ => None,
3341
}
3442
}
3543

36-
pub fn start_flush(&mut self)
37-
where
38-
Io: AsyncWrite + 'static,
39-
{
44+
fn poll_sink(&mut self, cx: &mut std::task::Context<'_>) -> Poll<io::Result<()>> {
45+
let (io, res, buf) = match self {
46+
State::Writing(fut) => {
47+
let (io, BufResult(res, buf)) = ready!(fut.poll_unpin(cx));
48+
(io, res, buf)
49+
}
50+
State::Closing(fut) | State::Flushing(fut) => ready!(fut.poll_unpin(cx)),
51+
State::Idle(_) => {
52+
return Poll::Ready(Ok(()));
53+
}
54+
};
55+
*self = State::Idle(Some((io, buf)));
56+
Poll::Ready(res)
57+
}
58+
}
59+
60+
impl<Io: AsyncWrite + 'static> State<Io> {
61+
fn start_flush(&mut self) {
4062
let (mut io, buf) = self.take_idle();
4163
let fut = Box::pin(async move {
4264
let res = io.flush().await;
@@ -45,10 +67,7 @@ impl<Io> State<Io> {
4567
*self = State::Flushing(fut);
4668
}
4769

48-
pub fn start_close(&mut self)
49-
where
50-
Io: AsyncWrite + 'static,
51-
{
70+
fn start_close(&mut self) {
5271
let (mut io, buf) = self.take_idle();
5372
let fut = Box::pin(async move {
5473
let res = io.shutdown().await;
@@ -57,33 +76,14 @@ impl<Io> State<Io> {
5776
*self = State::Closing(fut);
5877
}
5978

60-
pub fn start_write(&mut self)
61-
where
62-
Io: AsyncWrite + 'static,
63-
{
79+
fn start_write(&mut self) {
6480
let (mut io, buf) = self.take_idle();
6581
let fut = Box::pin(async move {
6682
let res = io.write_all(buf).await;
6783
(io, res)
6884
});
6985
*self = State::Writing(fut);
7086
}
71-
72-
/// State that may occur when `Framed` is acting as a [`Sink`].
73-
pub fn poll_sink(&mut self, cx: &mut std::task::Context<'_>) -> Poll<io::Result<()>> {
74-
let (io, res, buf) = match self {
75-
State::Writing(fut) => {
76-
let (io, BufResult(res, buf)) = ready!(fut.poll_unpin(cx));
77-
(io, res, buf)
78-
}
79-
State::Closing(fut) | State::Flushing(fut) => ready!(fut.poll_unpin(cx)),
80-
State::Idle(_) => {
81-
return Poll::Ready(Ok(()));
82-
}
83-
};
84-
*self = State::Idle(Some((io, buf)));
85-
Poll::Ready(res)
86-
}
8787
}
8888

8989
impl<R, W, C, F, In, Out> Sink<In> for Framed<R, W, C, F, In, Out>

0 commit comments

Comments
 (0)