Skip to content

Commit a524ac5

Browse files
committed
.
1 parent c92cf37 commit a524ac5

File tree

6 files changed

+158
-16
lines changed

6 files changed

+158
-16
lines changed

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "async-sse"
3-
version = "1.0.0"
3+
version = "1.0.1"
44
license = "MIT OR Apache-2.0"
55
repository = "https://github.com/{{USERNAME}}/async-sse"
66
documentation = "https://docs.rs/async-sse"
@@ -16,14 +16,15 @@ authors = [
1616
[features]
1717

1818
[dependencies]
19+
async-std = "1.5.0"
1920
http-types = "1.0.1"
20-
async-std = { version = "1.5.0", features = ["attributes"] }
2121
log = "0.4.8"
22-
pin-project-lite = "0.1.4"
2322
memchr = "2.3.3"
23+
pin-project-lite = "0.1.4"
2424

2525
[dev-dependencies]
2626
femme = "1.3.0"
27+
async-std = { version = "1.5.0", features = ["attributes"] }
2728

2829
[patch.crates-io]
2930
async-std = { path = "../async-std" }

src/encoder.rs

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,71 @@
1+
use async_std::io::prelude::*;
12
use async_std::io::Write as AsyncWrite;
23
use std::io;
34
use std::time::Duration;
45

5-
use crate::Message;
6-
76
/// An SSE protocol encoder.
87
#[derive(Debug)]
9-
pub struct Encoder;
8+
pub struct Encoder<W> {
9+
writer: W,
10+
}
1011

1112
/// Encode a new SSE connection.
12-
pub fn encode(_s: impl AsyncWrite) -> Encoder {
13-
todo!();
13+
pub fn encode<W: AsyncWrite + Unpin>(writer: W) -> Encoder<W> {
14+
Encoder { writer }
1415
}
1516

16-
impl Encoder {
17+
impl<W> Encoder<W> {
18+
/// Access the inner writer from the Encoder.
19+
pub fn into_writer(self) -> W {
20+
self.writer
21+
}
22+
}
23+
24+
impl<W: AsyncWrite + Unpin> Encoder<W> {
1725
/// Send a new message over SSE.
18-
pub fn send(&self, _msg: Message) -> io::Result<()> {
19-
todo!();
26+
pub async fn send(&mut self, name: &str, data: &[u8], id: Option<&str>) -> io::Result<()> {
27+
// Write the event name
28+
self.writer.write_all(b"event:").await?;
29+
self.writer.write_all(name.as_bytes()).await?;
30+
self.writer.write_all(b"\n").await?;
31+
32+
// Write the id
33+
if let Some(id) = id {
34+
self.writer.write_all(b"id:").await?;
35+
self.writer.write_all(id.as_bytes()).await?;
36+
self.writer.write_all(b"\n").await?;
37+
}
38+
39+
// Write the section
40+
self.writer.write_all(b"data:").await?;
41+
self.writer.write_all(data).await?;
42+
self.writer.write_all(b"\n").await?;
43+
44+
// Finalize the message
45+
self.writer.write_all(b"\n").await?;
46+
47+
Ok(())
2048
}
49+
2150
/// Send a new "retry" message over SSE.
22-
pub fn send_retry(&self, _dur: Duration) -> io::Result<()> {
23-
todo!();
51+
pub async fn send_retry(&mut self, dur: Duration, id: Option<&str>) -> io::Result<()> {
52+
// Write the id
53+
if let Some(id) = id {
54+
self.writer.write_all(b"id:").await?;
55+
self.writer.write_all(id.as_bytes()).await?;
56+
self.writer.write_all(b"\n").await?;
57+
}
58+
59+
// Write the section
60+
self.writer.write_all(b"retry:").await?;
61+
self.writer
62+
.write_all(&format!("{}", dur.as_secs_f64() as u64).as_bytes())
63+
.await?;
64+
self.writer.write_all(b"\n").await?;
65+
66+
// Finalize the message
67+
self.writer.write_all(b"\n").await?;
68+
69+
Ok(())
2470
}
2571
}

src/event.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::Message;
22

33
use std::time::Duration;
44

5-
/// Event kind.
5+
/// The kind of event sent.
66
#[derive(Debug, Eq, PartialEq)]
77
pub enum Event {
88
/// A retry frame, signaling a new retry duration must be used..

src/lib.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,29 @@
11
//! Async Server Sent Event parser and encoder.
22
//!
3-
//! ## References
3+
//! # Example
4+
//!
5+
//! ```
6+
//! use async_sse::{decode, encode, Event};
7+
//! use async_std::io::Cursor;
8+
//! use async_std::prelude::*;
9+
//!
10+
//! #[async_std::main]
11+
//! async fn main() -> http_types::Result<()> {
12+
//! let buf = Cursor::new(vec![]);
13+
//! let mut encoder = encode(buf);
14+
//! encoder.send("cat", b"chashu", None).await?;
15+
//!
16+
//! let mut buf = encoder.into_writer();
17+
//! buf.set_position(0);
18+
//!
19+
//! let mut reader = decode(buf);
20+
//! let event: Event = reader.next().await.unwrap()?;
21+
//! # let _ = event;
22+
//! Ok(())
23+
//! }
24+
//! ```
25+
//!
26+
//! # References
427
//!
528
//! - [SSE Spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#concept-event-stream-last-event-id)
629
//! - [EventSource web platform tests](https://github.com/web-platform-tests/wpt/tree/master/eventsource)

src/message.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/// An SSE message.
1+
/// A data event.
22
#[derive(Debug, PartialEq, Eq, Hash)]
33
pub struct Message {
44
/// The ID of this event.
@@ -26,4 +26,9 @@ impl Message {
2626
pub fn data(&self) -> &[u8] {
2727
&self.data
2828
}
29+
30+
/// Convert the message into the data payload.
31+
pub fn into_bytes(self) -> Vec<u8> {
32+
self.data
33+
}
2934
}

tests/encode.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use async_sse::{decode, encode, Event};
2+
use async_std::io::Cursor;
3+
use async_std::prelude::*;
4+
use std::time::Duration;
5+
6+
/// Assert a Message.
7+
fn assert_message(event: &Event, name: &str, data: &str, id: Option<&'static str>) {
8+
assert!(event.is_message());
9+
if let Event::Message(msg) = event {
10+
assert_eq!(msg.id(), &id.map(|s| s.to_owned()));
11+
assert_eq!(msg.name(), name);
12+
assert_eq!(
13+
String::from_utf8(msg.data().to_owned()).unwrap(),
14+
String::from_utf8(data.as_bytes().to_owned()).unwrap()
15+
);
16+
}
17+
}
18+
19+
/// Assert a Message.
20+
fn assert_retry(event: &Event, dur: u64) {
21+
assert!(event.is_retry());
22+
let expected = Duration::from_secs_f64(dur as f64);
23+
if let Event::Retry(dur) = event {
24+
assert_eq!(dur, &expected);
25+
}
26+
}
27+
#[async_std::test]
28+
async fn encode_message() -> http_types::Result<()> {
29+
let buf = Cursor::new(vec![]);
30+
let mut encoder = encode(buf);
31+
encoder.send("cat", b"chashu", None).await?;
32+
let mut buf = encoder.into_writer();
33+
buf.set_position(0);
34+
35+
let mut reader = decode(buf);
36+
let event = reader.next().await.unwrap()?;
37+
assert_message(&event, "cat", "chashu", None);
38+
Ok(())
39+
}
40+
41+
#[async_std::test]
42+
async fn encode_message_with_id() -> http_types::Result<()> {
43+
let buf = Cursor::new(vec![]);
44+
let mut encoder = encode(buf);
45+
encoder.send("cat", b"chashu", Some("0")).await?;
46+
let mut buf = encoder.into_writer();
47+
buf.set_position(0);
48+
49+
let mut reader = decode(buf);
50+
let event = reader.next().await.unwrap()?;
51+
assert_message(&event, "cat", "chashu", Some("0"));
52+
Ok(())
53+
}
54+
55+
#[async_std::test]
56+
async fn encode_retry() -> http_types::Result<()> {
57+
let buf = Cursor::new(vec![]);
58+
let mut encoder = encode(buf);
59+
encoder.send_retry(Duration::from_secs(12), None).await?;
60+
let mut buf = encoder.into_writer();
61+
buf.set_position(0);
62+
63+
let mut reader = decode(buf);
64+
let event = reader.next().await.unwrap()?;
65+
assert_retry(&event, 12);
66+
Ok(())
67+
}

0 commit comments

Comments
 (0)