Skip to content

Upgrade dependencies #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,18 @@ readme = "README.md"
edition = "2018"
keywords = ["async", "server-sent-events", "sse", "client", "server"]
categories = []
authors = [
"Renée Kooi <[email protected]>",
"Yoshua Wuyts <[email protected]>",
]
authors = ["Renée Kooi <[email protected]>", "Yoshua Wuyts <[email protected]>"]

[features]

[dependencies]
futures-lite = "1.11.3"
http-types = { version = "2.10.0", default-features = false }
log = "0.4.8"
memchr = "2.3.3"
pin-project-lite = "0.2.7"
async-channel = "1.1.1"
futures-lite = "2.6.0"
http-types = { version = "2.12.0", default-features = false }
log = "0.4.27"
memchr = "2.7.5"
pin-project-lite = "0.2.16"
async-channel = "2.5.0"

[dev-dependencies]
femme = "2.0.0"
async-std = { version = "1.6.0", features = ["attributes", "unstable"] }
femme = "2.2.1"
async-std = { version = "1.13.1", features = ["attributes", "unstable"] }
22 changes: 9 additions & 13 deletions src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ where
Decoder {
lines: Lines::new(reader),
processed_bom: false,
buffer: vec![],
last_event_id: None,
event_type: None,
data: vec![],
Expand All @@ -29,9 +28,6 @@ pub struct Decoder<R: AsyncBufRead + Unpin> {
lines: Lines<R>,
/// Have we processed the optional Byte Order Marker on the first line?
processed_bom: bool,
/// Was the last character of the previous line a \r?
/// Bytes that were fed to the decoder but do not yet form a message.
buffer: Vec<u8>,
Comment on lines -32 to -34
Copy link
Author

@vyorkin vyorkin Jul 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find any usage of this buffer field

/// The _last event ID_ buffer.
last_event_id: Option<String>,
/// The _event type_ buffer.
Expand All @@ -49,11 +45,11 @@ impl<R: AsyncBufRead + Unpin> Decoder<R> {
None
} else {
// Removing tailing newlines
if self.data.ends_with(&[b'\n']) {
if self.data.ends_with(b"\n") {
self.data.pop();
}
let name = self.event_type.take().unwrap_or("message".to_string());
let data = std::mem::replace(&mut self.data, vec![]);
let data = std::mem::take(&mut self.data);
// The _last event ID_ buffer persists between messages.
let id = self.last_event_id.clone();
Some(Event::new_msg(name, data, id))
Expand Down Expand Up @@ -83,7 +79,7 @@ impl<R: AsyncBufRead + Unpin> Stream for Decoder<R> {
&line
};

log::trace!("> new line: {:?}", line);
log::trace!("> new line: {line}");
let mut parts = line.splitn(2, ':');
loop {
match (parts.next(), parts.next()) {
Expand All @@ -105,7 +101,7 @@ impl<R: AsyncBufRead + Unpin> Stream for Decoder<R> {
}
// If the field name is "data":
(Some("data"), value) => {
log::trace!("> data: {:?}", &value);
log::trace!("> data: {value:?}");
// Append the field value to the data buffer,
if let Some(value) = value {
self.data.extend(strip_leading_space_b(value.as_bytes()));
Expand All @@ -122,13 +118,13 @@ impl<R: AsyncBufRead + Unpin> Stream for Decoder<R> {
// return Poll::Ready(Ok(self.take_message()).transpose());
}
// Comment
(Some(""), Some(_)) => (log::trace!("> comment")),
(Some(""), Some(_)) => log::trace!("> comment"),
// End of frame
(Some(""), None) => {
log::trace!("> end of frame");
match self.take_message() {
Some(event) => {
log::trace!("> end of frame [event]: {:?}", event);
log::trace!("> end of frame [event]: {event:?}");
return Poll::Ready(Some(Ok(event)));
}
None => {
Expand All @@ -149,15 +145,15 @@ impl<R: AsyncBufRead + Unpin> Stream for Decoder<R> {
/// Remove a leading space (code point 0x20) from a string slice.
fn strip_leading_space(input: &str) -> &str {
if input.starts_with(' ') {
&input[1..]
input.strip_prefix(' ').unwrap()
} else {
input
}
}

fn strip_leading_space_b(input: &[u8]) -> &[u8] {
if input.starts_with(&[b' ']) {
&input[1..]
if input.starts_with(b" ") {
input.strip_prefix(b" ").unwrap()
} else {
input
}
Expand Down
21 changes: 12 additions & 9 deletions src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use std::io;
use std::pin::Pin;
use std::time::Duration;

pin_project_lite::pin_project! {
use pin_project_lite::pin_project;

pin_project! {
/// An SSE protocol encoder.
#[derive(Debug)]
pub struct Encoder {
Expand All @@ -24,12 +26,13 @@ impl AsyncRead for Encoder {
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let mut this = self.project();

// Request a new buffer if current one is exhausted.
if this.buf.len() <= *this.cursor {
match ready!(this.receiver.as_mut().poll_next(cx)) {
Some(buf) => {
log::trace!("> Received a new buffer with len {}", buf.len());
*this.buf = buf.into_boxed_slice();
Some(new_buf) => {
log::trace!("> Received a new buffer with len {}", new_buf.len());
*this.buf = new_buf.into_boxed_slice();
*this.cursor = 0;
}
None => {
Expand Down Expand Up @@ -108,17 +111,17 @@ impl Sender {
) -> io::Result<()> {
// Write the event name
if let Some(name) = name.into() {
self.inner_send(format!("event:{}\n", name)).await?;
self.inner_send(format!("event:{name}\n")).await?;
}

// Write the id
if let Some(id) = id {
self.inner_send(format!("id:{}\n", id)).await?;
self.inner_send(format!("id:{id}\n")).await?;
}

// Write the data section, and end.
for line in data.lines() {
let msg = format!("data:{}\n", line);
let msg = format!("data:{line}\n");
self.inner_send(msg).await?;
}
self.inner_send("\n").await?;
Expand All @@ -130,12 +133,12 @@ impl Sender {
pub async fn send_retry(&self, dur: Duration, id: Option<&str>) -> io::Result<()> {
// Write the id
if let Some(id) = id {
self.inner_send(format!("id:{}\n", id)).await?;
self.inner_send(format!("id:{id}\n")).await?;
}

// Write the retry section, and end.
let dur = dur.as_secs_f64() as u64;
let msg = format!("retry:{}\n\n", dur);
let msg = format!("retry:{dur}\n\n");
self.inner_send(msg).await?;
Ok(())
}
Expand Down
14 changes: 4 additions & 10 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Duration;
/// The kind of SSE event sent.
#[derive(Debug, Eq, PartialEq)]
pub enum Event {
/// A retry frame, signaling a new retry duration must be used..
/// A retry frame, signaling a new retry duration must be used.
Retry(Duration),
/// A data frame containing a message.
Message(Message),
Expand All @@ -22,19 +22,13 @@ impl Event {
Self::Retry(Duration::from_secs_f64(dur as f64))
}

/// Check whether this is a Retry variant.
/// Check whether this is a `Retry` variant.
pub fn is_retry(&self) -> bool {
match *self {
Self::Retry(_) => true,
_ => false,
}
matches!(*self, Self::Retry(_))
}

/// Check whether this is a `Message` variant.
pub fn is_message(&self) -> bool {
match *self {
Self::Message(_) => true,
_ => false,
}
matches!(*self, Self::Message(_))
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! });
//!
//! // Decode messages using a decoder.
//! let mut reader = decode(BufReader::new(encoder));
//! let mut reader = decode(BufReader::new(Box::pin(encoder)));
//! let event = reader.next().await.unwrap()?;
//! // Match and handle the event
//!
Expand Down
6 changes: 3 additions & 3 deletions src/lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pin_project! {
impl<R> Lines<R> {
pub(crate) fn new(reader: R) -> Lines<R>
where
R: AsyncBufRead + Unpin + Sized,
R: AsyncBufRead + Sized,
Copy link
Author

@vyorkin vyorkin Jul 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed Unpin but I'm not sure if this is correct.
Does reader: R actually need to implement Unpin...?
My understanding of pinning is quite limited :(

{
Lines {
reader,
Expand Down Expand Up @@ -64,7 +64,7 @@ impl<R: AsyncBufRead> Stream for Lines<R> {
if this.buf.ends_with('\r') {
this.buf.pop();
}
Poll::Ready(Some(Ok(mem::replace(this.buf, String::new()))))
Poll::Ready(Some(Ok(std::mem::take(this.buf))))
Copy link
Author

@vyorkin vyorkin Jul 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::mem::take is just more concise and idiomatic (clippy suggested this change):
since String implements Default as an empty string, mem::take should replace this.buf with an empty string

}
}

Expand All @@ -76,7 +76,7 @@ fn read_line_internal<R: AsyncBufRead + ?Sized>(
read: &mut usize,
) -> Poll<io::Result<usize>> {
let ret = ready!(read_until_internal(reader, cx, bytes, read));
if str::from_utf8(&bytes).is_err() {
if str::from_utf8(bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
Expand Down
8 changes: 4 additions & 4 deletions tests/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn decode_stream_when_fed_by_line() -> http_types::Result<()> {
let reader = decode(Cursor::new(":ok\nevent:message\nid:id1\ndata:data1\n\n"));
let res = reader.map(|i| i.unwrap()).collect::<Vec<_>>().await;
assert_eq!(res.len(), 1);
assert_message(res.get(0).unwrap(), "message", "data1", Some("id1"));
assert_message(res.first().unwrap(), "message", "data1", Some("id1"));
Ok(())
}

Expand Down Expand Up @@ -127,12 +127,12 @@ async fn comments() -> http_types::Result<()> {
let longstring = "x".repeat(2049);
let mut input = concat!("data:1\r", ":\0\n", ":\r\n", "data:2\n", ":").to_string();
input.push_str(&longstring);
input.push_str("\r");
input.push('\r');
input.push_str("data:3\n");
input.push_str(":data:fail\r");
input.push_str(":");
input.push(':');
input.push_str(&longstring);
input.push_str("\n");
input.push('\n');
input.push_str("data:4\n\n");
let mut reader = decode(Cursor::new(input));
assert_message(
Expand Down
16 changes: 8 additions & 8 deletions tests/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn encode_message() -> http_types::Result<()> {
let (sender, encoder) = encode();
task::spawn(async move { sender.send("cat", "chashu", None).await });

let mut reader = decode(BufReader::new(encoder));
let mut reader = decode(BufReader::new(Box::pin(encoder)));
let event = reader.next().await.unwrap()?;
assert_message(&event, "cat", "chashu", None);
Ok(())
Expand All @@ -42,7 +42,7 @@ async fn encode_message_some() -> http_types::Result<()> {
let (sender, encoder) = encode();
task::spawn(async move { sender.send(Some("cat"), "chashu", None).await });

let mut reader = decode(BufReader::new(encoder));
let mut reader = decode(BufReader::new(Box::pin(encoder)));
let event = reader.next().await.unwrap()?;
assert_message(&event, "cat", "chashu", None);
Ok(())
Expand All @@ -53,7 +53,7 @@ async fn encode_message_data_only() -> http_types::Result<()> {
let (sender, encoder) = encode();
task::spawn(async move { sender.send(None, "chashu", None).await });

let mut reader = decode(BufReader::new(encoder));
let mut reader = decode(BufReader::new(Box::pin(encoder)));
let event = reader.next().await.unwrap()?;
assert_message(&event, "message", "chashu", None);
Ok(())
Expand All @@ -64,7 +64,7 @@ async fn encode_message_with_id() -> http_types::Result<()> {
let (sender, encoder) = encode();
task::spawn(async move { sender.send("cat", "chashu", Some("0")).await });

let mut reader = decode(BufReader::new(encoder));
let mut reader = decode(BufReader::new(Box::pin(encoder)));
let event = reader.next().await.unwrap()?;
assert_message(&event, "cat", "chashu", Some("0"));
Ok(())
Expand All @@ -75,7 +75,7 @@ async fn encode_message_data_only_with_id() -> http_types::Result<()> {
let (sender, encoder) = encode();
task::spawn(async move { sender.send(None, "chashu", Some("0")).await });

let mut reader = decode(BufReader::new(encoder));
let mut reader = decode(BufReader::new(Box::pin(encoder)));
let event = reader.next().await.unwrap()?;
assert_message(&event, "message", "chashu", Some("0"));
Ok(())
Expand All @@ -89,7 +89,7 @@ async fn encode_retry() -> http_types::Result<()> {
sender.send_retry(dur, None).await.unwrap();
});

let mut reader = decode(BufReader::new(encoder));
let mut reader = decode(BufReader::new(Box::pin(encoder)));
let event = reader.next().await.unwrap()?;
assert_retry(&event, 12);
Ok(())
Expand All @@ -100,7 +100,7 @@ async fn encode_multiline_message() -> http_types::Result<()> {
let (sender, encoder) = encode();
task::spawn(async move { sender.send("cats", "chashu\nnori", None).await });

let mut reader = decode(BufReader::new(encoder));
let mut reader = decode(BufReader::new(Box::pin(encoder)));
let event = reader.next().await.unwrap()?;
assert_message(&event, "cats", "chashu\nnori", None);
Ok(())
Expand All @@ -112,7 +112,7 @@ async fn dropping_encoder() -> http_types::Result<()> {
let sender_clone = sender.clone();
task::spawn(async move { sender_clone.send("cat", "chashu", None).await });

let mut reader = decode(BufReader::new(encoder));
let mut reader = decode(BufReader::new(Box::pin(encoder)));
let event = reader.next().await.unwrap()?;
assert_message(&event, "cat", "chashu", None);

Expand Down