Skip to content

Commit 24e8aa5

Browse files
committed
feat(dmq): update signer relay for DMQ messages
1 parent fddacd7 commit 24e8aa5

File tree

1 file changed

+78
-21
lines changed

1 file changed

+78
-21
lines changed

mithril-relay/src/relay/signer.rs

Lines changed: 78 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
1+
use std::{net::SocketAddr, path::Path, sync::Arc, time::Duration};
2+
13
use clap::ValueEnum;
24
use libp2p::Multiaddr;
3-
use slog::{Logger, debug, info};
4-
use std::{net::SocketAddr, sync::Arc, time::Duration};
5+
use mithril_dmq::{DmqMessage, DmqPublisherServer, DmqPublisherServerPallas};
6+
use slog::{Logger, debug, error, info};
57
use strum::Display;
6-
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
8+
use tokio::sync::{
9+
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
10+
watch::{self, Receiver},
11+
};
712
use warp::Filter;
813

914
use mithril_common::{
10-
StdResult,
15+
CardanoNetwork, StdResult,
1116
logging::LoggerExtensions,
1217
messages::{RegisterSignatureMessageHttp, RegisterSignerMessage},
1318
};
@@ -47,10 +52,11 @@ struct HTTPServerConfiguration<'a> {
4752

4853
/// A relay for a Mithril signer
4954
pub struct SignerRelay {
50-
server: TestHttpServer,
55+
http_server: TestHttpServer,
5156
peer: Peer,
52-
signature_rx: UnboundedReceiver<RegisterSignatureMessageHttp>,
53-
signer_rx: UnboundedReceiver<RegisterSignerMessage>,
57+
signature_http_rx: UnboundedReceiver<RegisterSignatureMessageHttp>,
58+
signature_dmq_rx: UnboundedReceiver<DmqMessage>,
59+
signer_http_rx: UnboundedReceiver<RegisterSignerMessage>,
5460
signer_repeater: Arc<MessageRepeater<RegisterSignerMessage>>,
5561
logger: Logger,
5662
}
@@ -60,6 +66,8 @@ impl SignerRelay {
6066
pub async fn start(
6167
address: &Multiaddr,
6268
server_port: &u16,
69+
dmq_node_socket_path: &Path,
70+
cardano_network: &CardanoNetwork,
6371
signer_registration_mode: &SignerRelayMode,
6472
signature_registration_mode: &SignerRelayMode,
6573
aggregator_endpoint: &str,
@@ -76,7 +84,7 @@ impl SignerRelay {
7684
logger,
7785
));
7886
let peer = Peer::new(address).start().await?;
79-
let server = Self::start_http_server(&HTTPServerConfiguration {
87+
let http_server = Self::start_http_server(&HTTPServerConfiguration {
8088
server_port,
8189
signer_registration_mode: signer_registration_mode.to_owned(),
8290
signature_registration_mode: signature_registration_mode.to_owned(),
@@ -87,18 +95,54 @@ impl SignerRelay {
8795
logger: &relay_logger,
8896
})
8997
.await;
90-
info!(relay_logger, "Listening on"; "address" => ?server.address());
98+
info!(relay_logger, "Listening on"; "address" => ?http_server.address());
99+
100+
let (_stop_tx, stop_rx) = watch::channel(());
101+
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
102+
let _dmq_publisher_server = Self::start_dmq_publisher_server(
103+
dmq_node_socket_path,
104+
cardano_network,
105+
signature_dmq_tx,
106+
stop_rx,
107+
relay_logger.clone(),
108+
)
109+
.await?;
91110

92111
Ok(Self {
93-
server,
112+
http_server,
94113
peer,
95-
signature_rx,
96-
signer_rx,
114+
signature_http_rx: signature_rx,
115+
signature_dmq_rx,
116+
signer_http_rx: signer_rx,
97117
signer_repeater,
98118
logger: relay_logger,
99119
})
100120
}
101121

122+
async fn start_dmq_publisher_server(
123+
socket: &Path,
124+
cardano_network: &CardanoNetwork,
125+
signature_dmq_tx: UnboundedSender<DmqMessage>,
126+
stop_rx: Receiver<()>,
127+
logger: Logger,
128+
) -> StdResult<Arc<DmqPublisherServerPallas>> {
129+
let dmq_publisher_server = Arc::new(DmqPublisherServerPallas::new(
130+
socket.to_path_buf(),
131+
cardano_network.to_owned(),
132+
stop_rx,
133+
logger.clone(),
134+
));
135+
dmq_publisher_server.register_transmitter(signature_dmq_tx).await?;
136+
let dmq_publisher_server_clone = dmq_publisher_server.clone();
137+
tokio::spawn(async move {
138+
if let Err(err) = dmq_publisher_server_clone.run().await {
139+
error!(logger.to_owned(), "DMQ Publisher server failed"; "error" => ?err);
140+
}
141+
});
142+
143+
Ok(dmq_publisher_server)
144+
}
145+
102146
async fn start_http_server(configuration: &HTTPServerConfiguration<'_>) -> TestHttpServer {
103147
let server_logger = configuration.logger.new_with_name("http_server");
104148
test_http_server_with_socket_address(
@@ -154,28 +198,41 @@ impl SignerRelay {
154198
/// Tick the signer relay
155199
pub async fn tick(&mut self) -> StdResult<()> {
156200
tokio::select! {
157-
message = self.signature_rx.recv() => {
201+
message = self.signature_http_rx.recv() => {
202+
match message {
203+
Some(signature_message) => {
204+
info!(self.logger, "Publish HTTP signature to p2p network"; "message" => #?signature_message);
205+
self.peer.publish_signature_http(&signature_message)?;
206+
Ok(())
207+
}
208+
None => {
209+
debug!(self.logger, "No HTTP signature message available");
210+
Ok(())
211+
}
212+
}
213+
},
214+
message = self.signature_dmq_rx.recv() => {
158215
match message {
159216
Some(signature_message) => {
160-
info!(self.logger, "Publish signature to p2p network"; "message" => #?signature_message);
161-
self.peer.publish_signature(&signature_message)?;
217+
info!(self.logger, "Publish DMQ signature to p2p network"; "message" => #?signature_message);
218+
self.peer.publish_signature_dmq(&signature_message)?;
162219
Ok(())
163220
}
164221
None => {
165-
debug!(self.logger, "No signature message available");
222+
//debug!(self.logger, "No DMQ signature message available");
166223
Ok(())
167224
}
168225
}
169226
},
170-
message = self.signer_rx.recv() => {
227+
message = self.signer_http_rx.recv() => {
171228
match message {
172229
Some(signer_message) => {
173-
info!(self.logger, "Publish signer-registration to p2p network"; "message" => #?signer_message);
230+
info!(self.logger, "Publish HTTP signer-registration to p2p network"; "message" => #?signer_message);
174231
self.peer.publish_signer_registration(&signer_message)?;
175232
Ok(())
176233
}
177234
None => {
178-
debug!(self.logger, "No signer message available");
235+
debug!(self.logger, "No HTTP signer message available");
179236
Ok(())
180237
}
181238
}
@@ -188,7 +245,7 @@ impl SignerRelay {
188245
/// Receive signature from the underlying channel
189246
#[allow(dead_code)]
190247
pub async fn receive_signature(&mut self) -> Option<RegisterSignatureMessageHttp> {
191-
self.signature_rx.recv().await
248+
self.signature_http_rx.recv().await
192249
}
193250

194251
/// Tick the peer of the signer relay
@@ -203,7 +260,7 @@ impl SignerRelay {
203260

204261
/// Retrieve address on which the HTTP Server is listening
205262
pub fn address(&self) -> SocketAddr {
206-
self.server.address()
263+
self.http_server.address()
207264
}
208265

209266
/// Retrieve address on which the peer is listening

0 commit comments

Comments
 (0)