Skip to content

Commit ef04157

Browse files
committed
feat: implement signer registration repeater in the relay
1 parent be59eae commit ef04157

File tree

2 files changed

+36
-8
lines changed

2 files changed

+36
-8
lines changed

mithril-relay/src/relay/signer.rs

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
use crate::p2p::{Peer, PeerEvent};
1+
use crate::{
2+
p2p::{Peer, PeerEvent},
3+
repeater::MessageRepeater,
4+
};
25
use libp2p::Multiaddr;
36
use mithril_common::{
47
messages::{RegisterSignatureMessage, RegisterSignerMessage},
58
test_utils::test_http_server::{test_http_server_with_socket_address, TestHttpServer},
69
StdResult,
710
};
811
use slog_scope::{debug, info};
9-
use std::net::SocketAddr;
12+
use std::{net::SocketAddr, sync::Arc, time::Duration};
1013
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
1114
use warp::Filter;
1215

@@ -16,6 +19,7 @@ pub struct SignerRelay {
1619
peer: Peer,
1720
signature_rx: UnboundedReceiver<RegisterSignatureMessage>,
1821
signer_rx: UnboundedReceiver<RegisterSignerMessage>,
22+
signer_repeater: Arc<MessageRepeater<RegisterSignerMessage>>,
1923
}
2024

2125
impl SignerRelay {
@@ -28,17 +32,25 @@ impl SignerRelay {
2832
debug!("SignerRelay: starting...");
2933
let (signature_tx, signature_rx) = unbounded_channel::<RegisterSignatureMessage>();
3034
let (signer_tx, signer_rx) = unbounded_channel::<RegisterSignerMessage>();
35+
let repeat_frequency = Duration::from_secs(30);
36+
let signer_repeater = Arc::new(MessageRepeater::new(signer_tx.clone(), repeat_frequency));
3137
let peer = Peer::new(address).start().await?;
32-
let server =
33-
Self::start_http_server(server_port, aggregator_endpoint, signer_tx, signature_tx)
34-
.await;
38+
let server = Self::start_http_server(
39+
server_port,
40+
aggregator_endpoint,
41+
signer_tx,
42+
signature_tx,
43+
signer_repeater.clone(),
44+
)
45+
.await;
3546
info!("SignerRelay: listening on"; "address" => format!("{:?}", server.address()));
3647

3748
Ok(Self {
3849
server,
3950
peer,
4051
signature_rx,
4152
signer_rx,
53+
signer_repeater,
4254
})
4355
}
4456

@@ -47,6 +59,7 @@ impl SignerRelay {
4759
aggregator_endpoint: &str,
4860
signer_tx: UnboundedSender<RegisterSignerMessage>,
4961
signature_tx: UnboundedSender<RegisterSignatureMessage>,
62+
signer_repeater: Arc<MessageRepeater<RegisterSignerMessage>>,
5063
) -> TestHttpServer {
5164
test_http_server_with_socket_address(
5265
warp::path("register-signatures")
@@ -58,6 +71,7 @@ impl SignerRelay {
5871
.and(warp::post())
5972
.and(warp::body::json())
6073
.and(middlewares::with_transmitter(signer_tx))
74+
.and(middlewares::with_repeater(signer_repeater.clone()))
6175
.and_then(handlers::register_signer_handler))
6276
.or(warp::path("epoch-settings")
6377
.and(warp::get())
@@ -104,6 +118,7 @@ impl SignerRelay {
104118
}
105119
}
106120
},
121+
_ = self.signer_repeater.repeat_message() => {Ok(())},
107122
_event = self.peer.tick_swarm() => {Ok(())}
108123
}
109124
}
@@ -136,16 +151,24 @@ impl SignerRelay {
136151
}
137152

138153
mod middlewares {
139-
use std::convert::Infallible;
154+
use std::{convert::Infallible, fmt::Debug, sync::Arc};
140155
use tokio::sync::mpsc::UnboundedSender;
141156
use warp::Filter;
142157

158+
use crate::repeater::MessageRepeater;
159+
143160
pub fn with_transmitter<T: Send + Sync>(
144161
tx: UnboundedSender<T>,
145162
) -> impl Filter<Extract = (UnboundedSender<T>,), Error = Infallible> + Clone {
146163
warp::any().map(move || tx.clone())
147164
}
148165

166+
pub fn with_repeater<M: Clone + Debug + Sync + Send + 'static>(
167+
repeater: Arc<MessageRepeater<M>>,
168+
) -> impl Filter<Extract = (Arc<MessageRepeater<M>>,), Error = Infallible> + Clone {
169+
warp::any().map(move || repeater.clone())
170+
}
171+
149172
pub fn with_aggregator_endpoint(
150173
aggregator_endpoint: String,
151174
) -> impl Filter<Extract = (String,), Error = Infallible> + Clone {
@@ -157,15 +180,20 @@ mod handlers {
157180
use mithril_common::messages::{RegisterSignatureMessage, RegisterSignerMessage};
158181
use reqwest::{Error, Response};
159182
use slog_scope::debug;
160-
use std::convert::Infallible;
183+
use std::{convert::Infallible, sync::Arc};
161184
use tokio::sync::mpsc::UnboundedSender;
162185
use warp::http::StatusCode;
163186

187+
use crate::repeater;
188+
164189
pub async fn register_signer_handler(
165190
register_signer_message: RegisterSignerMessage,
166191
tx: UnboundedSender<RegisterSignerMessage>,
192+
repeater: Arc<repeater::MessageRepeater<RegisterSignerMessage>>,
167193
) -> Result<impl warp::Reply, Infallible> {
168194
debug!("SignerRelay: serve HTTP route /register-signer"; "register_signer_message" => format!("{register_signer_message:#?}"));
195+
196+
repeater.set_message(register_signer_message.clone()).await;
169197
match tx.send(register_signer_message) {
170198
Ok(_) => Ok(Box::new(warp::reply::with_status(
171199
"".to_string(),

mithril-relay/src/repeater.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::{
77
time::Instant,
88
};
99

10-
/// A message repeater will send a message to a channel at a given delay
10+
/// A message repeater will send a copy of the message to a channel at a given frequency
1111
pub struct MessageRepeater<M: Clone + Debug + Sync + Send + 'static> {
1212
message: Arc<Mutex<Option<M>>>,
1313
tx_message: UnboundedSender<M>,

0 commit comments

Comments
 (0)