Skip to content

Commit cdf3ec8

Browse files
committed
feat(dmq): update aggregator relay for DMQ messages
1 parent ed5987e commit cdf3ec8

File tree

1 file changed

+68
-9
lines changed

1 file changed

+68
-9
lines changed

mithril-relay/src/relay/aggregator.rs

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,83 @@
1-
use crate::p2p::{BroadcastMessage, Peer, PeerEvent};
1+
use std::{path::Path, sync::Arc};
2+
23
use anyhow::anyhow;
34
use libp2p::Multiaddr;
5+
use mithril_dmq::{DmqConsumerServer, DmqConsumerServerPallas, DmqMessage};
6+
use reqwest::StatusCode;
7+
use slog::{Logger, error, info};
8+
use tokio::sync::{
9+
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
10+
watch::{self, Receiver},
11+
};
12+
413
use mithril_common::{
5-
StdResult,
14+
CardanoNetwork, StdResult,
615
logging::LoggerExtensions,
716
messages::{RegisterSignatureMessageHttp, RegisterSignerMessage},
817
};
9-
use reqwest::StatusCode;
10-
use slog::{Logger, error, info};
18+
19+
use crate::p2p::{BroadcastMessage, Peer, PeerEvent};
1120

1221
/// A relay for a Mithril aggregator
1322
pub struct AggregatorRelay {
1423
aggregator_endpoint: String,
1524
peer: Peer,
25+
signature_dmq_tx: UnboundedSender<DmqMessage>,
1626
logger: Logger,
1727
}
1828

1929
impl AggregatorRelay {
2030
/// Start a relay for a Mithril aggregator
2131
pub async fn start(
2232
addr: &Multiaddr,
33+
dmq_node_socket_path: &Path,
34+
cardano_network: &CardanoNetwork,
2335
aggregator_endpoint: &str,
2436
logger: &Logger,
2537
) -> StdResult<Self> {
38+
let (_stop_tx, stop_rx) = watch::channel(());
39+
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
40+
let _dmq_consumer_server = Self::start_dmq_consumer_server(
41+
dmq_node_socket_path,
42+
cardano_network,
43+
signature_dmq_rx,
44+
stop_rx,
45+
logger.clone(),
46+
)
47+
.await?;
48+
2649
Ok(Self {
2750
aggregator_endpoint: aggregator_endpoint.to_owned(),
2851
peer: Peer::new(addr).with_logger(logger).start().await?,
52+
signature_dmq_tx,
2953
logger: logger.new_with_component_name::<Self>(),
3054
})
3155
}
3256

57+
async fn start_dmq_consumer_server(
58+
socket: &Path,
59+
cardano_network: &CardanoNetwork,
60+
signature_dmq_rx: UnboundedReceiver<DmqMessage>,
61+
stop_rx: Receiver<()>,
62+
logger: Logger,
63+
) -> StdResult<Arc<DmqConsumerServerPallas>> {
64+
let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new(
65+
socket.to_path_buf(),
66+
cardano_network.to_owned(),
67+
stop_rx,
68+
logger.clone(),
69+
));
70+
dmq_consumer_server.register_receiver(signature_dmq_rx).await?;
71+
let dmq_consumer_server_clone = dmq_consumer_server.clone();
72+
tokio::spawn(async move {
73+
if let Err(err) = dmq_consumer_server_clone.run().await {
74+
error!(logger.to_owned(), "DMQ Consumer server failed"; "error" => ?err);
75+
}
76+
});
77+
78+
Ok(dmq_consumer_server)
79+
}
80+
3381
async fn notify_signature_to_aggregator(
3482
&self,
3583
signature_message: &RegisterSignatureMessageHttp,
@@ -100,7 +148,7 @@ impl AggregatorRelay {
100148
pub async fn tick(&mut self) -> StdResult<()> {
101149
if let Some(peer_event) = self.peer.tick_swarm().await? {
102150
match self.peer.convert_peer_event_to_message(peer_event) {
103-
Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) => {
151+
Ok(Some(BroadcastMessage::RegisterSignerHttp(signer_message_received))) => {
104152
let retry_max = 3;
105153
let mut retry_count = 0;
106154
while let Err(e) =
@@ -113,7 +161,7 @@ impl AggregatorRelay {
113161
}
114162
}
115163
}
116-
Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) => {
164+
Ok(Some(BroadcastMessage::RegisterSignatureHttp(signature_message_received))) => {
117165
let retry_max = 3;
118166
let mut retry_count = 0;
119167
while let Err(e) =
@@ -126,6 +174,11 @@ impl AggregatorRelay {
126174
}
127175
}
128176
}
177+
Ok(Some(BroadcastMessage::RegisterSignatureDmq(signature_message_received))) => {
178+
self.signature_dmq_tx.send(signature_message_received).map_err(|e| {
179+
anyhow!("Failed to send signature message to DMQ consumer server: {e}")
180+
})?;
181+
}
129182
Ok(None) => {}
130183
Err(e) => return Err(e),
131184
}
@@ -179,9 +232,15 @@ mod tests {
179232
then.status(201).body("ok");
180233
});
181234
let addr: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap();
182-
let relay = AggregatorRelay::start(&addr, &server.url(""), &TestLogger::stdout())
183-
.await
184-
.unwrap();
235+
let relay = AggregatorRelay::start(
236+
&addr,
237+
&Path::new("test"),
238+
&CardanoNetwork::TestNet(123),
239+
&server.url(""),
240+
&TestLogger::stdout(),
241+
)
242+
.await
243+
.unwrap();
185244

186245
relay
187246
.notify_signature_to_aggregator(&RegisterSignatureMessageHttp::dummy())

0 commit comments

Comments
 (0)