Skip to content

Commit 927ebe6

Browse files
committed
feat: add a message repeater to the relay
1 parent a398346 commit 927ebe6

File tree

2 files changed

+111
-0
lines changed

2 files changed

+111
-0
lines changed

mithril-relay/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod commands;
55
/// Peer to peer module
66
pub mod p2p;
77
mod relay;
8+
mod repeater;
89

910
pub use commands::Args;
1011
pub use commands::RelayCommands;

mithril-relay/src/repeater.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use anyhow::anyhow;
2+
use mithril_common::StdResult;
3+
use slog_scope::debug;
4+
use std::{fmt::Debug, sync::Arc, time::Duration};
5+
use tokio::sync::{mpsc::UnboundedSender, Mutex};
6+
7+
/// A message repeater will send a message to a channel at a given delay
8+
pub struct MessageRepeater<M: Clone + Debug + Sync + Send + 'static> {
9+
message: Arc<Mutex<Option<M>>>,
10+
tx_message: UnboundedSender<M>,
11+
delay: Duration,
12+
}
13+
14+
impl<M: Clone + Debug + Sync + Send + 'static> MessageRepeater<M> {
15+
/// Factory for MessageRepeater
16+
pub fn new(tx_message: UnboundedSender<M>, delay: Duration) -> Self {
17+
Self {
18+
message: Arc::new(Mutex::new(None)),
19+
tx_message,
20+
delay,
21+
}
22+
}
23+
24+
/// Set the message to repeat
25+
pub async fn set_message(&self, message: M) {
26+
debug!("MessageRepeater: set message"; "message" => format!("{:#?}", message));
27+
*self.message.lock().await = Some(message);
28+
}
29+
30+
/// Start repeating the message if any
31+
pub async fn repeat_message(&self) -> StdResult<()> {
32+
tokio::time::sleep(self.delay).await;
33+
match self.message.lock().await.as_ref() {
34+
Some(message) => {
35+
debug!("MessageRepeater: repeat message"; "message" => format!("{:#?}", message));
36+
self.tx_message
37+
.send(message.clone())
38+
.map_err(|e| anyhow!(e))?
39+
}
40+
None => {
41+
debug!("MessageRepeater: no message to repeat");
42+
}
43+
}
44+
45+
Ok(())
46+
}
47+
}
48+
49+
#[cfg(test)]
50+
mod tests {
51+
use tokio::{sync::mpsc, time};
52+
53+
use super::*;
54+
55+
#[tokio::test]
56+
async fn should_repeat_message_when_exists() {
57+
let (tx, mut rx) = mpsc::unbounded_channel();
58+
let delay = Duration::from_millis(100);
59+
let repeater = MessageRepeater::new(tx, delay);
60+
61+
let message = "Hello, world!";
62+
repeater.set_message(message.to_string()).await;
63+
repeater.repeat_message().await.unwrap();
64+
65+
let received = rx.recv().await.unwrap();
66+
assert_eq!(message, received);
67+
}
68+
69+
#[tokio::test]
70+
async fn should_repeat_message_when_exists_with_expected_delay() {
71+
let (tx, _rx) = mpsc::unbounded_channel();
72+
let delay = Duration::from_secs(1);
73+
let repeater = MessageRepeater::new(tx, delay);
74+
75+
let message = "Hello, world!";
76+
repeater.set_message(message.to_string()).await;
77+
78+
let result = tokio::select! {
79+
_ = time::sleep(delay-Duration::from_millis(100)) => {Err(anyhow!("Timeout"))}
80+
_ = repeater.repeat_message() => {Ok(())}
81+
};
82+
83+
result.expect_err("should have timed out");
84+
}
85+
86+
#[tokio::test]
87+
async fn should_do_nothing_when_message_not_exists() {
88+
let (tx, rx) = mpsc::unbounded_channel::<String>();
89+
let delay = Duration::from_millis(100);
90+
let repeater = MessageRepeater::new(tx, delay);
91+
92+
repeater.repeat_message().await.unwrap();
93+
94+
assert!(rx.is_empty());
95+
}
96+
97+
#[tokio::test]
98+
async fn should_do_nothing_when_message_not_exists_with_expected_delay() {
99+
let (tx, _rx) = mpsc::unbounded_channel::<String>();
100+
let delay = Duration::from_secs(1);
101+
let repeater = MessageRepeater::new(tx, delay);
102+
103+
let result = tokio::select! {
104+
_ = time::sleep(delay-Duration::from_millis(100)) => {Err(anyhow!("Timeout"))}
105+
_ = repeater.repeat_message() => {Ok(())}
106+
};
107+
108+
result.expect_err("should have timed out");
109+
}
110+
}

0 commit comments

Comments
 (0)