Skip to content

Commit 56ac1b3

Browse files
committed
fix(dmq): gate code behind 'future_dmq' feature
1 parent 624e5ed commit 56ac1b3

File tree

9 files changed

+220
-88
lines changed

9 files changed

+220
-88
lines changed

internal/mithril-dmq/src/consumer/server/pallas.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,15 @@ impl DmqConsumerServerPallas {
4848

4949
/// Creates and returns a new `DmqServer` connected to the specified socket.
5050
async fn new_server(&self) -> StdResult<DmqServer> {
51-
let magic = self.network.code();
51+
info!(
52+
self.logger,
53+
"Creating a new DMQ consumer server";
54+
"socket" => ?self.socket,
55+
"network" => ?self.network
56+
);
57+
let magic = self.network.magic_id();
5258
if self.socket.exists() {
53-
fs::remove_file(self.socket.clone()).unwrap();
59+
fs::remove_file(self.socket.clone())?;
5460
}
5561
let listener = UnixListener::bind(&self.socket)
5662
.map_err(|err| anyhow!(err))
@@ -177,7 +183,7 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
177183
Some(ref mut receiver) => loop {
178184
select! {
179185
_ = stop_rx.changed() => {
180-
warn!(self.logger, "Stopping signature processor...");
186+
warn!(self.logger, "Stopping DMQ consumer server...");
181187

182188
return Ok(());
183189
}

internal/mithril-dmq/src/publisher/server/pallas.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,15 @@ impl DmqPublisherServerPallas {
5050

5151
/// Creates and returns a new `DmqServer` connected to the specified socket.
5252
async fn new_server(&self) -> StdResult<DmqServer> {
53-
let magic = self.network.code();
53+
info!(
54+
self.logger,
55+
"Creating a new DMQ publisher server";
56+
"socket" => ?self.socket,
57+
"network" => ?self.network
58+
);
59+
let magic = self.network.magic_id();
5460
if self.socket.exists() {
55-
fs::remove_file(self.socket.clone()).unwrap();
61+
fs::remove_file(self.socket.clone())?;
5662
}
5763
let listener = UnixListener::bind(&self.socket)
5864
.map_err(|err| anyhow!(err))
@@ -184,7 +190,7 @@ impl DmqPublisherServer for DmqPublisherServerPallas {
184190
loop {
185191
select! {
186192
_ = stop_rx.changed() => {
187-
warn!(self.logger, "Stopping signature processor...");
193+
warn!(self.logger, "Stopping DMQ publisher server...");
188194

189195
return Ok(());
190196
}

mithril-relay/src/commands/aggregator.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
#[cfg(feature = "future_dmq")]
12
use std::path::PathBuf;
23

34
use clap::Parser;
45
use libp2p::Multiaddr;
56
use slog::error;
67

7-
use mithril_common::{CardanoNetwork, StdResult};
8+
#[cfg(feature = "future_dmq")]
9+
use mithril_common::CardanoNetwork;
10+
use mithril_common::StdResult;
811

912
use crate::AggregatorRelay;
1013

@@ -21,6 +24,7 @@ pub struct AggregatorCommand {
2124
dial_to: Option<Multiaddr>,
2225

2326
/// Path to the DMQ socket file
27+
#[cfg(feature = "future_dmq")]
2428
#[clap(
2529
long,
2630
env = "DMQ_NODE_SOCKET_PATH",
@@ -30,11 +34,13 @@ pub struct AggregatorCommand {
3034
dmq_node_socket_path: PathBuf,
3135

3236
/// Cardano network
37+
#[cfg(feature = "future_dmq")]
3338
#[clap(long, env = "NETWORK")]
3439
pub network: String,
3540

3641
/// Cardano Network Magic number
3742
/// useful for TestNet & DevNet
43+
#[cfg(feature = "future_dmq")]
3844
#[clap(long, env = "NETWORK_MAGIC")]
3945
pub network_magic: Option<u64>,
4046

@@ -50,12 +56,15 @@ impl AggregatorCommand {
5056
let dial_to = self.dial_to.to_owned();
5157
let addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", self.listen_port).parse()?;
5258
let aggregator_endpoint = self.aggregator_endpoint.to_owned();
59+
#[cfg(feature = "future_dmq")]
5360
let cardano_network =
5461
CardanoNetwork::from_code(self.network.to_owned(), self.network_magic)?;
5562

5663
let mut relay = AggregatorRelay::start(
5764
&addr,
65+
#[cfg(feature = "future_dmq")]
5866
&self.dmq_node_socket_path,
67+
#[cfg(feature = "future_dmq")]
5968
&cardano_network,
6069
&aggregator_endpoint,
6170
logger,

mithril-relay/src/commands/signer.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
1-
use std::{path::PathBuf, time::Duration};
1+
#[cfg(feature = "future_dmq")]
2+
use std::path::PathBuf;
3+
use std::time::Duration;
24

35
use clap::Parser;
46
use libp2p::Multiaddr;
5-
use mithril_common::{CardanoNetwork, StdResult};
67
use slog::error;
78

8-
use super::CommandContext;
9+
#[cfg(feature = "future_dmq")]
10+
use mithril_common::CardanoNetwork;
11+
use mithril_common::StdResult;
12+
913
use crate::{SignerRelay, SignerRelayMode};
1014

15+
use super::CommandContext;
16+
1117
#[derive(Parser, Debug, Clone)]
1218
pub struct SignerCommand {
1319
/// HTTP Server listening port
@@ -23,6 +29,7 @@ pub struct SignerCommand {
2329
dial_to: Option<Multiaddr>,
2430

2531
/// Path to the DMQ socket file
32+
#[cfg(feature = "future_dmq")]
2633
#[clap(
2734
long,
2835
env = "DMQ_NODE_SOCKET_PATH",
@@ -32,11 +39,13 @@ pub struct SignerCommand {
3239
dmq_node_socket_path: PathBuf,
3340

3441
/// Cardano network
42+
#[cfg(feature = "future_dmq")]
3543
#[clap(long, env = "NETWORK")]
3644
pub network: String,
3745

3846
/// Cardano Network Magic number
3947
/// useful for TestNet & DevNet
48+
#[cfg(feature = "future_dmq")]
4049
#[clap(long, env = "NETWORK_MAGIC")]
4150
pub network_magic: Option<u64>,
4251

@@ -68,13 +77,16 @@ impl SignerCommand {
6877
let signature_registration_mode = &self.signature_registration_mode;
6978
let aggregator_endpoint = self.aggregator_endpoint.to_owned();
7079
let signer_repeater_delay = Duration::from_millis(self.signer_repeater_delay);
80+
#[cfg(feature = "future_dmq")]
7181
let cardano_network =
7282
CardanoNetwork::from_code(self.network.to_owned(), self.network_magic)?;
7383

7484
let mut relay = SignerRelay::start(
7585
&addr,
7686
&server_port,
87+
#[cfg(feature = "future_dmq")]
7788
&self.dmq_node_socket_path,
89+
#[cfg(feature = "future_dmq")]
7890
&cardano_network,
7991
signer_registration_mode,
8092
signature_registration_mode,

mithril-relay/src/p2p/peer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use mithril_common::{
1515
logging::LoggerExtensions,
1616
messages::{RegisterSignatureMessageHttp, RegisterSignerMessage},
1717
};
18+
#[cfg(feature = "future_dmq")]
1819
use mithril_dmq::DmqMessage;
1920
use serde::{Deserialize, Serialize};
2021
use slog::{Logger, debug, info};
@@ -72,6 +73,7 @@ pub enum BroadcastMessage {
7273
RegisterSignatureHttp(RegisterSignatureMessageHttp),
7374

7475
/// A DMQ signature registration message received from the Gossip sub
76+
#[cfg(feature = "future_dmq")]
7577
RegisterSignatureDmq(DmqMessage),
7678
}
7779

@@ -257,6 +259,7 @@ impl Peer {
257259
}
258260

259261
/// Publish a DMQ signature on the P2P pubsub
262+
#[cfg(feature = "future_dmq")]
260263
pub fn publish_signature_dmq(
261264
&mut self,
262265
message: &DmqMessage,

mithril-relay/src/relay/aggregator.rs

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
1+
#[cfg(feature = "future_dmq")]
12
use std::{path::Path, sync::Arc};
23

34
use anyhow::anyhow;
45
use libp2p::Multiaddr;
6+
#[cfg(feature = "future_dmq")]
57
use mithril_dmq::{DmqConsumerServer, DmqConsumerServerPallas, DmqMessage};
68
use reqwest::StatusCode;
79
use slog::{Logger, error, info};
10+
#[cfg(feature = "future_dmq")]
811
use tokio::sync::{
912
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
10-
watch::{self, Receiver},
13+
watch::{self, Receiver, Sender},
1114
};
1215

16+
#[cfg(feature = "future_dmq")]
17+
use mithril_common::CardanoNetwork;
1318
use mithril_common::{
14-
CardanoNetwork, StdResult,
19+
StdResult,
1520
logging::LoggerExtensions,
1621
messages::{RegisterSignatureMessageHttp, RegisterSignerMessage},
1722
};
@@ -22,38 +27,56 @@ use crate::p2p::{BroadcastMessage, Peer, PeerEvent};
2227
pub struct AggregatorRelay {
2328
aggregator_endpoint: String,
2429
peer: Peer,
30+
#[cfg(feature = "future_dmq")]
2531
signature_dmq_tx: UnboundedSender<DmqMessage>,
32+
#[cfg(feature = "future_dmq")]
33+
#[allow(unused)]
34+
stop_tx: Sender<()>,
2635
logger: Logger,
2736
}
2837

2938
impl AggregatorRelay {
3039
/// Start a relay for a Mithril aggregator
3140
pub async fn start(
3241
addr: &Multiaddr,
33-
dmq_node_socket_path: &Path,
34-
cardano_network: &CardanoNetwork,
42+
#[cfg(feature = "future_dmq")] dmq_node_socket_path: &Path,
43+
#[cfg(feature = "future_dmq")] cardano_network: &CardanoNetwork,
3544
aggregator_endpoint: &str,
3645
logger: &Logger,
3746
) -> StdResult<Self> {
38-
let (_stop_tx, stop_rx) = watch::channel(());
39-
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
40-
let _dmq_consumer_server = Self::start_dmq_consumer_server(
41-
dmq_node_socket_path,
42-
cardano_network,
43-
signature_dmq_rx,
44-
stop_rx,
45-
logger.clone(),
46-
)
47-
.await?;
47+
let peer = Peer::new(addr).with_logger(logger).start().await?;
48+
let logger = logger.new_with_component_name::<Self>();
49+
#[cfg(feature = "future_dmq")]
50+
{
51+
let (stop_tx, stop_rx) = watch::channel(());
52+
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
53+
#[cfg(unix)]
54+
let _dmq_consumer_server = Self::start_dmq_consumer_server(
55+
dmq_node_socket_path,
56+
cardano_network,
57+
signature_dmq_rx,
58+
stop_rx,
59+
logger.clone(),
60+
)
61+
.await?;
4862

63+
Ok(Self {
64+
aggregator_endpoint: aggregator_endpoint.to_owned(),
65+
peer,
66+
signature_dmq_tx,
67+
stop_tx,
68+
logger,
69+
})
70+
}
71+
#[cfg(not(feature = "future_dmq"))]
4972
Ok(Self {
5073
aggregator_endpoint: aggregator_endpoint.to_owned(),
51-
peer: Peer::new(addr).with_logger(logger).start().await?,
52-
signature_dmq_tx,
53-
logger: logger.new_with_component_name::<Self>(),
74+
peer,
75+
logger,
5476
})
5577
}
5678

79+
#[cfg(feature = "future_dmq")]
5780
async fn start_dmq_consumer_server(
5881
socket: &Path,
5982
cardano_network: &CardanoNetwork,
@@ -174,6 +197,7 @@ impl AggregatorRelay {
174197
}
175198
}
176199
}
200+
#[cfg(feature = "future_dmq")]
177201
Ok(Some(BroadcastMessage::RegisterSignatureDmq(signature_message_received))) => {
178202
self.signature_dmq_tx.send(signature_message_received).map_err(|e| {
179203
anyhow!("Failed to send signature message to DMQ consumer server: {e}")
@@ -235,7 +259,9 @@ mod tests {
235259
let addr: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap();
236260
let relay = AggregatorRelay::start(
237261
&addr,
238-
&Path::new("test"),
262+
#[cfg(feature = "future_dmq")]
263+
Path::new("test"),
264+
#[cfg(feature = "future_dmq")]
239265
&CardanoNetwork::TestNet(123),
240266
&server.url(""),
241267
&TestLogger::stdout(),

mithril-relay/src/relay/passive.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ impl PassiveRelay {
3838
Ok(Some(BroadcastMessage::RegisterSignatureHttp(signature_message_received))) => {
3939
info!(self.logger, "Received HTTP signature message from P2P network"; "signature_message" => #?signature_message_received);
4040
}
41+
#[cfg(feature = "future_dmq")]
4142
Ok(Some(BroadcastMessage::RegisterSignatureDmq(signature_message_received))) => {
4243
info!(self.logger, "Received DMQ signature message from P2P network"; "signature_message" => #?signature_message_received);
4344
}

0 commit comments

Comments
 (0)