Skip to content

Commit e3703c1

Browse files
committed
feat: [torrust#1485] add Receiver trait to events package
1 parent d4343c0 commit e3703c1

File tree

13 files changed

+58
-26
lines changed

13 files changed

+58
-26
lines changed

packages/events/src/broadcaster.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use futures::future::BoxFuture;
22
use futures::FutureExt;
3-
use tokio::sync::broadcast::error::SendError;
3+
use tokio::sync::broadcast::error::{RecvError, SendError};
44
use tokio::sync::broadcast::{self};
55

6+
use crate::receiver::Receiver;
67
use crate::sender::Sender;
78

89
const CHANNEL_CAPACITY: usize = 32768;
@@ -13,14 +14,6 @@ pub struct Broadcaster<E: Sync + Send + Clone> {
1314
pub(crate) sender: broadcast::Sender<E>,
1415
}
1516

16-
impl<E: Sync + Send + Clone> Sender for Broadcaster<E> {
17-
type Event = E;
18-
19-
fn send_event(&self, event: E) -> BoxFuture<'_, Option<Result<usize, SendError<E>>>> {
20-
async move { Some(self.sender.send(event)) }.boxed()
21-
}
22-
}
23-
2417
impl<E: Sync + Send + Clone> Default for Broadcaster<E> {
2518
fn default() -> Self {
2619
let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
@@ -34,3 +27,19 @@ impl<E: Sync + Send + Clone> Broadcaster<E> {
3427
self.sender.subscribe()
3528
}
3629
}
30+
31+
impl<E: Sync + Send + Clone> Sender for Broadcaster<E> {
32+
type Event = E;
33+
34+
fn send_event(&self, event: E) -> BoxFuture<'_, Option<Result<usize, SendError<E>>>> {
35+
async move { Some(self.sender.send(event)) }.boxed()
36+
}
37+
}
38+
39+
impl<E: Sync + Send + Clone> Receiver for broadcast::Receiver<E> {
40+
type Event = E;
41+
42+
fn recv(&mut self) -> BoxFuture<'_, Result<Self::Event, RecvError>> {
43+
async move { self.recv().await }.boxed()
44+
}
45+
}

packages/events/src/bus.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use std::sync::Arc;
22

3-
use tokio::sync::broadcast::Receiver;
4-
53
use crate::broadcaster::Broadcaster;
6-
use crate::sender;
4+
use crate::{receiver, sender};
75

86
pub struct EventBus<E: Sync + Send + Clone + 'static> {
97
pub enable_sender: bool,
@@ -38,7 +36,7 @@ impl<E: Sync + Send + Clone + 'static> EventBus<E> {
3836
}
3937

4038
#[must_use]
41-
pub fn receiver(&self) -> Receiver<E> {
42-
self.broadcaster.subscribe()
39+
pub fn receiver(&self) -> Box<dyn receiver::Receiver<Event = E>> {
40+
Box::new(self.broadcaster.subscribe())
4341
}
4442
}

packages/events/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod broadcaster;
22
pub mod bus;
3+
pub mod receiver;
34
pub mod sender;
45

56
/// Target for tracing crate logs.

packages/events/src/receiver.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
use futures::future::BoxFuture;
2+
#[cfg(test)]
3+
use mockall::{automock, predicate::str};
4+
use tokio::sync::broadcast::error::RecvError;
5+
6+
/// A trait for receiving events.
7+
#[cfg_attr(test, automock(type Event=();))]
8+
pub trait Receiver: Sync + Send {
9+
type Event: Send + Clone;
10+
11+
fn recv(&mut self) -> BoxFuture<'_, Result<Self::Event, RecvError>>;
12+
}

packages/http-tracker-core/src/event/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod bus;
2+
pub mod receiver;
23
pub mod sender;
34

45
use std::net::{IpAddr, SocketAddr};
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
use super::Event;
2+
3+
pub type Receiver = Box<dyn torrust_tracker_events::receiver::Receiver<Event = Event>>;

packages/http-tracker-core/src/statistics/event/listener.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
use std::sync::Arc;
22

3-
use tokio::sync::broadcast::{self, Receiver};
3+
use tokio::sync::broadcast::{self};
44
use tokio::task::JoinHandle;
55
use torrust_tracker_clock::clock::Time;
66

77
use super::handler::handle_event;
8-
use crate::event::Event;
8+
use crate::event::receiver::Receiver;
99
use crate::statistics::repository::Repository;
1010
use crate::{CurrentClock, HTTP_TRACKER_LOG_TARGET};
1111

1212
#[must_use]
13-
pub fn run_event_listener(receiver: Receiver<Event>, repository: &Arc<Repository>) -> JoinHandle<()> {
13+
pub fn run_event_listener(receiver: Receiver, repository: &Arc<Repository>) -> JoinHandle<()> {
1414
let stats_repository = repository.clone();
1515

1616
tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "Starting HTTP tracker core event listener");
@@ -22,7 +22,7 @@ pub fn run_event_listener(receiver: Receiver<Event>, repository: &Arc<Repository
2222
})
2323
}
2424

25-
async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Arc<Repository>) {
25+
async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc<Repository>) {
2626
loop {
2727
match receiver.recv().await {
2828
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,

packages/udp-tracker-core/src/event/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod bus;
2+
pub mod receiver;
23
pub mod sender;
34

45
use std::net::SocketAddr;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
use super::Event;
2+
3+
pub type Receiver = Box<dyn torrust_tracker_events::receiver::Receiver<Event = Event>>;

packages/udp-tracker-core/src/statistics/event/listener.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
use std::sync::Arc;
22

3-
use tokio::sync::broadcast::{self, Receiver};
3+
use tokio::sync::broadcast::{self};
44
use tokio::task::JoinHandle;
55
use torrust_tracker_clock::clock::Time;
66

77
use super::handler::handle_event;
8-
use crate::event::Event;
8+
use crate::event::receiver::Receiver;
99
use crate::statistics::repository::Repository;
1010
use crate::{CurrentClock, UDP_TRACKER_LOG_TARGET};
1111

1212
#[must_use]
13-
pub fn run_event_listener(receiver: Receiver<Event>, repository: &Arc<Repository>) -> JoinHandle<()> {
13+
pub fn run_event_listener(receiver: Receiver, repository: &Arc<Repository>) -> JoinHandle<()> {
1414
let stats_repository = repository.clone();
1515

1616
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting UDP tracker core event listener");
@@ -22,7 +22,7 @@ pub fn run_event_listener(receiver: Receiver<Event>, repository: &Arc<Repository
2222
})
2323
}
2424

25-
async fn dispatch_events(mut receiver: broadcast::Receiver<Event>, stats_repository: Arc<Repository>) {
25+
async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc<Repository>) {
2626
loop {
2727
match receiver.recv().await {
2828
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,

0 commit comments

Comments
 (0)