Skip to content

Commit be59eae

Browse files
committed
fix: better delay handling of message repeater
1 parent 927ebe6 commit be59eae

File tree

1 file changed

+20
-2
lines changed

1 file changed

+20
-2
lines changed

mithril-relay/src/repeater.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@ use anyhow::anyhow;
22
use mithril_common::StdResult;
33
use slog_scope::debug;
44
use std::{fmt::Debug, sync::Arc, time::Duration};
5-
use tokio::sync::{mpsc::UnboundedSender, Mutex};
5+
use tokio::{
6+
sync::{mpsc::UnboundedSender, Mutex},
7+
time::Instant,
8+
};
69

710
/// A message repeater will send a message to a channel at a given delay
811
pub struct MessageRepeater<M: Clone + Debug + Sync + Send + 'static> {
912
message: Arc<Mutex<Option<M>>>,
1013
tx_message: UnboundedSender<M>,
1114
delay: Duration,
15+
next_repeat_at: Arc<Mutex<Option<Instant>>>,
1216
}
1317

1418
impl<M: Clone + Debug + Sync + Send + 'static> MessageRepeater<M> {
@@ -18,18 +22,31 @@ impl<M: Clone + Debug + Sync + Send + 'static> MessageRepeater<M> {
1822
message: Arc::new(Mutex::new(None)),
1923
tx_message,
2024
delay,
25+
next_repeat_at: Arc::new(Mutex::new(None)),
2126
}
2227
}
2328

29+
async fn reset_next_repeat_at(&self) {
30+
debug!("MessageRepeater: reset next_repeat_at");
31+
*self.next_repeat_at.lock().await = Some(Instant::now() + self.delay);
32+
}
33+
2434
/// Set the message to repeat
2535
pub async fn set_message(&self, message: M) {
2636
debug!("MessageRepeater: set message"; "message" => format!("{:#?}", message));
2737
*self.message.lock().await = Some(message);
38+
self.reset_next_repeat_at().await;
2839
}
2940

3041
/// Start repeating the message if any
3142
pub async fn repeat_message(&self) -> StdResult<()> {
32-
tokio::time::sleep(self.delay).await;
43+
let wait_delay = match self.next_repeat_at.lock().await.as_ref() {
44+
None => self.delay,
45+
Some(next_repeat_at) => next_repeat_at
46+
.checked_duration_since(Instant::now())
47+
.unwrap_or_default(),
48+
};
49+
tokio::time::sleep(wait_delay).await;
3350
match self.message.lock().await.as_ref() {
3451
Some(message) => {
3552
debug!("MessageRepeater: repeat message"; "message" => format!("{:#?}", message));
@@ -41,6 +58,7 @@ impl<M: Clone + Debug + Sync + Send + 'static> MessageRepeater<M> {
4158
debug!("MessageRepeater: no message to repeat");
4259
}
4360
}
61+
self.reset_next_repeat_at().await;
4462

4563
Ok(())
4664
}

0 commit comments

Comments
 (0)