diff --git a/Cargo.toml b/Cargo.toml index 051c4c2..4353cab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,21 +9,18 @@ readme = "README.md" edition = "2018" keywords = ["async", "server-sent-events", "sse", "client", "server"] categories = [] -authors = [ - "Renée Kooi ", - "Yoshua Wuyts ", -] +authors = ["Renée Kooi ", "Yoshua Wuyts "] [features] [dependencies] -futures-lite = "1.11.3" -http-types = { version = "2.10.0", default-features = false } -log = "0.4.8" -memchr = "2.3.3" -pin-project-lite = "0.2.7" -async-channel = "1.1.1" +futures-lite = "2.6.0" +http-types = { version = "2.12.0", default-features = false } +log = "0.4.27" +memchr = "2.7.5" +pin-project-lite = "0.2.16" +async-channel = "2.5.0" [dev-dependencies] -femme = "2.0.0" -async-std = { version = "1.6.0", features = ["attributes", "unstable"] } +femme = "2.2.1" +async-std = { version = "1.13.1", features = ["attributes", "unstable"] } diff --git a/src/decoder.rs b/src/decoder.rs index 481a86a..a4c347c 100644 --- a/src/decoder.rs +++ b/src/decoder.rs @@ -15,7 +15,6 @@ where Decoder { lines: Lines::new(reader), processed_bom: false, - buffer: vec![], last_event_id: None, event_type: None, data: vec![], @@ -29,9 +28,6 @@ pub struct Decoder { lines: Lines, /// Have we processed the optional Byte Order Marker on the first line? processed_bom: bool, - /// Was the last character of the previous line a \r? - /// Bytes that were fed to the decoder but do not yet form a message. - buffer: Vec, /// The _last event ID_ buffer. last_event_id: Option, /// The _event type_ buffer. @@ -49,11 +45,11 @@ impl Decoder { None } else { // Removing tailing newlines - if self.data.ends_with(&[b'\n']) { + if self.data.ends_with(b"\n") { self.data.pop(); } let name = self.event_type.take().unwrap_or("message".to_string()); - let data = std::mem::replace(&mut self.data, vec![]); + let data = std::mem::take(&mut self.data); // The _last event ID_ buffer persists between messages. let id = self.last_event_id.clone(); Some(Event::new_msg(name, data, id)) @@ -83,7 +79,7 @@ impl Stream for Decoder { &line }; - log::trace!("> new line: {:?}", line); + log::trace!("> new line: {line}"); let mut parts = line.splitn(2, ':'); loop { match (parts.next(), parts.next()) { @@ -105,7 +101,7 @@ impl Stream for Decoder { } // If the field name is "data": (Some("data"), value) => { - log::trace!("> data: {:?}", &value); + log::trace!("> data: {value:?}"); // Append the field value to the data buffer, if let Some(value) = value { self.data.extend(strip_leading_space_b(value.as_bytes())); @@ -122,13 +118,13 @@ impl Stream for Decoder { // return Poll::Ready(Ok(self.take_message()).transpose()); } // Comment - (Some(""), Some(_)) => (log::trace!("> comment")), + (Some(""), Some(_)) => log::trace!("> comment"), // End of frame (Some(""), None) => { log::trace!("> end of frame"); match self.take_message() { Some(event) => { - log::trace!("> end of frame [event]: {:?}", event); + log::trace!("> end of frame [event]: {event:?}"); return Poll::Ready(Some(Ok(event))); } None => { @@ -149,15 +145,15 @@ impl Stream for Decoder { /// Remove a leading space (code point 0x20) from a string slice. fn strip_leading_space(input: &str) -> &str { if input.starts_with(' ') { - &input[1..] + input.strip_prefix(' ').unwrap() } else { input } } fn strip_leading_space_b(input: &[u8]) -> &[u8] { - if input.starts_with(&[b' ']) { - &input[1..] + if input.starts_with(b" ") { + input.strip_prefix(b" ").unwrap() } else { input } diff --git a/src/encoder.rs b/src/encoder.rs index 6528527..6dcb00b 100644 --- a/src/encoder.rs +++ b/src/encoder.rs @@ -6,7 +6,9 @@ use std::io; use std::pin::Pin; use std::time::Duration; -pin_project_lite::pin_project! { +use pin_project_lite::pin_project; + +pin_project! { /// An SSE protocol encoder. #[derive(Debug)] pub struct Encoder { @@ -24,12 +26,13 @@ impl AsyncRead for Encoder { buf: &mut [u8], ) -> Poll> { let mut this = self.project(); + // Request a new buffer if current one is exhausted. if this.buf.len() <= *this.cursor { match ready!(this.receiver.as_mut().poll_next(cx)) { - Some(buf) => { - log::trace!("> Received a new buffer with len {}", buf.len()); - *this.buf = buf.into_boxed_slice(); + Some(new_buf) => { + log::trace!("> Received a new buffer with len {}", new_buf.len()); + *this.buf = new_buf.into_boxed_slice(); *this.cursor = 0; } None => { @@ -108,17 +111,17 @@ impl Sender { ) -> io::Result<()> { // Write the event name if let Some(name) = name.into() { - self.inner_send(format!("event:{}\n", name)).await?; + self.inner_send(format!("event:{name}\n")).await?; } // Write the id if let Some(id) = id { - self.inner_send(format!("id:{}\n", id)).await?; + self.inner_send(format!("id:{id}\n")).await?; } // Write the data section, and end. for line in data.lines() { - let msg = format!("data:{}\n", line); + let msg = format!("data:{line}\n"); self.inner_send(msg).await?; } self.inner_send("\n").await?; @@ -130,12 +133,12 @@ impl Sender { pub async fn send_retry(&self, dur: Duration, id: Option<&str>) -> io::Result<()> { // Write the id if let Some(id) = id { - self.inner_send(format!("id:{}\n", id)).await?; + self.inner_send(format!("id:{id}\n")).await?; } // Write the retry section, and end. let dur = dur.as_secs_f64() as u64; - let msg = format!("retry:{}\n\n", dur); + let msg = format!("retry:{dur}\n\n"); self.inner_send(msg).await?; Ok(()) } diff --git a/src/event.rs b/src/event.rs index 224164c..f8e0ac7 100644 --- a/src/event.rs +++ b/src/event.rs @@ -5,7 +5,7 @@ use std::time::Duration; /// The kind of SSE event sent. #[derive(Debug, Eq, PartialEq)] pub enum Event { - /// A retry frame, signaling a new retry duration must be used.. + /// A retry frame, signaling a new retry duration must be used. Retry(Duration), /// A data frame containing a message. Message(Message), @@ -22,19 +22,13 @@ impl Event { Self::Retry(Duration::from_secs_f64(dur as f64)) } - /// Check whether this is a Retry variant. + /// Check whether this is a `Retry` variant. pub fn is_retry(&self) -> bool { - match *self { - Self::Retry(_) => true, - _ => false, - } + matches!(*self, Self::Retry(_)) } /// Check whether this is a `Message` variant. pub fn is_message(&self) -> bool { - match *self { - Self::Message(_) => true, - _ => false, - } + matches!(*self, Self::Message(_)) } } diff --git a/src/lib.rs b/src/lib.rs index 5a28592..4890ff9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,7 @@ //! }); //! //! // Decode messages using a decoder. -//! let mut reader = decode(BufReader::new(encoder)); +//! let mut reader = decode(BufReader::new(Box::pin(encoder))); //! let event = reader.next().await.unwrap()?; //! // Match and handle the event //! diff --git a/src/lines.rs b/src/lines.rs index 26a494d..0aaea7e 100644 --- a/src/lines.rs +++ b/src/lines.rs @@ -32,7 +32,7 @@ pin_project! { impl Lines { pub(crate) fn new(reader: R) -> Lines where - R: AsyncBufRead + Unpin + Sized, + R: AsyncBufRead + Sized, { Lines { reader, @@ -64,7 +64,7 @@ impl Stream for Lines { if this.buf.ends_with('\r') { this.buf.pop(); } - Poll::Ready(Some(Ok(mem::replace(this.buf, String::new())))) + Poll::Ready(Some(Ok(std::mem::take(this.buf)))) } } @@ -76,7 +76,7 @@ fn read_line_internal( read: &mut usize, ) -> Poll> { let ret = ready!(read_until_internal(reader, cx, bytes, read)); - if str::from_utf8(&bytes).is_err() { + if str::from_utf8(bytes).is_err() { Poll::Ready(ret.and_then(|_| { Err(io::Error::new( io::ErrorKind::InvalidData, diff --git a/tests/decode.rs b/tests/decode.rs index 83e67cb..63ae088 100644 --- a/tests/decode.rs +++ b/tests/decode.rs @@ -39,7 +39,7 @@ async fn decode_stream_when_fed_by_line() -> http_types::Result<()> { let reader = decode(Cursor::new(":ok\nevent:message\nid:id1\ndata:data1\n\n")); let res = reader.map(|i| i.unwrap()).collect::>().await; assert_eq!(res.len(), 1); - assert_message(res.get(0).unwrap(), "message", "data1", Some("id1")); + assert_message(res.first().unwrap(), "message", "data1", Some("id1")); Ok(()) } @@ -127,12 +127,12 @@ async fn comments() -> http_types::Result<()> { let longstring = "x".repeat(2049); let mut input = concat!("data:1\r", ":\0\n", ":\r\n", "data:2\n", ":").to_string(); input.push_str(&longstring); - input.push_str("\r"); + input.push('\r'); input.push_str("data:3\n"); input.push_str(":data:fail\r"); - input.push_str(":"); + input.push(':'); input.push_str(&longstring); - input.push_str("\n"); + input.push('\n'); input.push_str("data:4\n\n"); let mut reader = decode(Cursor::new(input)); assert_message( diff --git a/tests/encode.rs b/tests/encode.rs index 56b0149..eda2ca0 100644 --- a/tests/encode.rs +++ b/tests/encode.rs @@ -31,7 +31,7 @@ async fn encode_message() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send("cat", "chashu", None).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "cat", "chashu", None); Ok(()) @@ -42,7 +42,7 @@ async fn encode_message_some() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send(Some("cat"), "chashu", None).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "cat", "chashu", None); Ok(()) @@ -53,7 +53,7 @@ async fn encode_message_data_only() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send(None, "chashu", None).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "message", "chashu", None); Ok(()) @@ -64,7 +64,7 @@ async fn encode_message_with_id() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send("cat", "chashu", Some("0")).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "cat", "chashu", Some("0")); Ok(()) @@ -75,7 +75,7 @@ async fn encode_message_data_only_with_id() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send(None, "chashu", Some("0")).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "message", "chashu", Some("0")); Ok(()) @@ -89,7 +89,7 @@ async fn encode_retry() -> http_types::Result<()> { sender.send_retry(dur, None).await.unwrap(); }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_retry(&event, 12); Ok(()) @@ -100,7 +100,7 @@ async fn encode_multiline_message() -> http_types::Result<()> { let (sender, encoder) = encode(); task::spawn(async move { sender.send("cats", "chashu\nnori", None).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "cats", "chashu\nnori", None); Ok(()) @@ -112,7 +112,7 @@ async fn dropping_encoder() -> http_types::Result<()> { let sender_clone = sender.clone(); task::spawn(async move { sender_clone.send("cat", "chashu", None).await }); - let mut reader = decode(BufReader::new(encoder)); + let mut reader = decode(BufReader::new(Box::pin(encoder))); let event = reader.next().await.unwrap()?; assert_message(&event, "cat", "chashu", None);