Skip to content

Commit c3c5f37

Browse files
committed
fix Encoder::poll_read
1 parent 50db988 commit c3c5f37

File tree

1 file changed

+13
-19
lines changed

1 file changed

+13
-19
lines changed

src/encoder.rs

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,46 +10,40 @@ pin_project_lite::pin_project! {
1010
/// An SSE protocol encoder.
1111
#[derive(Debug)]
1212
pub struct Encoder {
13-
buf: Option<Vec<u8>>,
13+
buf: Box<[u8]>,
14+
cursor: usize,
1415
#[pin]
1516
receiver: async_channel::Receiver<Vec<u8>>,
16-
cursor: usize,
1717
}
1818
}
1919

2020
impl AsyncRead for Encoder {
2121
fn poll_read(
22-
mut self: Pin<&mut Self>,
22+
self: Pin<&mut Self>,
2323
cx: &mut Context<'_>,
2424
buf: &mut [u8],
2525
) -> Poll<io::Result<usize>> {
26-
// Request a new buffer if we don't have one yet.
27-
if let None = self.buf {
28-
self.buf = match ready!(Pin::new(&mut self.receiver).poll_next(cx)) {
26+
let mut this = self.project();
27+
// Request a new buffer if current one is exhausted.
28+
if this.buf.len() <= *this.cursor {
29+
match ready!(this.receiver.as_mut().poll_next(cx)) {
2930
Some(buf) => {
3031
log::trace!("> Received a new buffer with len {}", buf.len());
31-
Some(buf)
32+
*this.buf = buf.into_boxed_slice();
33+
*this.cursor = 0;
3234
}
3335
None => {
3436
log::trace!("> Encoder done reading");
3537
return Poll::Ready(Ok(0));
3638
}
3739
};
38-
};
40+
}
3941

4042
// Write the current buffer to completion.
41-
let local_buf = self.buf.as_mut().unwrap();
42-
let local_len = local_buf.len();
43+
let local_buf = &this.buf[*this.cursor..];
4344
let max = buf.len().min(local_buf.len());
4445
buf[..max].clone_from_slice(&local_buf[..max]);
45-
46-
self.cursor += max;
47-
48-
// Reset values if we're done reading.
49-
if self.cursor == local_len {
50-
self.buf = None;
51-
self.cursor = 0;
52-
};
46+
*this.cursor += max;
5347

5448
// Return bytes read.
5549
Poll::Ready(Ok(max))
@@ -86,7 +80,7 @@ pub fn encode() -> (Sender, Encoder) {
8680
let (sender, receiver) = async_channel::bounded(1);
8781
let encoder = Encoder {
8882
receiver,
89-
buf: None,
83+
buf: Box::default(),
9084
cursor: 0,
9185
};
9286
(Sender(sender), encoder)

0 commit comments

Comments
 (0)