Skip to content

Commit 97eb20a

Browse files
committed
test: [#1485] add unit tests to events pkg
1 parent 5a087d0 commit 97eb20a

File tree

4 files changed

+134
-4
lines changed

4 files changed

+134
-4
lines changed

packages/events/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ version.workspace = true
1616

1717
[dependencies]
1818
futures = "0"
19-
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
19+
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync", "time"] }
2020

2121
[dev-dependencies]
2222
mockall = "0"

packages/events/src/broadcaster.rs

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ use crate::sender::{SendError, Sender};
77

88
const CHANNEL_CAPACITY: usize = 32768;
99

10-
/// An event sender implementation using a broadcast channel.
11-
#[derive(Clone)]
10+
/// An event sender and receiver implementation using a broadcast channel.
11+
#[derive(Clone, Debug)]
1212
pub struct Broadcaster<Event: Sync + Send + Clone> {
1313
pub(crate) sender: broadcast::Sender<Event>,
1414
}
1515

1616
impl<Event: Sync + Send + Clone> Default for Broadcaster<Event> {
1717
fn default() -> Self {
18-
let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
18+
let (sender, _receiver) = broadcast::channel(CHANNEL_CAPACITY);
1919
Self { sender }
2020
}
2121
}
@@ -57,3 +57,61 @@ impl From<broadcast::error::RecvError> for RecvError {
5757
}
5858
}
5959
}
60+
61+
#[cfg(test)]
62+
mod tests {
63+
use tokio::time::{timeout, Duration};
64+
65+
use super::*;
66+
67+
#[tokio::test]
68+
async fn it_should_allow_sending_an_event_and_received_it() {
69+
let broadcaster = Broadcaster::<String>::default();
70+
71+
let mut receiver = broadcaster.subscribe();
72+
73+
let event = "test";
74+
75+
let _unused = broadcaster.send(event.to_owned()).await.unwrap().unwrap();
76+
77+
let received_event = receiver.recv().await.unwrap();
78+
79+
assert_eq!(received_event, event);
80+
}
81+
82+
#[tokio::test]
83+
async fn it_should_return_the_number_of_receivers_when_and_event_is_sent() {
84+
let broadcaster = Broadcaster::<String>::default();
85+
let mut _receiver = broadcaster.subscribe();
86+
87+
let number_of_receivers = broadcaster.send("test".into()).await;
88+
89+
assert!(matches!(number_of_receivers, Some(Ok(1))));
90+
}
91+
92+
#[tokio::test]
93+
async fn it_should_fail_when_trying_tos_send_with_no_subscribers() {
94+
let event = String::from("test");
95+
96+
let broadcaster = Broadcaster::<String>::default();
97+
98+
let result: Result<usize, SendError<String>> = broadcaster.send(event).await.unwrap();
99+
100+
assert!(matches!(result, Err(SendError::<String>(_event))));
101+
}
102+
103+
#[tokio::test]
104+
async fn it_should_allow_subscribing_multiple_receivers() {
105+
let broadcaster = Broadcaster::<u8>::default();
106+
let mut r1 = broadcaster.subscribe();
107+
let mut r2 = broadcaster.subscribe();
108+
109+
let _ = broadcaster.send(1).await;
110+
111+
let val1 = timeout(Duration::from_secs(1), r1.recv()).await.unwrap().unwrap();
112+
let val2 = timeout(Duration::from_secs(1), r2.recv()).await.unwrap().unwrap();
113+
114+
assert_eq!(val1, 1);
115+
assert_eq!(val2, 1);
116+
}
117+
}

packages/events/src/bus.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::Arc;
33
use crate::broadcaster::Broadcaster;
44
use crate::{receiver, sender};
55

6+
#[derive(Clone, Debug)]
67
pub struct EventBus<Event: Sync + Send + Clone + 'static> {
78
pub enable_sender: bool,
89
pub broadcaster: Broadcaster<Event>,
@@ -40,3 +41,61 @@ impl<Event: Sync + Send + Clone + 'static> EventBus<Event> {
4041
Box::new(self.broadcaster.subscribe())
4142
}
4243
}
44+
45+
#[cfg(test)]
46+
mod tests {
47+
use tokio::time::{timeout, Duration};
48+
49+
use super::*;
50+
51+
#[tokio::test]
52+
async fn it_should_provide_an_event_sender_when_enabled() {
53+
let bus = EventBus::<String>::new(true, Broadcaster::default());
54+
55+
assert!(bus.sender().is_some());
56+
}
57+
58+
#[tokio::test]
59+
async fn it_should_not_provide_event_sender_when_disabled() {
60+
let bus = EventBus::<String>::new(false, Broadcaster::default());
61+
62+
assert!(bus.sender().is_none());
63+
}
64+
65+
#[tokio::test]
66+
async fn it_should_enabled_by_default() {
67+
let bus = EventBus::<String>::default();
68+
69+
assert!(bus.sender().is_some());
70+
}
71+
72+
#[tokio::test]
73+
async fn it_should_allow_sending_events_that_are_received_by_receivers() {
74+
let bus = EventBus::<String>::default();
75+
let sender = bus.sender().unwrap();
76+
let mut receiver = bus.receiver();
77+
78+
let event = "hello".to_string();
79+
80+
let _unused = sender.send(event.clone()).await.unwrap().unwrap();
81+
82+
let result = timeout(Duration::from_secs(1), receiver.recv()).await;
83+
84+
assert_eq!(result.unwrap().unwrap(), event);
85+
}
86+
87+
#[tokio::test]
88+
async fn it_should_send_a_closed_events_to_receivers_when_sender_is_dropped() {
89+
let bus = EventBus::<String>::default();
90+
91+
let mut receiver = bus.receiver();
92+
93+
let future = receiver.recv();
94+
95+
drop(bus); // explicitly drop sender
96+
97+
let result = timeout(Duration::from_secs(1), future).await;
98+
99+
assert!(matches!(result.unwrap(), Err(crate::receiver::RecvError::Closed)));
100+
}
101+
}

packages/events/src/sender.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,19 @@ use mockall::{automock, predicate::str};
99
pub trait Sender: Sync + Send {
1010
type Event: Send + Clone;
1111

12+
/// Sends an event to all active receivers.
13+
///
14+
/// Returns a future that resolves to an `Option<Result<usize, SendError<Self::Event>>>`:
15+
///
16+
/// - `Some(Ok(n))` — the event was successfully sent to `n` receivers.
17+
/// - `Some(Err(e))` — an error occurred while sending the event.
18+
/// - `None` — the sender is inactive or disconnected, and the event was not sent.
19+
///
20+
/// The `Option` allows implementations to express cases where sending is not possible
21+
/// (e.g., when the sender is disabled or there are no active receivers).
22+
///
23+
/// The `usize` typically represents the number of receivers the message was delivered to,
24+
/// but its semantics may vary depending on the concrete implementation.
1225
fn send(&self, event: Self::Event) -> BoxFuture<'_, Option<Result<usize, SendError<Self::Event>>>>;
1326
}
1427

0 commit comments

Comments
 (0)