diff --git a/.github/workflows/actions/deploy-terraform-infrastructure/action.yml b/.github/workflows/actions/deploy-terraform-infrastructure/action.yml index d6f15666d85..2be5d887891 100644 --- a/.github/workflows/actions/deploy-terraform-infrastructure/action.yml +++ b/.github/workflows/actions/deploy-terraform-infrastructure/action.yml @@ -63,6 +63,10 @@ inputs: description: Mithril use P2P network (experimental, for test only). required: false default: "false" + mithril_p2p_use_dmq_protocol: + description: Mithril P2P network use DMQ protocol (experimental, for test only). + required: false + default: "false" mithril_p2p_network_bootstrap_peer: description: Mithril P2P network bootstrap peer (experimental, for test only). required: false @@ -247,6 +251,7 @@ runs: google_compute_instance_ssh_keys_environment = "${{ inputs.google_compute_instance_ssh_keys_environment }}" google_service_credentials_json_file = "./google-application-credentials.json" mithril_use_p2p_network = "${{ inputs.mithril_use_p2p_network }}" + mithril_p2p_use_dmq_protocol = "${{ inputs.mithril_p2p_use_dmq_protocol }}" mithril_p2p_network_bootstrap_peer = "${{ inputs.mithril_p2p_network_bootstrap_peer }}" mithril_p2p_signer_relay_signer_registration_mode = "${{ inputs.mithril_p2p_signer_relay_signer_registration_mode }}" mithril_p2p_signer_relay_signature_registration_mode = "${{ inputs.mithril_p2p_signer_relay_signature_registration_mode }}" diff --git a/.github/workflows/test-deploy-network.yml b/.github/workflows/test-deploy-network.yml index d028116b4dd..591db953da3 100644 --- a/.github/workflows/test-deploy-network.yml +++ b/.github/workflows/test-deploy-network.yml @@ -34,6 +34,7 @@ jobs: environment_prefix: dev cardano_network: preview mithril_use_p2p_network: true + mithril_p2p_use_dmq_protocol: true mithril_p2p_signer_relay_signer_registration_mode: passthrough mithril_p2p_signer_relay_signature_registration_mode: p2p mithril_api_domain: api.mithril.network @@ -72,6 +73,7 @@ jobs: environment_prefix: dev-follower cardano_network: preview mithril_use_p2p_network: true + mithril_p2p_use_dmq_protocol: true mithril_p2p_network_bootstrap_peer: "/dns4/aggregator.dev-preview.api.mithril.network/tcp/6060" mithril_p2p_signer_relay_signer_registration_mode: passthrough mithril_p2p_signer_relay_signature_registration_mode: p2p @@ -103,6 +105,7 @@ jobs: environment_prefix: dev cardano_network: mainnet mithril_use_p2p_network: false + mithril_p2p_use_dmq_protocol: true mithril_api_domain: api.mithril.network mithril_era_reader_adapter_type: bootstrap mithril_protocol_parameters: | @@ -160,6 +163,7 @@ jobs: google_compute_instance_ssh_keys_environment: testing google_application_credentials: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }} mithril_use_p2p_network: ${{ matrix.mithril_use_p2p_network }} + mithril_p2p_use_dmq_protocol: ${{ matrix.mithril_p2p_use_dmq_protocol }} mithril_p2p_network_bootstrap_peer: ${{ matrix.mithril_p2p_network_bootstrap_peer }} mithril_api_domain: ${{ matrix.mithril_api_domain }} mithril_image_id: ${{ inputs.mithril_image_id }} diff --git a/Cargo.lock b/Cargo.lock index 6107dee327b..7f2fb891b1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4216,11 +4216,15 @@ version = "0.1.5" dependencies = [ "anyhow", "async-trait", + "bincode", "blake2 0.10.6", "mithril-cardano-node-chain", "mithril-common", "mockall", + "pallas-codec 1.0.0-alpha.2", "pallas-network 1.0.0-alpha.2", + "serde", + "serde_bytes", "slog", "slog-async", "slog-term", @@ -4323,11 +4327,13 @@ name = "mithril-relay" version = "0.1.48" dependencies = [ "anyhow", + "bincode", "clap", "config", "httpmock", "libp2p", "mithril-common", + "mithril-dmq", "mithril-doc", "mithril-test-http-server", "reqwest", diff --git a/internal/mithril-dmq/Cargo.toml b/internal/mithril-dmq/Cargo.toml index e660b212852..345dc0625be 100644 --- a/internal/mithril-dmq/Cargo.toml +++ b/internal/mithril-dmq/Cargo.toml @@ -10,16 +10,24 @@ license.workspace = true repository.workspace = true include = ["**/*.rs", "Cargo.toml", "README.md", ".gitignore"] +[package.metadata.cargo-machete] +# `serde_bytes` is used for DmqMessage serialization +ignored = ["serde_bytes"] + [lib] crate-type = ["lib", "cdylib", "staticlib"] [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +bincode = { version = "2.0.1" } blake2 = "0.10.6" mithril-cardano-node-chain = { path = "../cardano-node/mithril-cardano-node-chain" } mithril-common = { path = "../../mithril-common" } pallas-network = { git = "https://github.com/txpipe/pallas.git", branch = "main" } +pallas-codec = { git = "https://github.com/txpipe/pallas.git", branch = "main" } +serde = { workspace = true } +serde_bytes = "0.11.17" slog = { workspace = true } tokio = { workspace = true, features = ["sync"] } diff --git a/internal/mithril-dmq/src/consumer/interface.rs b/internal/mithril-dmq/src/consumer/client/interface.rs similarity index 65% rename from internal/mithril-dmq/src/consumer/interface.rs rename to internal/mithril-dmq/src/consumer/client/interface.rs index 4c8bc74d995..e9b4915c13e 100644 --- a/internal/mithril-dmq/src/consumer/interface.rs +++ b/internal/mithril-dmq/src/consumer/client/interface.rs @@ -2,10 +2,10 @@ use std::fmt::Debug; use mithril_common::{StdResult, crypto_helper::TryFromBytes, entities::PartyId}; -/// Trait for consuming messages from a DMQ node. +/// Trait for the client side of consuming messages from a DMQ node. #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] -pub trait DmqConsumer: Send + Sync { +pub trait DmqConsumerClient: Send + Sync { /// Consume messages from the DMQ node. async fn consume_messages(&self) -> StdResult>; } diff --git a/internal/mithril-dmq/src/consumer/client/mod.rs b/internal/mithril-dmq/src/consumer/client/mod.rs new file mode 100644 index 00000000000..4035f6c0659 --- /dev/null +++ b/internal/mithril-dmq/src/consumer/client/mod.rs @@ -0,0 +1,5 @@ +mod interface; +mod pallas; + +pub use interface::*; +pub use pallas::*; diff --git a/internal/mithril-dmq/src/consumer/pallas.rs b/internal/mithril-dmq/src/consumer/client/pallas.rs similarity index 94% rename from internal/mithril-dmq/src/consumer/pallas.rs rename to internal/mithril-dmq/src/consumer/client/pallas.rs index ef59e37c6eb..4593e059009 100644 --- a/internal/mithril-dmq/src/consumer/pallas.rs +++ b/internal/mithril-dmq/src/consumer/client/pallas.rs @@ -12,12 +12,12 @@ use mithril_common::{ logging::LoggerExtensions, }; -use crate::DmqConsumer; +use crate::DmqConsumerClient; -/// A DMQ consumer implementation. +/// A DMQ client consumer implementation. /// /// This implementation is built upon the n2c mini-protocols DMQ implementation in Pallas. -pub struct DmqConsumerPallas { +pub struct DmqConsumerClientPallas { socket: PathBuf, network: CardanoNetwork, client: Mutex>, @@ -25,8 +25,8 @@ pub struct DmqConsumerPallas { phantom: PhantomData, } -impl DmqConsumerPallas { - /// Creates a new `DmqConsumerPallas` instance. +impl DmqConsumerClientPallas { + /// Creates a new `DmqConsumerClientPallas` instance. pub fn new(socket: PathBuf, network: CardanoNetwork, logger: Logger) -> Self { Self { socket, @@ -47,7 +47,7 @@ impl DmqConsumerPallas { ); DmqClient::connect(&self.socket, self.network.magic_id()) .await - .with_context(|| "DmqConsumerPallas failed to create a new client") + .with_context(|| "DmqConsumerClientPallas failed to create a new client") } /// Gets the cached `DmqClient`, creating a new one if it does not exist. @@ -128,7 +128,7 @@ impl DmqConsumerPallas { } #[async_trait::async_trait] -impl DmqConsumer for DmqConsumerPallas { +impl DmqConsumerClient for DmqConsumerClientPallas { async fn consume_messages(&self) -> StdResult> { let messages = self.consume_messages_internal().await; if messages.is_err() { @@ -247,7 +247,7 @@ mod tests { let reply_messages = fake_msgs(); let server = setup_dmq_server(socket_path.clone(), reply_messages); let client = tokio::spawn(async move { - let consumer = DmqConsumerPallas::new( + let consumer = DmqConsumerClientPallas::new( socket_path, CardanoNetwork::TestNet(0), TestLogger::stdout(), @@ -280,7 +280,7 @@ mod tests { let reply_messages = vec![]; let server = setup_dmq_server(socket_path.clone(), reply_messages); let client = tokio::spawn(async move { - let consumer = DmqConsumerPallas::::new( + let consumer = DmqConsumerClientPallas::::new( socket_path, CardanoNetwork::TestNet(0), TestLogger::stdout(), @@ -304,7 +304,7 @@ mod tests { let reply_messages = fake_msgs(); let server = setup_dmq_server(socket_path.clone(), reply_messages); let client = tokio::spawn(async move { - let consumer = DmqConsumerPallas::::new( + let consumer = DmqConsumerClientPallas::::new( socket_path, CardanoNetwork::TestNet(0), TestLogger::stdout(), diff --git a/internal/mithril-dmq/src/consumer/mod.rs b/internal/mithril-dmq/src/consumer/mod.rs index 4035f6c0659..24bf375cd40 100644 --- a/internal/mithril-dmq/src/consumer/mod.rs +++ b/internal/mithril-dmq/src/consumer/mod.rs @@ -1,5 +1,5 @@ -mod interface; -mod pallas; +mod client; +mod server; -pub use interface::*; -pub use pallas::*; +pub use client::*; +pub use server::*; diff --git a/internal/mithril-dmq/src/consumer/server/interface.rs b/internal/mithril-dmq/src/consumer/server/interface.rs new file mode 100644 index 00000000000..9b18e4cef81 --- /dev/null +++ b/internal/mithril-dmq/src/consumer/server/interface.rs @@ -0,0 +1,12 @@ +use mithril_common::StdResult; + +/// Trait for the server side of consuming messages from a DMQ node. +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait DmqConsumerServer: Send + Sync { + /// Processes the next message received from the DMQ network. + async fn process_message(&self) -> StdResult<()>; + + /// Runs the DMQ publisher server. + async fn run(&self) -> StdResult<()>; +} diff --git a/internal/mithril-dmq/src/consumer/server/mod.rs b/internal/mithril-dmq/src/consumer/server/mod.rs new file mode 100644 index 00000000000..cc3ffc1fb2a --- /dev/null +++ b/internal/mithril-dmq/src/consumer/server/mod.rs @@ -0,0 +1,9 @@ +mod interface; +#[cfg(unix)] +mod pallas; +mod queue; + +pub use interface::*; + +#[cfg(unix)] +pub use pallas::*; diff --git a/internal/mithril-dmq/src/consumer/server/pallas.rs b/internal/mithril-dmq/src/consumer/server/pallas.rs new file mode 100644 index 00000000000..c4946395d81 --- /dev/null +++ b/internal/mithril-dmq/src/consumer/server/pallas.rs @@ -0,0 +1,413 @@ +use std::{fs, path::PathBuf}; + +use anyhow::{Context, anyhow}; +use pallas_network::{facades::DmqServer, miniprotocols::localmsgnotification::Request}; +use tokio::{ + net::UnixListener, + select, + sync::{Mutex, MutexGuard, mpsc::UnboundedReceiver, watch::Receiver}, +}; + +use slog::{Logger, debug, error, info, warn}; + +use mithril_common::{CardanoNetwork, StdResult, logging::LoggerExtensions}; + +use crate::{DmqConsumerServer, DmqMessage}; + +use super::queue::MessageQueue; + +/// A DMQ server implementation for messages notification from a DMQ node. +pub struct DmqConsumerServerPallas { + socket: PathBuf, + network: CardanoNetwork, + server: Mutex>, + messages_receiver: Mutex>>, + messages_buffer: MessageQueue, + stop_rx: Receiver<()>, + logger: Logger, +} + +impl DmqConsumerServerPallas { + /// Creates a new instance of [DmqConsumerServerPallas]. + pub fn new( + socket: PathBuf, + network: CardanoNetwork, + stop_rx: Receiver<()>, + logger: Logger, + ) -> Self { + Self { + socket, + network, + server: Mutex::new(None), + messages_receiver: Mutex::new(None), + messages_buffer: MessageQueue::new(), + stop_rx, + logger: logger.new_with_component_name::(), + } + } + + /// Creates and returns a new `DmqServer` connected to the specified socket. + async fn new_server(&self) -> StdResult { + info!( + self.logger, + "Creating a new DMQ consumer server"; + "socket" => ?self.socket, + "network" => ?self.network + ); + let magic = self.network.magic_id(); + if self.socket.exists() { + fs::remove_file(self.socket.clone())?; + } + let listener = UnixListener::bind(&self.socket) + .map_err(|err| anyhow!(err)) + .with_context(|| { + format!( + "DmqConsumerServerPallas failed to bind Unix socket at {}", + self.socket.display() + ) + })?; + + DmqServer::accept(&listener, magic) + .await + .map_err(|err| anyhow!(err)) + .with_context(|| "DmqConsumerServerPallas failed to create a new server") + } + + /// Gets the cached `DmqServer`, creating a new one if it does not exist. + async fn get_server(&self) -> StdResult>> { + { + // Run this in a separate block to avoid dead lock on the Mutex + let server_lock = self.server.lock().await; + if server_lock.as_ref().is_some() { + return Ok(server_lock); + } + } + + let mut server_lock = self.server.lock().await; + *server_lock = Some(self.new_server().await?); + + Ok(server_lock) + } + + /// Drops the current `DmqServer`, if it exists. + async fn drop_server(&self) -> StdResult<()> { + debug!( + self.logger, + "Drop existing DMQ server"; + "socket" => ?self.socket, + "network" => ?self.network + ); + let mut server_lock = self.server.lock().await; + if let Some(server) = server_lock.take() { + server.abort().await; + } + + Ok(()) + } + + /// Registers the receiver for DMQ messages (only one receiver is allowed). + pub async fn register_receiver( + &self, + receiver: UnboundedReceiver, + ) -> StdResult<()> { + debug!(self.logger, "Register message receiver for DMQ messages"); + let mut receiver_guard = self.messages_receiver.lock().await; + *receiver_guard = Some(receiver); + + Ok(()) + } +} + +#[async_trait::async_trait] +impl DmqConsumerServer for DmqConsumerServerPallas { + async fn process_message(&self) -> StdResult<()> { + debug!( + self.logger, + "Waiting for message received from the DMQ network" + ); + let mut server_guard = self.get_server().await?; + let server = server_guard.as_mut().ok_or(anyhow!("DMQ server does not exist"))?; + + let request = server + .msg_notification() + .recv_next_request() + .await + .map_err(|err| anyhow!("Failed to receive next DMQ message: {}", err))?; + + match request { + Request::Blocking => { + debug!( + self.logger, + "Blocking notification of messages received from the DMQ network" + ); + let reply_messages = self.messages_buffer.dequeue_blocking(None).await; + let reply_messages = + reply_messages.into_iter().map(|msg| msg.into()).collect::>(); + server + .msg_notification() + .send_reply_messages_blocking(reply_messages) + .await?; + } + Request::NonBlocking => { + debug!( + self.logger, + "Non blocking notification of messages received from the DMQ network" + ); + let reply_messages = self.messages_buffer.dequeue_non_blocking(None).await; + let reply_messages = + reply_messages.into_iter().map(|msg| msg.into()).collect::>(); + let has_more = !self.messages_buffer.is_empty().await; + server + .msg_notification() + .send_reply_messages_non_blocking(reply_messages, has_more) + .await?; + server.msg_notification().recv_done().await?; + } + }; + + Ok(()) + } + + /// Runs the DMQ publisher server, processing messages in a loop. + async fn run(&self) -> StdResult<()> { + info!( + self.logger, + "Starting DMQ consumer server"; + "socket" => ?self.socket, + "network" => ?self.network + ); + + let mut stop_rx = self.stop_rx.clone(); + let mut receiver = self.messages_receiver.lock().await; + match *receiver { + Some(ref mut receiver) => loop { + select! { + _ = stop_rx.changed() => { + warn!(self.logger, "Stopping DMQ consumer server..."); + + return Ok(()); + } + message = receiver.recv() => { + if let Some(message) = message { + debug!(self.logger, "Received a message from the DMQ network"; "message" => ?message); + self.messages_buffer.enqueue(message).await; + } else { + warn!(self.logger, "DMQ message receiver channel closed"); + return Ok(()); + } + + } + res = self.process_message() => { + match res { + Ok(_) => { + debug!(self.logger, "Processed a message successfully"); + } + Err(err) => { + error!(self.logger, "Failed to process message"; "error" => ?err); + if let Err(drop_err) = self.drop_server().await { + error!(self.logger, "Failed to drop DMQ consumer server"; "error" => ?drop_err); + } + } + } + } + } + }, + None => { + return Err(anyhow!("DMQ message receiver is not registered")); + } + } + } +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, time::Duration}; + + use pallas_network::{ + facades::DmqClient, + miniprotocols::{localmsgnotification, localmsgsubmission::DmqMsg}, + }; + use tokio::sync::{mpsc::unbounded_channel, watch}; + use tokio::time::sleep; + + use mithril_common::{current_function, test_utils::TempDir}; + + use crate::test_tools::TestLogger; + + use super::*; + + fn create_temp_dir(folder_name: &str) -> PathBuf { + TempDir::create_with_short_path("dmq_consumer_server", folder_name) + } + + fn fake_msg() -> DmqMsg { + DmqMsg { + msg_id: vec![0, 1], + msg_body: vec![0, 1, 2], + block_number: 10, + ttl: 100, + kes_signature: vec![0, 1, 2, 3], + operational_certificate: vec![0, 1, 2, 3, 4], + kes_period: 10, + } + } + + #[tokio::test] + async fn pallas_dmq_consumer_server_non_blocking_success() { + let (stop_tx, stop_rx) = watch::channel(()); + let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::(); + let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let cardano_network = CardanoNetwork::TestNet(0); + let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new( + socket_path.to_path_buf(), + cardano_network.to_owned(), + stop_rx, + TestLogger::stdout(), + )); + dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap(); + let message = fake_msg(); + let client = tokio::spawn({ + async move { + tokio::time::sleep(Duration::from_secs(1)).await; + + // client setup + let mut client = DmqClient::connect(socket_path.clone(), 0).await.unwrap(); + + // init local msg notification client + let client_msg = client.msg_notification(); + assert_eq!(*client_msg.state(), localmsgnotification::State::Idle); + + // client sends a non blocking request to server and waits for a reply from the server + client_msg.send_request_messages_non_blocking().await.unwrap(); + assert_eq!( + *client_msg.state(), + localmsgnotification::State::BusyNonBlocking + ); + + let reply = client_msg.recv_next_reply().await.unwrap(); + assert_eq!(*client_msg.state(), localmsgnotification::State::Idle); + let result = match reply { + localmsgnotification::Reply(messages, false) => Ok(messages), + _ => Err(anyhow::anyhow!( + "Failed to receive blocking reply from DMQ server" + )), + }; + + // stop the consumer server + stop_tx.send(()).unwrap(); + + result + } + }); + let message_clone = message.clone(); + let _signature_dmq_tx_clone = signature_dmq_tx.clone(); + let recorder = tokio::spawn(async move { + _signature_dmq_tx_clone.send(message_clone.into()).unwrap(); + }); + + let (_, messages_res, _) = tokio::join!(dmq_consumer_server.run(), client, recorder); + let messages_received: Vec<_> = messages_res.unwrap().unwrap(); + assert_eq!(vec![message], messages_received); + } + + #[tokio::test] + async fn pallas_dmq_consumer_server_blocking_success() { + let (stop_tx, stop_rx) = watch::channel(()); + let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::(); + let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let cardano_network = CardanoNetwork::TestNet(0); + let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new( + socket_path.to_path_buf(), + cardano_network.to_owned(), + stop_rx, + TestLogger::stdout(), + )); + dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap(); + let message = fake_msg(); + let client = tokio::spawn({ + async move { + tokio::time::sleep(Duration::from_secs(1)).await; + + // client setup + let mut client = DmqClient::connect(socket_path.clone(), 0).await.unwrap(); + + // init local msg notification client + let client_msg = client.msg_notification(); + assert_eq!(*client_msg.state(), localmsgnotification::State::Idle); + + // client sends a blocking request to server and waits for a reply from the server + client_msg.send_request_messages_blocking().await.unwrap(); + assert_eq!( + *client_msg.state(), + localmsgnotification::State::BusyBlocking + ); + + let reply = client_msg.recv_next_reply().await.unwrap(); + assert_eq!(*client_msg.state(), localmsgnotification::State::Idle); + let result = match reply { + localmsgnotification::Reply(messages, false) => Ok(messages), + _ => Err(anyhow::anyhow!( + "Failed to receive blocking reply from DMQ server" + )), + }; + + // stop the consumer server + stop_tx.send(()).unwrap(); + + result + } + }); + let message_clone = message.clone(); + let _signature_dmq_tx_clone = signature_dmq_tx.clone(); + let recorder = tokio::spawn(async move { + _signature_dmq_tx_clone.send(message_clone.into()).unwrap(); + }); + + let (_, messages_res, _) = tokio::join!(dmq_consumer_server.run(), client, recorder); + let messages_received: Vec<_> = messages_res.unwrap().unwrap(); + assert_eq!(vec![message], messages_received); + } + + #[tokio::test] + async fn pallas_dmq_consumer_server_blocking_blocks_when_no_message_available() { + let (_stop_tx, stop_rx) = watch::channel(()); + let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::(); + let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let cardano_network = CardanoNetwork::TestNet(0); + let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new( + socket_path.to_path_buf(), + cardano_network.to_owned(), + stop_rx, + TestLogger::stdout(), + )); + dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap(); + let client = tokio::spawn({ + async move { + // client setup + let mut client = DmqClient::connect(socket_path.clone(), 0).await.unwrap(); + + // init local msg notification client + let client_msg = client.msg_notification(); + assert_eq!(*client_msg.state(), localmsgnotification::State::Idle); + + // client sends a blocking request to server and waits for a reply from the server + client_msg.send_request_messages_blocking().await.unwrap(); + assert_eq!( + *client_msg.state(), + localmsgnotification::State::BusyBlocking + ); + + client_msg.recv_next_reply().await.unwrap(); + } + }); + let _signature_dmq_tx_clone = signature_dmq_tx.clone(); + + let result = tokio::select!( + _res = sleep(Duration::from_millis(1000)) => {Err(anyhow!("Timeout"))}, + _res = dmq_consumer_server.run() => {Ok(())}, + _res = client => {Ok(())}, + ); + + result.expect_err("Should have timed out"); + } +} diff --git a/internal/mithril-dmq/src/consumer/server/queue.rs b/internal/mithril-dmq/src/consumer/server/queue.rs new file mode 100644 index 00000000000..b3681b4c092 --- /dev/null +++ b/internal/mithril-dmq/src/consumer/server/queue.rs @@ -0,0 +1,169 @@ +use std::collections::VecDeque; + +use tokio::sync::{Mutex, Notify}; + +use crate::DmqMessage; + +/// A queue for storing DMQ messages. +pub(crate) struct MessageQueue { + messages: Mutex>, + new_message_notify: Notify, +} + +impl MessageQueue { + /// Creates a new instance of [BlockingNonBlockingQueue]. + pub fn new() -> Self { + Self { + messages: Mutex::new(VecDeque::new()), + new_message_notify: Notify::new(), + } + } + + /// Enqueues a new message into the queue. + pub async fn enqueue(&self, message: DmqMessage) { + let mut message_queue_guard = self.messages.lock().await; + (*message_queue_guard).push_back(message); + + self.new_message_notify.notify_waiters(); + } + + /// Returns the messages from the queue in a non blocking way, if available. + pub async fn dequeue_non_blocking(&self, limit: Option) -> Vec { + let mut message_queue_guard = self.messages.lock().await; + let limit = limit.unwrap_or((*message_queue_guard).len()); + let mut messages = Vec::new(); + for _ in 0..limit { + if let Some(message) = (*message_queue_guard).pop_front() { + messages.push(message); + } + } + + messages + } + + /// Returns the messages from the queue in a blocking way, waiting for new messages if necessary. + pub async fn dequeue_blocking(&self, limit: Option) -> Vec { + loop { + let messages = self.dequeue_non_blocking(limit).await; + if !messages.is_empty() { + return messages; + } + + self.new_message_notify.notified().await; + } + } + + /// Checks if the message queue is empty. + pub async fn is_empty(&self) -> bool { + self.len().await == 0 + } + + /// Get the length of the message queue. + pub async fn len(&self) -> usize { + let message_queue_guard = self.messages.lock().await; + (*message_queue_guard).len() + } +} + +#[cfg(test)] +mod tests { + use std::{ops::RangeInclusive, time::Duration}; + + use anyhow::anyhow; + use pallas_network::miniprotocols::localmsgsubmission::DmqMsg; + use tokio::time::sleep; + + use super::*; + + fn fake_msg() -> DmqMsg { + DmqMsg { + msg_id: vec![0, 1], + msg_body: vec![0, 1, 2], + block_number: 10, + ttl: 100, + kes_signature: vec![0, 1, 2, 3], + operational_certificate: vec![0, 1, 2, 3, 4], + kes_period: 10, + } + } + + fn fake_messages(range: RangeInclusive) -> Vec { + range + .map(|i| { + DmqMsg { + msg_id: vec![i], + ..fake_msg() + } + .into() + }) + .collect::>() + } + + #[tokio::test] + async fn enqueue_and_dequeue_non_blocking_no_limit() { + let queue = MessageQueue::new(); + let messages = fake_messages(1..=5); + for message in messages.clone() { + queue.enqueue(message).await; + } + let limit = None; + + let dequeued_messages = queue.dequeue_non_blocking(limit).await; + + assert_eq!(messages, dequeued_messages); + } + + #[tokio::test] + async fn enqueue_and_dequeue_non_blocking_with_limit() { + let queue = MessageQueue::new(); + let messages = fake_messages(1..=5); + for message in messages.clone() { + queue.enqueue(message).await; + } + let limit = Some(2); + + let dequeued_messages = queue.dequeue_non_blocking(limit).await; + + assert_eq!(messages[0..=1].to_vec(), dequeued_messages); + } + + #[tokio::test] + async fn enqueue_and_dequeue_blocking_no_limit() { + let queue = MessageQueue::new(); + let messages = fake_messages(1..=5); + for message in messages.clone() { + queue.enqueue(message).await; + } + let limit = None; + + let dequeued_messages = queue.dequeue_blocking(limit).await; + + assert_eq!(messages, dequeued_messages); + } + + #[tokio::test] + async fn enqueue_and_dequeue_blocking_with_limit() { + let queue = MessageQueue::new(); + let messages = fake_messages(1..=5); + for message in messages.clone() { + queue.enqueue(message).await; + } + let limit = Some(2); + + let dequeued_messages = queue.dequeue_blocking(limit).await; + + assert_eq!(messages[0..=1].to_vec(), dequeued_messages); + } + + #[tokio::test] + async fn dequeue_blocking_blocks_when_no_message_available() { + let queue = MessageQueue::new(); + + let result = tokio::select!( + _res = sleep(Duration::from_millis(100)) => {Err(anyhow!("Timeout"))}, + _res = queue.dequeue_blocking(None) => {Ok(())}, + ); + + result.expect_err("Should have timed out"); + } +} diff --git a/internal/mithril-dmq/src/lib.rs b/internal/mithril-dmq/src/lib.rs index db7536fc64e..21abe080378 100644 --- a/internal/mithril-dmq/src/lib.rs +++ b/internal/mithril-dmq/src/lib.rs @@ -2,13 +2,17 @@ //! This crate provides mechanisms to publish and consume messages of a Decentralized Message Queue network through a DMQ node. mod consumer; -mod message; +mod model; mod publisher; pub mod test; -pub use consumer::{DmqConsumer, DmqConsumerPallas}; -pub use message::DmqMessageBuilder; -pub use publisher::{DmqPublisher, DmqPublisherPallas}; +#[cfg(unix)] +pub use consumer::DmqConsumerServerPallas; +pub use consumer::{DmqConsumerClient, DmqConsumerClientPallas, DmqConsumerServer}; +pub use model::{DmqMessage, DmqMessageBuilder}; +#[cfg(unix)] +pub use publisher::DmqPublisherServerPallas; +pub use publisher::{DmqPublisherClient, DmqPublisherClientPallas, DmqPublisherServer}; #[cfg(test)] pub(crate) mod test_tools { diff --git a/internal/mithril-dmq/src/message.rs b/internal/mithril-dmq/src/model/builder.rs similarity index 87% rename from internal/mithril-dmq/src/message.rs rename to internal/mithril-dmq/src/model/builder.rs index 08de36ae2c0..4c11a7a9e14 100644 --- a/internal/mithril-dmq/src/message.rs +++ b/internal/mithril-dmq/src/model/builder.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, vec}; use anyhow::{Context, anyhow}; use blake2::{Blake2b, Digest, digest::consts::U64}; @@ -10,6 +10,8 @@ use mithril_common::{ crypto_helper::{KesSigner, TryToBytes}, }; +use crate::model::DmqMessage; + /// The TTL (Time To Live) for DMQ messages in blocks. const DMQ_MESSAGE_TTL_IN_BLOCKS: u16 = 100; @@ -38,7 +40,7 @@ impl DmqMessageBuilder { } /// Builds a DMQ message from the provided message bytes. - pub async fn build(&self, message_bytes: &[u8]) -> StdResult { + pub async fn build(&self, message_bytes: &[u8]) -> StdResult { fn compute_msg_id(dmq_message: &DmqMsg) -> Vec { let mut hasher = Blake2b::::new(); hasher.update(&dmq_message.msg_body); @@ -63,16 +65,18 @@ impl DmqMessageBuilder { let block_number = (*block_number) .try_into() .with_context(|| "Failed to convert block number to u32")?; - let (kes_signature, operational_certificate) = self + /* let (kes_signature, operational_certificate) = self .kes_signer .sign(message_bytes, block_number) .with_context(|| "Failed to KES sign message while building DMQ message")?; + // TODO: fix the computation of the KES period which is not done correctly in the pallas chain observer let kes_period = self .chain_observer .get_current_kes_period(&operational_certificate) .await .with_context(|| "Failed to get KES period while building DMQ message")? - .unwrap_or_default(); + .unwrap_or_default() + - operational_certificate.start_kes_period as u32; let mut dmq_message = DmqMsg { msg_id: vec![], msg_body: message_bytes.to_vec(), @@ -81,10 +85,20 @@ impl DmqMessageBuilder { kes_signature: kes_signature.to_bytes_vec()?, operational_certificate: operational_certificate.to_bytes_vec()?, kes_period, + }; */ + let kes_period = 0; + let mut dmq_message = DmqMsg { + msg_id: vec![], + msg_body: message_bytes.to_vec(), + block_number, + ttl: self.ttl_blocks, + kes_signature: vec![], + operational_certificate: vec![], + kes_period, }; dmq_message.msg_id = compute_msg_id(&dmq_message); - Ok(dmq_message) + Ok(dmq_message.into()) } } @@ -147,7 +161,7 @@ mod tests { }, DmqMsg { msg_id: vec![], - ..dmq_message + ..dmq_message.into() } ); } diff --git a/internal/mithril-dmq/src/model/message.rs b/internal/mithril-dmq/src/model/message.rs new file mode 100644 index 00000000000..3161d5a6cef --- /dev/null +++ b/internal/mithril-dmq/src/model/message.rs @@ -0,0 +1,100 @@ +use std::ops::{Deref, DerefMut}; + +use pallas_codec::minicbor::{Decode, Decoder, Encode, Encoder}; +use pallas_network::miniprotocols::localmsgsubmission::DmqMsg; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +/// Type alias for a DMQ message. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DmqMessage(DmqMsg); + +#[derive(Serialize, Deserialize)] +struct RawBytes(#[serde(with = "serde_bytes")] Vec); + +impl Deref for DmqMessage { + type Target = DmqMsg; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DmqMessage { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From for DmqMessage { + fn from(msg: DmqMsg) -> Self { + Self(msg) + } +} + +impl From for DmqMsg { + fn from(msg: DmqMessage) -> Self { + msg.0 + } +} + +impl Serialize for DmqMessage { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let raw_bytes = RawBytes({ + let mut e = Encoder::new(Vec::new()); + self.0.encode(&mut e, &mut ()).map_err(|e| { + serde::ser::Error::custom(format!("DMQ message serialization error: {e}")) + })?; + Ok(e.into_writer()) + }?); + + raw_bytes.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for DmqMessage { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let raw_bytes = RawBytes::deserialize(deserializer)?; + let res = DmqMsg::decode(&mut Decoder::new(&raw_bytes.0), &mut ()) + .map_err(|e| { + serde::de::Error::custom(format!("DMQ message deserialization error: {e}")) + })? + .into(); + + Ok(res) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dmq_message_serialize_deserialize() { + let dmq_msg = DmqMsg { + msg_id: vec![1, 2, 3], + msg_body: vec![4, 5, 6], + block_number: 123, + ttl: 10, + kes_signature: vec![7, 8, 9], + operational_certificate: vec![10, 11, 12], + kes_period: 0, + }; + + let dmq_message = DmqMessage::from(dmq_msg.clone()); + let serialized = bincode::serde::encode_to_vec(&dmq_message, bincode::config::standard()) + .expect("Serialization failed"); + + let (deserialized, _) = + bincode::serde::decode_from_slice(&serialized, bincode::config::standard()) + .expect("Deserialization failed"); + + assert_eq!(dmq_message, deserialized); + assert_eq!(dmq_message.0, dmq_msg); + } +} diff --git a/internal/mithril-dmq/src/model/mod.rs b/internal/mithril-dmq/src/model/mod.rs new file mode 100644 index 00000000000..959c5f7ee05 --- /dev/null +++ b/internal/mithril-dmq/src/model/mod.rs @@ -0,0 +1,5 @@ +mod builder; +mod message; + +pub use builder::*; +pub use message::*; diff --git a/internal/mithril-dmq/src/publisher/interface.rs b/internal/mithril-dmq/src/publisher/client/interface.rs similarity index 62% rename from internal/mithril-dmq/src/publisher/interface.rs rename to internal/mithril-dmq/src/publisher/client/interface.rs index d5d5ed25098..f58a6687fcf 100644 --- a/internal/mithril-dmq/src/publisher/interface.rs +++ b/internal/mithril-dmq/src/publisher/client/interface.rs @@ -1,9 +1,9 @@ use mithril_common::{StdResult, crypto_helper::TryToBytes}; -/// Trait for publishing messages from a DMQ node. +/// Trait for the client side of publishing messages from a DMQ node. #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] -pub trait DmqPublisher: Send + Sync { +pub trait DmqPublisherClient: Send + Sync { /// Publishes a message to the DMQ node. async fn publish_message(&self, message: M) -> StdResult<()>; } diff --git a/internal/mithril-dmq/src/publisher/client/mod.rs b/internal/mithril-dmq/src/publisher/client/mod.rs new file mode 100644 index 00000000000..4035f6c0659 --- /dev/null +++ b/internal/mithril-dmq/src/publisher/client/mod.rs @@ -0,0 +1,5 @@ +mod interface; +mod pallas; + +pub use interface::*; +pub use pallas::*; diff --git a/internal/mithril-dmq/src/publisher/pallas.rs b/internal/mithril-dmq/src/publisher/client/pallas.rs similarity index 91% rename from internal/mithril-dmq/src/publisher/pallas.rs rename to internal/mithril-dmq/src/publisher/client/pallas.rs index 55f8329584b..a8e6b3eb8e2 100644 --- a/internal/mithril-dmq/src/publisher/pallas.rs +++ b/internal/mithril-dmq/src/publisher/client/pallas.rs @@ -8,12 +8,12 @@ use mithril_common::{ CardanoNetwork, StdResult, crypto_helper::TryToBytes, logging::LoggerExtensions, }; -use crate::{DmqMessageBuilder, DmqPublisher}; +use crate::{DmqMessageBuilder, DmqPublisherClient}; -/// A DMQ publisher implementation. +/// A DMQ client publisher implementation. /// /// This implementation is built upon the n2c mini-protocols DMQ implementation in Pallas. -pub struct DmqPublisherPallas { +pub struct DmqPublisherClientPallas { socket: PathBuf, network: CardanoNetwork, dmq_message_builder: DmqMessageBuilder, @@ -21,8 +21,8 @@ pub struct DmqPublisherPallas { phantom: PhantomData, } -impl DmqPublisherPallas { - /// Creates a new instance of [DmqPublisherPallas]. +impl DmqPublisherClientPallas { + /// Creates a new instance of [DmqPublisherClientPallas]. pub fn new( socket: PathBuf, network: CardanoNetwork, @@ -43,12 +43,12 @@ impl DmqPublisherPallas { let magic = self.network.magic_id(); DmqClient::connect(&self.socket, magic) .await - .with_context(|| "DmqPublisherPallas failed to create a new client") + .with_context(|| "DmqPublisherClientPallas failed to create a new client") } } #[async_trait::async_trait] -impl DmqPublisher for DmqPublisherPallas { +impl DmqPublisherClient for DmqPublisherClientPallas { async fn publish_message(&self, message: M) -> StdResult<()> { debug!( self.logger, @@ -64,7 +64,7 @@ impl DmqPublisher for DmqPublisherPallas .with_context(|| "Failed to build DMQ message")?; client .msg_submission() - .send_submit_tx(dmq_message) + .send_submit_tx(dmq_message.into()) .await .with_context(|| "Failed to submit DMQ message")?; let response = client.msg_submission().recv_submit_tx_response().await?; @@ -147,7 +147,7 @@ mod tests { let reply_success = true; let server = setup_dmq_server(socket_path.clone(), reply_success); let client = tokio::spawn(async move { - let publisher = DmqPublisherPallas::new( + let publisher = DmqPublisherClientPallas::new( socket_path, CardanoNetwork::TestNet(0), DmqMessageBuilder::new( @@ -181,7 +181,7 @@ mod tests { let reply_success = false; let server = setup_dmq_server(socket_path.clone(), reply_success); let client = tokio::spawn(async move { - let publisher = DmqPublisherPallas::new( + let publisher = DmqPublisherClientPallas::new( socket_path, CardanoNetwork::TestNet(0), DmqMessageBuilder::new( diff --git a/internal/mithril-dmq/src/publisher/mod.rs b/internal/mithril-dmq/src/publisher/mod.rs index 4035f6c0659..24bf375cd40 100644 --- a/internal/mithril-dmq/src/publisher/mod.rs +++ b/internal/mithril-dmq/src/publisher/mod.rs @@ -1,5 +1,5 @@ -mod interface; -mod pallas; +mod client; +mod server; -pub use interface::*; -pub use pallas::*; +pub use client::*; +pub use server::*; diff --git a/internal/mithril-dmq/src/publisher/server/interface.rs b/internal/mithril-dmq/src/publisher/server/interface.rs new file mode 100644 index 00000000000..822a93f766f --- /dev/null +++ b/internal/mithril-dmq/src/publisher/server/interface.rs @@ -0,0 +1,12 @@ +use mithril_common::StdResult; + +/// Trait for the server side of publishing messages from a DMQ node. +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait DmqPublisherServer: Send + Sync { + /// Processes the next message received from the DMQ client. + async fn process_message(&self) -> StdResult<()>; + + /// Runs the DMQ publisher server. + async fn run(&self) -> StdResult<()>; +} diff --git a/internal/mithril-dmq/src/publisher/server/mod.rs b/internal/mithril-dmq/src/publisher/server/mod.rs new file mode 100644 index 00000000000..80dea9d418d --- /dev/null +++ b/internal/mithril-dmq/src/publisher/server/mod.rs @@ -0,0 +1,7 @@ +mod interface; +#[cfg(unix)] +mod pallas; + +pub use interface::*; +#[cfg(unix)] +pub use pallas::*; diff --git a/internal/mithril-dmq/src/publisher/server/pallas.rs b/internal/mithril-dmq/src/publisher/server/pallas.rs new file mode 100644 index 00000000000..26e321e0938 --- /dev/null +++ b/internal/mithril-dmq/src/publisher/server/pallas.rs @@ -0,0 +1,327 @@ +use std::{fs, path::PathBuf}; + +use anyhow::{Context, anyhow}; +use pallas_network::{ + facades::DmqServer, + miniprotocols::{ + localmsgsubmission::DmqMsgValidationError, + localtxsubmission::{Request, Response}, + }, +}; +use tokio::{ + net::UnixListener, + select, + sync::{Mutex, MutexGuard, mpsc::UnboundedSender, watch::Receiver}, +}; + +use slog::{Logger, debug, error, info, warn}; + +use mithril_common::{CardanoNetwork, StdResult, logging::LoggerExtensions}; + +use crate::{DmqMessage, DmqPublisherServer}; + +/// A DMQ server implementation for messages publication to a DMQ node. +pub struct DmqPublisherServerPallas { + socket: PathBuf, + network: CardanoNetwork, + server: Mutex>, + transmitters: Mutex>>, + stop_rx: Receiver<()>, + logger: Logger, +} + +impl DmqPublisherServerPallas { + /// Creates a new instance of [DmqPublisherServerPallas]. + pub fn new( + socket: PathBuf, + network: CardanoNetwork, + stop_rx: Receiver<()>, + logger: Logger, + ) -> Self { + Self { + socket, + network, + server: Mutex::new(None), + transmitters: Mutex::new(Vec::new()), + stop_rx, + logger: logger.new_with_component_name::(), + } + } + + /// Creates and returns a new `DmqServer` connected to the specified socket. + async fn new_server(&self) -> StdResult { + info!( + self.logger, + "Creating a new DMQ publisher server"; + "socket" => ?self.socket, + "network" => ?self.network + ); + let magic = self.network.magic_id(); + if self.socket.exists() { + fs::remove_file(self.socket.clone())?; + } + let listener = UnixListener::bind(&self.socket) + .map_err(|err| anyhow!(err)) + .with_context(|| { + format!( + "DmqPublisherServerPallas failed to bind Unix socket at {}", + self.socket.display() + ) + })?; + + DmqServer::accept(&listener, magic) + .await + .map_err(|err| anyhow!(err)) + .with_context(|| "DmqPublisherServerPallas failed to create a new server") + } + + /// Gets the cached `DmqServer`, creating a new one if it does not exist. + async fn get_server(&self) -> StdResult>> { + { + // Run this in a separate block to avoid dead lock on the Mutex + let server_lock = self.server.lock().await; + if server_lock.as_ref().is_some() { + return Ok(server_lock); + } + } + + let mut server_lock = self.server.lock().await; + *server_lock = Some(self.new_server().await?); + + Ok(server_lock) + } + + /// Drops the current `DmqServer`, if it exists. + async fn drop_server(&self) -> StdResult<()> { + debug!( + self.logger, + "Drop existing DMQ publisher server"; + "socket" => ?self.socket, + "network" => ?self.network + ); + let mut server_lock = self.server.lock().await; + if let Some(server) = server_lock.take() { + server.abort().await; + } + + Ok(()) + } + + /// Registers a transmitter for DMQ messages. + pub async fn register_transmitter( + &self, + transmitter: UnboundedSender, + ) -> StdResult<()> { + debug!(self.logger, "Register message transmitter for DMQ messages"); + let mut transmitters_guard = self.transmitters.lock().await; + transmitters_guard.push(transmitter); + + Ok(()) + } +} + +#[async_trait::async_trait] +impl DmqPublisherServer for DmqPublisherServerPallas { + async fn process_message(&self) -> StdResult<()> { + debug!( + self.logger, + "Waiting for message to publish to the DMQ network"; + "socket" => ?self.socket, + "network" => ?self.network + ); + let mut server_guard = self.get_server().await?; + let server = server_guard + .as_mut() + .ok_or(anyhow!("DMQ publisher server does not exist"))?; + + let request = server + .msg_submission() + .recv_next_request() + .await + .map_err(|err| anyhow!("Failed to receive next DMQ message: {}", err))?; + let (dmq_message, response) = match request { + Request::Submit(dmq_message) => { + debug!(self.logger, "Received message to publish to DMQ"); + (Some(dmq_message), Response::Accepted) + } + request => { + error!( + self.logger, + "Expected a Submit request, but received: {request:?}" + ); + ( + None, + Response::Rejected(DmqMsgValidationError(format!( + "Expected a Submit request, but received: {request:?}" + ))), + ) + } + }; + server + .msg_submission() + .send_submit_tx_response(response) + .await + .map_err(|err| anyhow!("Failed to send response to DMQ publisher client: {}", err))?; + + if let Some(dmq_message) = dmq_message { + for transmitter in self.transmitters.lock().await.iter() { + if let Err(err) = transmitter.send(dmq_message.to_owned().into()) { + error!( + self.logger, + "Failed to send DMQ message to transmitter"; + "error" => ?err + ); + } + } + } + + let request = server.msg_submission().recv_next_request().await.map_err(|err| { + anyhow!( + "Failed to receive next request from DMQ publisher client: {}", + err + ) + })?; + match request { + Request::Done => { + debug!( + self.logger, + "Received Done request from DMQ publisher client" + ); + } + _ => { + error!( + self.logger, + "Expected a Done request, but received: {request:?}" + ); + return Err(anyhow!( + "Expected a Done request, but received: {request:?}" + )); + } + } + + Ok(()) + } + + async fn run(&self) -> StdResult<()> { + info!( + self.logger, + "Starting DMQ publisher server"; + "socket" => ?self.socket, + "network" => ?self.network + ); + + let mut stop_rx = self.stop_rx.clone(); + loop { + select! { + _ = stop_rx.changed() => { + warn!(self.logger, "Stopping DMQ publisher server..."); + + return Ok(()); + } + res = self.process_message() => { + match res { + Ok(_) => { + debug!(self.logger, "Processed a message successfully"); + } + Err(err) => { + error!(self.logger, "Failed to process message"; "error" => ?err); + } + } + if let Err(drop_err) = self.drop_server().await { + error!(self.logger, "Failed to drop DMQ publisher server"; "error" => ?drop_err); + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use pallas_network::{ + facades::DmqClient, + miniprotocols::{localmsgsubmission::DmqMsg, localtxsubmission}, + }; + use tokio::sync::{mpsc::unbounded_channel, watch}; + + use mithril_common::{current_function, test_utils::TempDir}; + + use crate::test_tools::TestLogger; + + use super::*; + + fn create_temp_dir(folder_name: &str) -> PathBuf { + TempDir::create_with_short_path("dmq_publisher_server", folder_name) + } + + async fn fake_msg() -> DmqMsg { + DmqMsg { + msg_id: vec![0, 1], + msg_body: vec![2, 3, 4, 5], + block_number: 10, + ttl: 100, + kes_signature: vec![0, 1, 2, 3], + operational_certificate: vec![0, 1, 2, 3, 4, 5], + kes_period: 10, + } + } + + #[tokio::test] + async fn pallas_dmq_publisher_server_success() { + let (stop_tx, stop_rx) = watch::channel(()); + let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::(); + let socket_path = create_temp_dir(current_function!()).join("node.socket"); + let cardano_network = CardanoNetwork::TestNet(0); + let dmq_publisher_server = Arc::new(DmqPublisherServerPallas::new( + socket_path.to_path_buf(), + cardano_network.to_owned(), + stop_rx, + TestLogger::stdout(), + )); + dmq_publisher_server + .register_transmitter(signature_dmq_tx) + .await + .unwrap(); + let message = fake_msg().await; + let message_clone = message.clone(); + let client = tokio::spawn({ + async move { + // client setup + let mut client = DmqClient::connect(socket_path.clone(), 0).await.unwrap(); + + // init local msg submission client + let client_msg = client.msg_submission(); + assert_eq!(*client_msg.state(), localtxsubmission::State::Idle); + + // client sends a request to server and waits for a reply from the server + client_msg.send_submit_tx(message_clone).await.unwrap(); + assert_eq!(*client_msg.state(), localtxsubmission::State::Busy); + + let response = client_msg.recv_submit_tx_response().await.unwrap(); + assert_eq!(*client_msg.state(), localtxsubmission::State::Idle); + assert_eq!(response, localtxsubmission::Response::Accepted); + } + }); + let recorder = tokio::spawn(async move { + let result = { + let mut signature_dmq_rx = signature_dmq_rx; + if let Some(message) = signature_dmq_rx.recv().await { + return Ok(message); + } + + Err(anyhow::anyhow!("No message received in recorder")) + }; + stop_tx + .send(()) + .expect("Failed to send stop signal to DMQ publisher server"); + + result + }); + + let (_, _, message_res) = tokio::join!(dmq_publisher_server.run(), client, recorder); + let message_received = message_res.unwrap().unwrap(); + assert_eq!(message, message_received.into()); + } +} diff --git a/internal/mithril-dmq/src/test/double/consumer.rs b/internal/mithril-dmq/src/test/double/consumer.rs index 23ff0cc3e03..45182b93b2b 100644 --- a/internal/mithril-dmq/src/test/double/consumer.rs +++ b/internal/mithril-dmq/src/test/double/consumer.rs @@ -4,11 +4,11 @@ use tokio::sync::Mutex; use mithril_common::{StdResult, crypto_helper::TryFromBytes, entities::PartyId}; -use crate::DmqConsumer; +use crate::DmqConsumerClient; type ConsumerReturn = StdResult>; -/// A fake implementation of the [DmqConsumer] trait for testing purposes. +/// A fake implementation of the [DmqConsumerClient] trait for testing purposes. pub struct DmqConsumerFake { results: Mutex>>, } @@ -23,7 +23,7 @@ impl DmqConsumerFake { } #[async_trait::async_trait] -impl DmqConsumer for DmqConsumerFake { +impl DmqConsumerClient for DmqConsumerFake { async fn consume_messages(&self) -> ConsumerReturn { let mut results = self.results.lock().await; diff --git a/internal/mithril-dmq/src/test/double/publisher.rs b/internal/mithril-dmq/src/test/double/publisher.rs index 0cc8d5e7032..4f6ca39a6d7 100644 --- a/internal/mithril-dmq/src/test/double/publisher.rs +++ b/internal/mithril-dmq/src/test/double/publisher.rs @@ -4,9 +4,9 @@ use tokio::sync::Mutex; use mithril_common::{StdResult, crypto_helper::TryToBytes}; -use crate::DmqPublisher; +use crate::DmqPublisherClient; -/// A fake implementation of the [DmqPublisher] trait for testing purposes. +/// A fake implementation of the [DmqPublisherClient] trait for testing purposes. pub struct DmqPublisherFake { results: Mutex>>, phantom: PhantomData, @@ -23,7 +23,7 @@ impl DmqPublisherFake { } #[async_trait::async_trait] -impl DmqPublisher for DmqPublisherFake { +impl DmqPublisherClient for DmqPublisherFake { async fn publish_message(&self, _message: M) -> StdResult<()> { let mut results = self.results.lock().await; diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index fb6c02bd0a0..d139b310e7e 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -10,7 +10,7 @@ license = { workspace = true } repository = { workspace = true } [features] -default = ["jemallocator"] +default = ["jemallocator", "future_dmq"] bundle_tls = ["reqwest/native-tls-vendored"] jemallocator = ["dep:tikv-jemallocator"] diff --git a/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs b/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs index 497f3262ecb..62f4e17526a 100644 --- a/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs +++ b/mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs @@ -12,7 +12,7 @@ use std::time::Duration; #[cfg(feature = "future_dmq")] use mithril_common::messages::RegisterSignatureMessageDmq; #[cfg(feature = "future_dmq")] -use mithril_dmq::DmqConsumerPallas; +use mithril_dmq::DmqConsumerClientPallas; use mithril_signed_entity_lock::SignedEntityTypeLock; use crate::database::repository::CertificateRepository; @@ -89,11 +89,12 @@ impl DependenciesBuilder { #[cfg(feature = "future_dmq")] let signature_consumer = match self.configuration.dmq_node_socket_path() { Some(dmq_node_socket_path) => { - let dmq_consumer = Arc::new(DmqConsumerPallas::::new( - dmq_node_socket_path, - self.configuration.get_network()?, - self.root_logger(), - )); + let dmq_consumer = + Arc::new(DmqConsumerClientPallas::::new( + dmq_node_socket_path, + self.configuration.get_network()?, + self.root_logger(), + )); Arc::new(SignatureConsumerDmq::new(dmq_consumer)) as Arc } _ => Arc::new(SignatureConsumerNoop) as Arc, diff --git a/mithril-aggregator/src/services/signature_consumer/dmq.rs b/mithril-aggregator/src/services/signature_consumer/dmq.rs index ffc401d0dc7..08b8461d042 100644 --- a/mithril-aggregator/src/services/signature_consumer/dmq.rs +++ b/mithril-aggregator/src/services/signature_consumer/dmq.rs @@ -9,18 +9,18 @@ use mithril_common::{ messages::RegisterSignatureMessageDmq, }; -use mithril_dmq::DmqConsumer; +use mithril_dmq::DmqConsumerClient; use super::SignatureConsumer; /// DMQ implementation of the [SignatureConsumer] trait. pub struct SignatureConsumerDmq { - dmq_consumer: Arc>, + dmq_consumer: Arc>, } impl SignatureConsumerDmq { /// Creates a new instance of [SignatureConsumerDmq]. - pub fn new(dmq_consumer: Arc>) -> Self { + pub fn new(dmq_consumer: Arc>) -> Self { Self { dmq_consumer } } } diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index 1409e5f2489..0067fd774da 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -22,7 +22,7 @@ ignored = ["serde_bytes"] crate-type = ["lib", "cdylib", "staticlib"] [features] -default = ["rug-backend"] +default = ["rug-backend", "allow_skip_signer_certification"] # Full feature set full = ["test_tools"] diff --git a/mithril-infra/assets/docker/docker-compose-aggregator-p2p-dmq-override.yaml b/mithril-infra/assets/docker/docker-compose-aggregator-p2p-dmq-override.yaml new file mode 100644 index 00000000000..36510a687b0 --- /dev/null +++ b/mithril-infra/assets/docker/docker-compose-aggregator-p2p-dmq-override.yaml @@ -0,0 +1,13 @@ +services: + mithril-aggregator: + environment: + - DMQ_NODE_SOCKET_PATH=/ipc/dmq.socket + - NETWORK=${NETWORK} + - NETWORK_MAGIC=${NETWORK_MAGIC} + mithril-aggregator-relay: + volumes: + - ../data/${NETWORK}/mithril-aggregator/cardano/ipc:/ipc + environment: + - DMQ_NODE_SOCKET_PATH=/ipc/dmq.socket + - NETWORK=${NETWORK} + - NETWORK_MAGIC=${NETWORK_MAGIC} diff --git a/mithril-infra/assets/docker/docker-compose-signer-p2p-dmq-override.yaml b/mithril-infra/assets/docker/docker-compose-signer-p2p-dmq-override.yaml new file mode 100644 index 00000000000..b3cf77d22d2 --- /dev/null +++ b/mithril-infra/assets/docker/docker-compose-signer-p2p-dmq-override.yaml @@ -0,0 +1,12 @@ +services: + mithril-signer: + environment: + - DMQ_NODE_SOCKET_PATH=/ipc/dmq.socket + - NETWORK_MAGIC=${NETWORK_MAGIC} + mithril-signer-relay: + volumes: + - ../data/${NETWORK}/mithril-signer-${SIGNER_ID}/cardano/ipc:/ipc + environment: + - DMQ_NODE_SOCKET_PATH=/ipc/dmq.socket + - NETWORK=${NETWORK} + - NETWORK_MAGIC=${NETWORK_MAGIC} diff --git a/mithril-infra/mithril.aggregator.tf b/mithril-infra/mithril.aggregator.tf index 0f3885ec2a5..48d24262975 100644 --- a/mithril-infra/mithril.aggregator.tf +++ b/mithril-infra/mithril.aggregator.tf @@ -76,6 +76,7 @@ EOT inline = [ "set -e", "export NETWORK=${var.cardano_network}", + "export NETWORK_MAGIC=${var.cardano_network_magic_map[var.cardano_network]}", "export CARDANO_IMAGE_ID=${var.cardano_image_id}", "export CARDANO_IMAGE_REGISTRY=${var.cardano_image_registry}", "export MITHRIL_IMAGE_ID=${var.mithril_image_id}", @@ -157,6 +158,10 @@ fi if [ "${local.mithril_aggregator_is_follower}" = "true" ]; then DOCKER_COMPOSE_FILES="$DOCKER_COMPOSE_FILES -f $DOCKER_DIRECTORY/docker-compose-aggregator-follower-override.yaml" fi +# Support for DMQ protocol +if [ "${var.mithril_p2p_use_dmq_protocol}" = "true" ]; then + DOCKER_COMPOSE_FILES="$DOCKER_COMPOSE_FILES -f $DOCKER_DIRECTORY/docker-compose-aggregator-p2p-dmq-override.yaml" +fi EOT , "docker compose $DOCKER_COMPOSE_FILES --profile all up -d", diff --git a/mithril-infra/mithril.signer.tf b/mithril-infra/mithril.signer.tf index b5874cc04f1..a2f706c468c 100644 --- a/mithril-infra/mithril.signer.tf +++ b/mithril-infra/mithril.signer.tf @@ -91,6 +91,7 @@ EOT "export SIGNER_ID=${each.key}", "export PARTY_ID=${each.value.pool_id}", "export NETWORK=${var.cardano_network}", + "export NETWORK_MAGIC=${var.cardano_network_magic_map[var.cardano_network]}", "export CARDANO_IMAGE_ID=${var.cardano_image_id}", "export CARDANO_IMAGE_REGISTRY=${var.cardano_image_registry}", "export MITHRIL_IMAGE_ID=${var.mithril_image_id}", @@ -169,6 +170,10 @@ if [ "${var.mithril_use_p2p_network}" = "true" ]; then DOCKER_COMPOSE_FILES="$DOCKER_COMPOSE_FILES -f $DOCKER_DIRECTORY/docker-compose-signer-p2p-bootstrap-override.yaml" fi fi +# Support for DMQ protocol +if [ "${var.mithril_p2p_use_dmq_protocol}" = "true" ]; then + DOCKER_COMPOSE_FILES="$DOCKER_COMPOSE_FILES -f $DOCKER_DIRECTORY/docker-compose-signer-p2p-dmq-override.yaml" +fi EOT , "docker compose -p $SIGNER_ID $DOCKER_COMPOSE_FILES --profile all up -d", diff --git a/mithril-infra/variables.tf b/mithril-infra/variables.tf index 5c2e46fcc91..73c213e9635 100644 --- a/mithril-infra/variables.tf +++ b/mithril-infra/variables.tf @@ -14,6 +14,16 @@ variable "cardano_network" { description = "The Cardano network name to attach: preview, preprod or mainnet" } +variable "cardano_network_magic_map" { + type = map(number) + description = "The Cardano network magic number mapping from Cardano network name" + default = { + "mainnet" = 764824073, + "preprod" = 1, + "preview" = 2, + } +} + locals { environment_name_short = format("%s%s", "${var.environment_prefix}-${var.cardano_network}", var.environment_suffix != "" ? "-${var.environment_suffix}" : "") environment_name = "mithril-${local.environment_name_short}" @@ -184,6 +194,12 @@ variable "mithril_use_p2p_network" { default = false } +variable "mithril_p2p_use_dmq_protocol" { + type = bool + description = "Use the Decentralized Message Queue protocol (DMQ) (experimental, for test only)" + default = false +} + variable "mithril_p2p_network_bootstrap_peer" { type = string description = "The dial to address of a bootstrap peer of the P2P network layer. Useful when setting-up a follower aggregator and signers in a different VM. (experimental, for test only)" @@ -457,6 +473,7 @@ variable "mithril_signers" { type = string pool_id = string })) + description = "The Mithril signers configuration to deploy" default = { "1" = { type = "unverified-cardano-passive", diff --git a/mithril-relay/Cargo.toml b/mithril-relay/Cargo.toml index 2cc3ed0e29d..0d2059cee11 100644 --- a/mithril-relay/Cargo.toml +++ b/mithril-relay/Cargo.toml @@ -10,11 +10,14 @@ license = { workspace = true } repository = { workspace = true } [features] +default = ["future_dmq"] + bundle_tls = ["reqwest/native-tls-vendored"] -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +future_dmq = ["dep:mithril-dmq"] [dependencies] anyhow = { workspace = true } +bincode = { version = "2.0.1" } clap = { workspace = true } config = { workspace = true } libp2p = { version = "0.56.0", features = [ @@ -35,6 +38,7 @@ libp2p = { version = "0.56.0", features = [ "yamux", ] } mithril-common = { path = "../mithril-common", features = ["full"] } +mithril-dmq = { path = "../internal/mithril-dmq", optional = true } mithril-doc = { path = "../internal/mithril-doc" } mithril-test-http-server = { path = "../internal/tests/mithril-test-http-server" } reqwest = { workspace = true, features = [ diff --git a/mithril-relay/src/commands/aggregator.rs b/mithril-relay/src/commands/aggregator.rs index e37c1873dd9..ba75b5fdf39 100644 --- a/mithril-relay/src/commands/aggregator.rs +++ b/mithril-relay/src/commands/aggregator.rs @@ -1,11 +1,18 @@ +#[cfg(feature = "future_dmq")] +use std::path::PathBuf; + use clap::Parser; use libp2p::Multiaddr; -use mithril_common::StdResult; use slog::error; -use super::CommandContext; +#[cfg(feature = "future_dmq")] +use mithril_common::CardanoNetwork; +use mithril_common::StdResult; + use crate::AggregatorRelay; +use super::CommandContext; + #[derive(Parser, Debug, Clone)] pub struct AggregatorCommand { /// Peer listening port @@ -16,6 +23,27 @@ pub struct AggregatorCommand { #[clap(long, env = "DIAL_TO")] dial_to: Option, + /// Path to the DMQ socket file + #[cfg(feature = "future_dmq")] + #[clap( + long, + env = "DMQ_NODE_SOCKET_PATH", + value_name = "PATH", + default_value = "./dmq.socket" + )] + dmq_node_socket_path: PathBuf, + + /// Cardano network + #[cfg(feature = "future_dmq")] + #[clap(long, env = "NETWORK")] + pub network: String, + + /// Cardano Network Magic number + /// useful for TestNet & DevNet + #[cfg(feature = "future_dmq")] + #[clap(long, env = "NETWORK_MAGIC")] + pub network_magic: Option, + /// Aggregator endpoint URL. #[clap(long, env = "AGGREGATOR_ENDPOINT")] aggregator_endpoint: String, @@ -24,12 +52,24 @@ pub struct AggregatorCommand { impl AggregatorCommand { /// Main command execution pub async fn execute(&self, context: CommandContext) -> StdResult<()> { + let logger = context.logger(); let dial_to = self.dial_to.to_owned(); let addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", self.listen_port).parse()?; let aggregator_endpoint = self.aggregator_endpoint.to_owned(); - let logger = context.logger(); + #[cfg(feature = "future_dmq")] + let cardano_network = + CardanoNetwork::from_code(self.network.to_owned(), self.network_magic)?; - let mut relay = AggregatorRelay::start(&addr, &aggregator_endpoint, logger).await?; + let mut relay = AggregatorRelay::start( + &addr, + #[cfg(feature = "future_dmq")] + &self.dmq_node_socket_path, + #[cfg(feature = "future_dmq")] + &cardano_network, + &aggregator_endpoint, + logger, + ) + .await?; if let Some(dial_to_address) = dial_to { relay.dial_peer(dial_to_address.clone())?; } diff --git a/mithril-relay/src/commands/signer.rs b/mithril-relay/src/commands/signer.rs index 7045f5c883c..cafb6857684 100644 --- a/mithril-relay/src/commands/signer.rs +++ b/mithril-relay/src/commands/signer.rs @@ -1,12 +1,18 @@ +#[cfg(feature = "future_dmq")] +use std::path::PathBuf; use std::time::Duration; use clap::Parser; use libp2p::Multiaddr; -use mithril_common::StdResult; use slog::error; +#[cfg(feature = "future_dmq")] +use mithril_common::CardanoNetwork; +use mithril_common::StdResult; + +use crate::{SignerRelay, SignerRelayConfiguration, SignerRelayMode}; + use super::CommandContext; -use crate::{SignerRelay, SignerRelayMode}; #[derive(Parser, Debug, Clone)] pub struct SignerCommand { @@ -22,6 +28,27 @@ pub struct SignerCommand { #[clap(long, env = "DIAL_TO")] dial_to: Option, + /// Path to the DMQ socket file + #[cfg(feature = "future_dmq")] + #[clap( + long, + env = "DMQ_NODE_SOCKET_PATH", + value_name = "PATH", + default_value = "./dmq.socket" + )] + dmq_node_socket_path: PathBuf, + + /// Cardano network + #[cfg(feature = "future_dmq")] + #[clap(long, env = "NETWORK")] + pub network: String, + + /// Cardano Network Magic number + /// useful for TestNet & DevNet + #[cfg(feature = "future_dmq")] + #[clap(long, env = "NETWORK_MAGIC")] + pub network_magic: Option, + /// Aggregator endpoint URL. #[clap(long, env = "AGGREGATOR_ENDPOINT")] aggregator_endpoint: String, @@ -50,16 +77,23 @@ impl SignerCommand { let signature_registration_mode = &self.signature_registration_mode; let aggregator_endpoint = self.aggregator_endpoint.to_owned(); let signer_repeater_delay = Duration::from_millis(self.signer_repeater_delay); + #[cfg(feature = "future_dmq")] + let cardano_network = + CardanoNetwork::from_code(self.network.to_owned(), self.network_magic)?; - let mut relay = SignerRelay::start( - &addr, - &server_port, + let mut relay = SignerRelay::start(SignerRelayConfiguration { + address: &addr, + server_port: &server_port, + #[cfg(feature = "future_dmq")] + dmq_node_socket_path: &self.dmq_node_socket_path, + #[cfg(feature = "future_dmq")] + cardano_network: &cardano_network, signer_registration_mode, signature_registration_mode, - &aggregator_endpoint, - &signer_repeater_delay, + aggregator_endpoint: &aggregator_endpoint, + signer_repeater_delay: &signer_repeater_delay, logger, - ) + }) .await?; if let Some(dial_to_address) = dial_to { relay.dial_peer(dial_to_address.clone())?; diff --git a/mithril-relay/src/lib.rs b/mithril-relay/src/lib.rs index 52e2ecc45e6..2c6ff622221 100644 --- a/mithril-relay/src/lib.rs +++ b/mithril-relay/src/lib.rs @@ -7,20 +7,21 @@ pub mod p2p; mod relay; mod repeater; -pub use commands::Args; -pub use commands::RelayCommands; -pub use relay::AggregatorRelay; -pub use relay::PassiveRelay; -pub use relay::SignerRelay; -pub use relay::SignerRelayMode; +pub use commands::{Args, RelayCommands}; +pub use relay::{ + AggregatorRelay, PassiveRelay, SignerRelay, SignerRelayConfiguration, SignerRelayMode, +}; /// The P2P topic names used by Mithril pub mod mithril_p2p_topic { - /// The topic name where signer registrations are published - pub const SIGNERS: &str = "mithril/signers"; + /// The topic name where HTTP signer registrations are published + pub const SIGNERS_HTTP: &str = "mithril/signers/http"; - /// The topic name where signatures are published - pub const SIGNATURES: &str = "mithril/signatures"; + /// The topic name where HTTP signatures are published + pub const SIGNATURES_HTTP: &str = "mithril/signatures/http"; + + /// The topic name where DMQ signatures are published + pub const SIGNATURES_DMQ: &str = "mithril/signatures/dmq"; } #[cfg(test)] diff --git a/mithril-relay/src/p2p/peer.rs b/mithril-relay/src/p2p/peer.rs index 9393d1da52d..9e8194efb7e 100644 --- a/mithril-relay/src/p2p/peer.rs +++ b/mithril-relay/src/p2p/peer.rs @@ -11,9 +11,12 @@ use libp2p::{ }; use mithril_common::{ StdResult, + crypto_helper::{TryFromBytes, TryToBytes}, logging::LoggerExtensions, messages::{RegisterSignatureMessageHttp, RegisterSignerMessage}, }; +#[cfg(feature = "future_dmq")] +use mithril_dmq::DmqMessage; use serde::{Deserialize, Serialize}; use slog::{Logger, debug, info}; use std::{collections::HashMap, time::Duration}; @@ -63,11 +66,34 @@ pub type TopicName = String; /// The broadcast message received from a Gossip sub event #[derive(Serialize, Deserialize)] pub enum BroadcastMessage { - /// A signer registration message received from the Gossip sub - RegisterSigner(RegisterSignerMessage), + /// A HTTP signer registration message received from the Gossip sub + RegisterSignerHttp(RegisterSignerMessage), - /// A signature registration message received from the Gossip sub - RegisterSignature(RegisterSignatureMessageHttp), + /// A HTTP signature registration message received from the Gossip sub + RegisterSignatureHttp(RegisterSignatureMessageHttp), + + /// A DMQ signature registration message received from the Gossip sub + #[cfg(feature = "future_dmq")] + RegisterSignatureDmq(DmqMessage), +} + +impl TryToBytes for BroadcastMessage { + fn to_bytes_vec(&self) -> StdResult> { + let bytes = + bincode::serde::encode_to_vec(self, bincode::config::standard()).map_err(|e| e.into()); + + bytes + } +} + +impl TryFromBytes for BroadcastMessage { + fn try_from_bytes(bytes: &[u8]) -> StdResult { + let (res, _) = + bincode::serde::decode_from_slice::(bytes, bincode::config::standard()) + .map_err(|e| anyhow!(e))?; + + Ok(res) + } } /// A peer in the P2P network @@ -95,12 +121,16 @@ impl Peer { fn build_topics() -> HashMap { HashMap::from([ ( - mithril_p2p_topic::SIGNATURES.into(), - gossipsub::IdentTopic::new(mithril_p2p_topic::SIGNATURES), + mithril_p2p_topic::SIGNATURES_HTTP.into(), + gossipsub::IdentTopic::new(mithril_p2p_topic::SIGNATURES_HTTP), ), ( - mithril_p2p_topic::SIGNERS.into(), - gossipsub::IdentTopic::new(mithril_p2p_topic::SIGNERS), + mithril_p2p_topic::SIGNATURES_DMQ.into(), + gossipsub::IdentTopic::new(mithril_p2p_topic::SIGNATURES_DMQ), + ), + ( + mithril_p2p_topic::SIGNERS_HTTP.into(), + gossipsub::IdentTopic::new(mithril_p2p_topic::SIGNERS_HTTP), ), ]) } @@ -177,7 +207,7 @@ impl Peer { match event { PeerEvent::Behaviour { event: PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }), - } => Ok(Some(serde_json::from_slice(&message.data)?)), + } => Ok(Some(BroadcastMessage::try_from_bytes(&message.data)?)), _ => Ok(None), } } @@ -217,14 +247,37 @@ impl Peer { } } - /// Publish a signature on the P2P pubsub - pub fn publish_signature( + /// Publish a HTTP signature on the P2P pubsub + pub fn publish_signature_http( &mut self, message: &RegisterSignatureMessageHttp, ) -> StdResult { self.publish_broadcast_message( - &BroadcastMessage::RegisterSignature(message.to_owned()), - mithril_p2p_topic::SIGNATURES, + &BroadcastMessage::RegisterSignatureHttp(message.to_owned()), + mithril_p2p_topic::SIGNATURES_HTTP, + ) + } + + /// Publish a DMQ signature on the P2P pubsub + #[cfg(feature = "future_dmq")] + pub fn publish_signature_dmq( + &mut self, + message: &DmqMessage, + ) -> StdResult { + self.publish_broadcast_message( + &BroadcastMessage::RegisterSignatureDmq(message.to_owned()), + mithril_p2p_topic::SIGNATURES_DMQ, + ) + } + + /// Publish a signer registration on the P2P pubsub + pub fn publish_signer_registration( + &mut self, + message: &RegisterSignerMessage, + ) -> StdResult { + self.publish_broadcast_message( + &BroadcastMessage::RegisterSignerHttp(message.to_owned()), + mithril_p2p_topic::SIGNERS_HTTP, ) } @@ -242,9 +295,10 @@ impl Peer { format!("Can not publish broadcast message on invalid topic: {topic_name}") })? .to_owned(); - let data = serde_json::to_vec(message).with_context(|| { + let data = message.to_bytes_vec().with_context(|| { format!("Can not publish broadcast message with invalid format on topic {topic_name}") })?; + println!("Publishing message on topic {topic_name}: {:?}", data.len()); let message_id = self .swarm @@ -264,17 +318,6 @@ impl Peer { Ok(message_id.to_owned()) } - /// Publish a signer registration on the P2P pubsub - pub fn publish_signer_registration( - &mut self, - message: &RegisterSignerMessage, - ) -> StdResult { - self.publish_broadcast_message( - &BroadcastMessage::RegisterSigner(message.to_owned()), - mithril_p2p_topic::SIGNERS, - ) - } - /// Connect to a remote peer pub fn dial(&mut self, addr: Multiaddr) -> StdResult<()> { debug!(self.logger, "Dialing to"; "address" => ?addr, "local_peer_id" => ?self.local_peer_id()); diff --git a/mithril-relay/src/relay/aggregator.rs b/mithril-relay/src/relay/aggregator.rs index 0d25c4c829f..c3310005a57 100644 --- a/mithril-relay/src/relay/aggregator.rs +++ b/mithril-relay/src/relay/aggregator.rs @@ -1,18 +1,37 @@ -use crate::p2p::{BroadcastMessage, Peer, PeerEvent}; +#[cfg(feature = "future_dmq")] +use std::{path::Path, sync::Arc}; + use anyhow::anyhow; use libp2p::Multiaddr; +#[cfg(feature = "future_dmq")] +use mithril_dmq::{DmqConsumerServer, DmqConsumerServerPallas, DmqMessage}; +use reqwest::StatusCode; +use slog::{Logger, error, info}; +#[cfg(feature = "future_dmq")] +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, + watch::{self, Receiver, Sender}, +}; + +#[cfg(feature = "future_dmq")] +use mithril_common::CardanoNetwork; use mithril_common::{ StdResult, logging::LoggerExtensions, messages::{RegisterSignatureMessageHttp, RegisterSignerMessage}, }; -use reqwest::StatusCode; -use slog::{Logger, error, info}; + +use crate::p2p::{BroadcastMessage, Peer, PeerEvent}; /// A relay for a Mithril aggregator pub struct AggregatorRelay { aggregator_endpoint: String, peer: Peer, + #[cfg(feature = "future_dmq")] + signature_dmq_tx: UnboundedSender, + #[cfg(feature = "future_dmq")] + #[allow(unused)] + stop_tx: Sender<()>, logger: Logger, } @@ -20,16 +39,68 @@ impl AggregatorRelay { /// Start a relay for a Mithril aggregator pub async fn start( addr: &Multiaddr, + #[cfg(feature = "future_dmq")] dmq_node_socket_path: &Path, + #[cfg(feature = "future_dmq")] cardano_network: &CardanoNetwork, aggregator_endpoint: &str, logger: &Logger, ) -> StdResult { + let peer = Peer::new(addr).with_logger(logger).start().await?; + let logger = logger.new_with_component_name::(); + #[cfg(feature = "future_dmq")] + { + let (stop_tx, stop_rx) = watch::channel(()); + let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::(); + #[cfg(unix)] + let _dmq_consumer_server = Self::start_dmq_consumer_server( + dmq_node_socket_path, + cardano_network, + signature_dmq_rx, + stop_rx, + logger.clone(), + ) + .await?; + + Ok(Self { + aggregator_endpoint: aggregator_endpoint.to_owned(), + peer, + signature_dmq_tx, + stop_tx, + logger, + }) + } + #[cfg(not(feature = "future_dmq"))] Ok(Self { aggregator_endpoint: aggregator_endpoint.to_owned(), - peer: Peer::new(addr).with_logger(logger).start().await?, - logger: logger.new_with_component_name::(), + peer, + logger, }) } + #[cfg(feature = "future_dmq")] + async fn start_dmq_consumer_server( + socket: &Path, + cardano_network: &CardanoNetwork, + signature_dmq_rx: UnboundedReceiver, + stop_rx: Receiver<()>, + logger: Logger, + ) -> StdResult> { + let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new( + socket.to_path_buf(), + cardano_network.to_owned(), + stop_rx, + logger.clone(), + )); + dmq_consumer_server.register_receiver(signature_dmq_rx).await?; + let dmq_consumer_server_clone = dmq_consumer_server.clone(); + tokio::spawn(async move { + if let Err(err) = dmq_consumer_server_clone.run().await { + error!(logger.to_owned(), "DMQ Consumer server failed"; "error" => ?err); + } + }); + + Ok(dmq_consumer_server) + } + async fn notify_signature_to_aggregator( &self, signature_message: &RegisterSignatureMessageHttp, @@ -100,7 +171,7 @@ impl AggregatorRelay { pub async fn tick(&mut self) -> StdResult<()> { if let Some(peer_event) = self.peer.tick_swarm().await? { match self.peer.convert_peer_event_to_message(peer_event) { - Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) => { + Ok(Some(BroadcastMessage::RegisterSignerHttp(signer_message_received))) => { let retry_max = 3; let mut retry_count = 0; while let Err(e) = @@ -113,7 +184,7 @@ impl AggregatorRelay { } } } - Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) => { + Ok(Some(BroadcastMessage::RegisterSignatureHttp(signature_message_received))) => { let retry_max = 3; let mut retry_count = 0; while let Err(e) = @@ -126,6 +197,12 @@ impl AggregatorRelay { } } } + #[cfg(feature = "future_dmq")] + Ok(Some(BroadcastMessage::RegisterSignatureDmq(signature_message_received))) => { + self.signature_dmq_tx.send(signature_message_received).map_err(|e| { + anyhow!("Failed to send signature message to DMQ consumer server: {e}") + })?; + } Ok(None) => {} Err(e) => return Err(e), } @@ -180,9 +257,17 @@ mod tests { then.status(201).body("ok"); }); let addr: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap(); - let relay = AggregatorRelay::start(&addr, &server.url(""), &TestLogger::stdout()) - .await - .unwrap(); + let relay = AggregatorRelay::start( + &addr, + #[cfg(feature = "future_dmq")] + Path::new("test"), + #[cfg(feature = "future_dmq")] + &CardanoNetwork::TestNet(123), + &server.url(""), + &TestLogger::stdout(), + ) + .await + .unwrap(); relay .notify_signature_to_aggregator(&RegisterSignatureMessageHttp::dummy()) diff --git a/mithril-relay/src/relay/mod.rs b/mithril-relay/src/relay/mod.rs index 3dffee0edb5..fffa415285d 100644 --- a/mithril-relay/src/relay/mod.rs +++ b/mithril-relay/src/relay/mod.rs @@ -4,5 +4,4 @@ mod signer; pub use aggregator::AggregatorRelay; pub use passive::PassiveRelay; -pub use signer::SignerRelay; -pub use signer::SignerRelayMode; +pub use signer::{SignerRelay, SignerRelayConfiguration, SignerRelayMode}; diff --git a/mithril-relay/src/relay/passive.rs b/mithril-relay/src/relay/passive.rs index d931d38dd54..e58c5d1acfc 100644 --- a/mithril-relay/src/relay/passive.rs +++ b/mithril-relay/src/relay/passive.rs @@ -32,11 +32,15 @@ impl PassiveRelay { pub async fn tick(&mut self) -> StdResult<()> { if let Some(peer_event) = self.peer.tick_swarm().await? { match self.peer.convert_peer_event_to_message(peer_event) { - Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) => { - info!(self.logger, "Received signer registration message from P2P network"; "signer_message" => #?signer_message_received); + Ok(Some(BroadcastMessage::RegisterSignerHttp(signer_message_received))) => { + info!(self.logger, "Received HTTP signer registration message from P2P network"; "signer_message" => #?signer_message_received); } - Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) => { - info!(self.logger, "Received signature message from P2P network"; "signature_message" => #?signature_message_received); + Ok(Some(BroadcastMessage::RegisterSignatureHttp(signature_message_received))) => { + info!(self.logger, "Received HTTP signature message from P2P network"; "signature_message" => #?signature_message_received); + } + #[cfg(feature = "future_dmq")] + Ok(Some(BroadcastMessage::RegisterSignatureDmq(signature_message_received))) => { + info!(self.logger, "Received DMQ signature message from P2P network"; "signature_message" => #?signature_message_received); } Ok(None) => {} Err(e) => return Err(e), diff --git a/mithril-relay/src/relay/signer.rs b/mithril-relay/src/relay/signer.rs index 8b429ee6cf3..340e070cb69 100644 --- a/mithril-relay/src/relay/signer.rs +++ b/mithril-relay/src/relay/signer.rs @@ -1,16 +1,27 @@ +#[cfg(feature = "future_dmq")] +use std::path::Path; +use std::{net::SocketAddr, sync::Arc, time::Duration}; + use clap::ValueEnum; use libp2p::Multiaddr; +#[cfg(feature = "future_dmq")] +use slog::error; use slog::{Logger, debug, info}; -use std::{net::SocketAddr, sync::Arc, time::Duration}; use strum::Display; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; +#[cfg(feature = "future_dmq")] +use tokio::sync::watch::{self, Receiver, Sender}; use warp::Filter; +#[cfg(feature = "future_dmq")] +use mithril_common::CardanoNetwork; use mithril_common::{ StdResult, logging::LoggerExtensions, messages::{RegisterSignatureMessageHttp, RegisterSignerMessage}, }; +#[cfg(feature = "future_dmq")] +use mithril_dmq::{DmqMessage, DmqPublisherServer, DmqPublisherServerPallas}; use mithril_test_http_server::{TestHttpServer, test_http_server_with_socket_address}; use crate::{ @@ -45,60 +56,132 @@ struct HTTPServerConfiguration<'a> { logger: &'a Logger, } +/// Configuration for a Mithril Signer Relay +pub struct SignerRelayConfiguration<'a> { + /// Address on which the HTTP server will listen + pub address: &'a Multiaddr, + /// Port on which the HTTP server will listen + pub server_port: &'a u16, + /// Path to the DMQ node socket file + #[cfg(feature = "future_dmq")] + pub dmq_node_socket_path: &'a Path, + /// Cardano network + #[cfg(feature = "future_dmq")] + pub cardano_network: &'a CardanoNetwork, + /// Signer registration mode + pub signer_registration_mode: &'a SignerRelayMode, + /// Signature registration mode + pub signature_registration_mode: &'a SignerRelayMode, + /// Aggregator endpoint URL + pub aggregator_endpoint: &'a str, + /// Delay for repeating a signer registration message + pub signer_repeater_delay: &'a Duration, + /// Logger for the signer relay + pub logger: &'a Logger, +} + /// A relay for a Mithril signer pub struct SignerRelay { - server: TestHttpServer, + http_server: TestHttpServer, peer: Peer, - signature_rx: UnboundedReceiver, - signer_rx: UnboundedReceiver, + signature_http_rx: UnboundedReceiver, + #[cfg(feature = "future_dmq")] + signature_dmq_rx: UnboundedReceiver, + signer_http_rx: UnboundedReceiver, signer_repeater: Arc>, + #[cfg(feature = "future_dmq")] + #[allow(unused)] + stop_tx: Sender<()>, logger: Logger, } impl SignerRelay { /// Start a relay for a Mithril signer - pub async fn start( - address: &Multiaddr, - server_port: &u16, - signer_registration_mode: &SignerRelayMode, - signature_registration_mode: &SignerRelayMode, - aggregator_endpoint: &str, - signer_repeater_delay: &Duration, - logger: &Logger, - ) -> StdResult { - let relay_logger = logger.new_with_component_name::(); - debug!(relay_logger, "Starting..."; "signer_registration_mode" => ?signer_registration_mode, "signature_registration_mode" => ?signature_registration_mode); + pub async fn start(config: SignerRelayConfiguration<'_>) -> StdResult { + let relay_logger = config.logger.new_with_component_name::(); + debug!(relay_logger, "Starting..."; "signer_registration_mode" => ?config.signer_registration_mode, "signature_registration_mode" => ?config.signature_registration_mode); let (signature_tx, signature_rx) = unbounded_channel::(); let (signer_tx, signer_rx) = unbounded_channel::(); let signer_repeater = Arc::new(MessageRepeater::new( signer_tx.clone(), - signer_repeater_delay.to_owned(), - logger, + config.signer_repeater_delay.to_owned(), + config.logger, )); - let peer = Peer::new(address).start().await?; - let server = Self::start_http_server(&HTTPServerConfiguration { - server_port, - signer_registration_mode: signer_registration_mode.to_owned(), - signature_registration_mode: signature_registration_mode.to_owned(), - aggregator_endpoint, + let peer = Peer::new(config.address).start().await?; + let http_server = Self::start_http_server(&HTTPServerConfiguration { + server_port: config.server_port, + signer_registration_mode: config.signer_registration_mode.to_owned(), + signature_registration_mode: config.signature_registration_mode.to_owned(), + aggregator_endpoint: config.aggregator_endpoint, signer_tx: signer_tx.clone(), signature_tx: signature_tx.clone(), signer_repeater: signer_repeater.clone(), logger: &relay_logger, }) .await; - info!(relay_logger, "Listening on"; "address" => ?server.address()); - + info!(relay_logger, "Listening on"; "address" => ?http_server.address()); + + #[cfg(feature = "future_dmq")] + { + let (stop_tx, stop_rx) = watch::channel(()); + let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::(); + #[cfg(unix)] + let _dmq_publisher_server = Self::start_dmq_publisher_server( + config.dmq_node_socket_path, + config.cardano_network, + signature_dmq_tx, + stop_rx, + relay_logger.clone(), + ) + .await?; + + Ok(Self { + http_server, + peer, + signature_http_rx: signature_rx, + signature_dmq_rx, + signer_http_rx: signer_rx, + signer_repeater, + stop_tx, + logger: relay_logger, + }) + } + #[cfg(not(feature = "future_dmq"))] Ok(Self { - server, + http_server, peer, - signature_rx, - signer_rx, + signature_http_rx: signature_rx, + signer_http_rx: signer_rx, signer_repeater, logger: relay_logger, }) } + #[cfg(feature = "future_dmq")] + async fn start_dmq_publisher_server( + socket: &Path, + cardano_network: &CardanoNetwork, + signature_dmq_tx: UnboundedSender, + stop_rx: Receiver<()>, + logger: Logger, + ) -> StdResult> { + let dmq_publisher_server = Arc::new(DmqPublisherServerPallas::new( + socket.to_path_buf(), + cardano_network.to_owned(), + stop_rx, + logger.clone(), + )); + dmq_publisher_server.register_transmitter(signature_dmq_tx).await?; + let dmq_publisher_server_clone = dmq_publisher_server.clone(); + tokio::spawn(async move { + if let Err(err) = dmq_publisher_server_clone.run().await { + error!(logger.to_owned(), "DMQ Publisher server failed"; "error" => ?err); + } + }); + + Ok(dmq_publisher_server) + } + async fn start_http_server(configuration: &HTTPServerConfiguration<'_>) -> TestHttpServer { let server_logger = configuration.logger.new_with_name("http_server"); test_http_server_with_socket_address( @@ -151,34 +234,86 @@ impl SignerRelay { ) } + fn process_register_signature_http_message( + &mut self, + message: Option, + ) -> StdResult<()> { + match message { + Some(signature_message) => { + info!(self.logger, "Publish HTTP signature to p2p network"; "message" => #?signature_message); + self.peer.publish_signature_http(&signature_message)?; + + Ok(()) + } + None => { + debug!(self.logger, "No HTTP signature message available"); + + Ok(()) + } + } + } + + fn process_register_signer_http_message( + &mut self, + message: Option, + ) -> StdResult<()> { + match message { + Some(signer_message) => { + info!(self.logger, "Publish HTTP signer-registration to p2p network"; "message" => #?signer_message); + self.peer.publish_signer_registration(&signer_message)?; + + Ok(()) + } + None => { + debug!(self.logger, "No HTTP signer message available"); + + Ok(()) + } + } + } + + #[cfg(feature = "future_dmq")] + fn process_register_signature_dmq_message( + &mut self, + message: Option, + ) -> StdResult<()> { + match message { + Some(signature_message) => { + info!(self.logger, "Publish DMQ signature to p2p network"; "message" => #?signature_message); + self.peer.publish_signature_dmq(&signature_message)?; + + Ok(()) + } + None => { + //debug!(self.logger, "No DMQ signature message available"); + Ok(()) + } + } + } + /// Tick the signer relay pub async fn tick(&mut self) -> StdResult<()> { + #[cfg(feature = "future_dmq")] tokio::select! { - message = self.signature_rx.recv() => { - match message { - Some(signature_message) => { - info!(self.logger, "Publish signature to p2p network"; "message" => #?signature_message); - self.peer.publish_signature(&signature_message)?; - Ok(()) - } - None => { - debug!(self.logger, "No signature message available"); - Ok(()) - } - } + message = self.signature_http_rx.recv() => { + self.process_register_signature_http_message(message) }, - message = self.signer_rx.recv() => { - match message { - Some(signer_message) => { - info!(self.logger, "Publish signer-registration to p2p network"; "message" => #?signer_message); - self.peer.publish_signer_registration(&signer_message)?; - Ok(()) - } - None => { - debug!(self.logger, "No signer message available"); - Ok(()) - } - } + message = self.signer_http_rx.recv() => { + self.process_register_signer_http_message(message) + }, + message = self.signature_dmq_rx.recv() => { + self.process_register_signature_dmq_message(message) + }, + _ = self.signer_repeater.repeat_message() => {Ok(())}, + _event = self.peer.tick_swarm() => {Ok(())} + } + #[cfg(not(feature = "future_dmq"))] + tokio::select! { + message = self.signature_http_rx.recv() => { + self.process_register_signature_http_message(message) + }, + message = self.signer_http_rx.recv() => { + self.process_register_signer_http_message(message) }, _ = self.signer_repeater.repeat_message() => {Ok(())}, _event = self.peer.tick_swarm() => {Ok(())} @@ -188,7 +323,7 @@ impl SignerRelay { /// Receive signature from the underlying channel #[allow(dead_code)] pub async fn receive_signature(&mut self) -> Option { - self.signature_rx.recv().await + self.signature_http_rx.recv().await } /// Tick the peer of the signer relay @@ -203,7 +338,7 @@ impl SignerRelay { /// Retrieve address on which the HTTP Server is listening pub fn address(&self) -> SocketAddr { - self.server.address() + self.http_server.address() } /// Retrieve address on which the peer is listening diff --git a/mithril-relay/tests/register_signer_signature.rs b/mithril-relay/tests/register_signer_signature.rs index 7c58670c008..3d9c52bdb69 100644 --- a/mithril-relay/tests/register_signer_signature.rs +++ b/mithril-relay/tests/register_signer_signature.rs @@ -1,15 +1,20 @@ +#[cfg(feature = "future_dmq")] +use std::path::PathBuf; use std::{sync::Arc, time::Duration}; use libp2p::{Multiaddr, gossipsub}; +use reqwest::StatusCode; +use slog::{Drain, Level, Logger}; +use slog_scope::{error, info}; + +#[cfg(feature = "future_dmq")] +use mithril_common::CardanoNetwork; use mithril_common::messages::{RegisterSignatureMessageHttp, RegisterSignerMessage}; use mithril_common::test_utils::double::Dummy; use mithril_relay::{ - PassiveRelay, SignerRelay, SignerRelayMode, + PassiveRelay, SignerRelay, SignerRelayConfiguration, SignerRelayMode, p2p::{BroadcastMessage, PeerBehaviourEvent, PeerEvent}, }; -use reqwest::StatusCode; -use slog::{Drain, Level, Logger}; -use slog_scope::{error, info}; // Launch a relay that connects to P2P network. The relay is a peer in the P2P // network. The relay sends some signer registrations that must be received by other @@ -36,19 +41,27 @@ async fn should_receive_registrations_from_signers_when_subscribed_to_pubsub() { let total_peers = 1 + total_p2p_client; let addr: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap(); let server_port = 0; + #[cfg(feature = "future_dmq")] + let dmq_node_socket_path = PathBuf::new(); + #[cfg(feature = "future_dmq")] + let cardano_network = CardanoNetwork::TestNet(123); let signer_registration_mode = SignerRelayMode::P2P; let signature_registration_mode = SignerRelayMode::P2P; let aggregator_endpoint = "http://0.0.0.0:1234".to_string(); let signer_repeater_delay = Duration::from_secs(100); - let mut signer_relay = SignerRelay::start( - &addr, - &server_port, - &signer_registration_mode, - &signature_registration_mode, - &aggregator_endpoint, - &signer_repeater_delay, - &logger, - ) + let mut signer_relay = SignerRelay::start(SignerRelayConfiguration { + address: &addr, + server_port: &server_port, + #[cfg(feature = "future_dmq")] + dmq_node_socket_path: &dmq_node_socket_path, + #[cfg(feature = "future_dmq")] + cardano_network: &cardano_network, + signer_registration_mode: &signer_registration_mode, + signature_registration_mode: &signature_registration_mode, + aggregator_endpoint: &aggregator_endpoint, + signer_repeater_delay: &signer_repeater_delay, + logger: &logger, + }) .await .expect("Relay start failed"); let relay_address = signer_relay.address(); @@ -143,7 +156,7 @@ async fn should_receive_registrations_from_signers_when_subscribed_to_pubsub() { loop { tokio::select! { event = p2p_client1.tick_peer() => { - if let Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) = p2p_client1.peer_mut().convert_peer_event_to_message(event.unwrap().unwrap()) + if let Ok(Some(BroadcastMessage::RegisterSignerHttp(signer_message_received))) = p2p_client1.peer_mut().convert_peer_event_to_message(event.unwrap().unwrap()) { info!("Test: client1 consumed signer registration"; "message" => #?signer_message_received); assert_eq!(signer_message_sent, signer_message_received); @@ -151,7 +164,7 @@ async fn should_receive_registrations_from_signers_when_subscribed_to_pubsub() { } } event = p2p_client2.tick_peer() => { - if let Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) = p2p_client2.peer_mut().convert_peer_event_to_message(event.unwrap().unwrap()) + if let Ok(Some(BroadcastMessage::RegisterSignerHttp(signer_message_received))) = p2p_client2.peer_mut().convert_peer_event_to_message(event.unwrap().unwrap()) { info!("Test: client2 consumed signer registration"; "message" => #?signer_message_received); assert_eq!(signer_message_sent, signer_message_received); @@ -190,7 +203,7 @@ async fn should_receive_registrations_from_signers_when_subscribed_to_pubsub() { loop { tokio::select! { event = p2p_client1.tick_peer() => { - if let Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) = p2p_client1.peer_mut().convert_peer_event_to_message(event.unwrap().unwrap()) + if let Ok(Some(BroadcastMessage::RegisterSignatureHttp(signature_message_received))) = p2p_client1.peer_mut().convert_peer_event_to_message(event.unwrap().unwrap()) { info!("Test: client1 consumed signature"; "message" => #?signature_message_received); assert_eq!(signature_message_sent, signature_message_received); @@ -198,7 +211,7 @@ async fn should_receive_registrations_from_signers_when_subscribed_to_pubsub() { } } event = p2p_client2.tick_peer() => { - if let Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) = p2p_client2.peer_mut().convert_peer_event_to_message(event.unwrap().unwrap()) + if let Ok(Some(BroadcastMessage::RegisterSignatureHttp(signature_message_received))) = p2p_client2.peer_mut().convert_peer_event_to_message(event.unwrap().unwrap()) { info!("Test: client2 consumed signature"; "message" => #?signature_message_received); assert_eq!(signature_message_sent, signature_message_received); diff --git a/mithril-signer/Cargo.toml b/mithril-signer/Cargo.toml index 976b825d980..86f7da1d880 100644 --- a/mithril-signer/Cargo.toml +++ b/mithril-signer/Cargo.toml @@ -10,7 +10,7 @@ license = { workspace = true } repository = { workspace = true } [features] -default = ["jemallocator"] +default = ["jemallocator", "future_dmq"] bundle_tls = ["reqwest/native-tls-vendored"] jemallocator = ["dep:tikv-jemallocator"] diff --git a/mithril-signer/src/dependency_injection/builder.rs b/mithril-signer/src/dependency_injection/builder.rs index 78ccc696f7b..f735609a0b2 100644 --- a/mithril-signer/src/dependency_injection/builder.rs +++ b/mithril-signer/src/dependency_injection/builder.rs @@ -42,7 +42,7 @@ use mithril_persistence::database::{ApplicationNodeType, SqlMigration}; use mithril_persistence::sqlite::{ConnectionBuilder, SqliteConnection, SqliteConnectionPool}; #[cfg(feature = "future_dmq")] -use mithril_dmq::{DmqMessageBuilder, DmqPublisherPallas}; +use mithril_dmq::{DmqMessageBuilder, DmqPublisherClientPallas}; use crate::dependency_injection::SignerDependencyContainer; #[cfg(feature = "future_dmq")] @@ -430,14 +430,14 @@ impl<'a> DependenciesBuilder<'a> { ))?, chain_observer.clone(), ); - Arc::new(SignaturePublisherDmq::new(Arc::new(DmqPublisherPallas::< - RegisterSignatureMessageDmq, - >::new( - dmq_node_socket_path.to_owned(), - *cardano_network, - dmq_message_builder, - self.root_logger(), - )))) as Arc + Arc::new(SignaturePublisherDmq::new(Arc::new( + DmqPublisherClientPallas::::new( + dmq_node_socket_path.to_owned(), + *cardano_network, + dmq_message_builder, + self.root_logger(), + ), + ))) as Arc } _ => Arc::new(SignaturePublisherNoop) as Arc, }; diff --git a/mithril-signer/src/services/signature_publisher/dmq.rs b/mithril-signer/src/services/signature_publisher/dmq.rs index 7d251efb78f..8ffffed3f95 100644 --- a/mithril-signer/src/services/signature_publisher/dmq.rs +++ b/mithril-signer/src/services/signature_publisher/dmq.rs @@ -8,18 +8,18 @@ use mithril_common::{ entities::{ProtocolMessage, SignedEntityType, SingleSignature}, messages::RegisterSignatureMessageDmq, }; -use mithril_dmq::DmqPublisher; +use mithril_dmq::DmqPublisherClient; use super::SignaturePublisher; /// DMQ implementation of the [SignaturePublisher] trait. pub struct SignaturePublisherDmq { - dmq_publisher: Arc>, + dmq_publisher: Arc>, } impl SignaturePublisherDmq { /// Creates a new instance of [SignaturePublisherDmq]. - pub fn new(dmq_publisher: Arc>) -> Self { + pub fn new(dmq_publisher: Arc>) -> Self { Self { dmq_publisher } } } diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs index 4b078d6b4b1..523c96c5c6b 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs @@ -75,6 +75,7 @@ impl Aggregator { let public_server_url = format!("http://localhost:{server_port_parameter}/aggregator"); let mut env = HashMap::from([ ("NETWORK", "devnet"), + ("NETWORK_MAGIC", &magic_id), ("RUN_INTERVAL", &mithril_run_interval), ("SERVER_IP", "0.0.0.0"), ("SERVER_PORT", &server_port_parameter), @@ -85,7 +86,6 @@ impl Aggregator { "SNAPSHOT_DIRECTORY", aggregator_config.artifacts_dir.to_str().unwrap(), ), - ("NETWORK_MAGIC", &magic_id), ( "DATA_STORES_DIRECTORY", aggregator_config.store_dir.to_str().unwrap(), @@ -129,6 +129,16 @@ impl Aggregator { if let Some(leader_aggregator_endpoint) = aggregator_config.leader_aggregator_endpoint { env.insert("LEADER_AGGREGATOR_ENDPOINT", leader_aggregator_endpoint); } + // TODO: make this configurable + let dmq_node_socket_path = aggregator_config + .work_dir + .join(format!("dmq-aggregator-{}.socket", aggregator_config.index)); + if false { + env.insert( + "DMQ_NODE_SOCKET_PATH", + dmq_node_socket_path.to_str().unwrap(), + ); + } let args = vec![ "--db-directory", aggregator_config.full_node.db_path.to_str().unwrap(), diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs index 2649d305e66..afd57670b5e 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs @@ -294,7 +294,7 @@ impl MithrilInfrastructure { let mut bootstrap_peer_addr = None; for (index, aggregator_endpoint) in aggregator_endpoints.iter().enumerate() { let mut relay_aggregator = RelayAggregator::new( - Aggregator::name_suffix(index), + index, config.server_port + index as u64 + 100, bootstrap_peer_addr.clone(), aggregator_endpoint, @@ -310,6 +310,7 @@ impl MithrilInfrastructure { for (index, party_id) in signers_party_ids.iter().enumerate() { let mut relay_signer = RelaySigner::new(&RelaySignerConfiguration { + signer_number: index + 1, listen_port: config.server_port + index as u64 + 200, server_port: config.server_port + index as u64 + 300, dial_to: bootstrap_peer_addr.clone(), diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_aggregator.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_aggregator.rs index adb22f03491..e94093dc6a9 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_aggregator.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_aggregator.rs @@ -1,4 +1,5 @@ use crate::utils::MithrilCommand; +use crate::{Aggregator, DEVNET_MAGIC_ID}; use mithril_common::StdResult; use std::collections::HashMap; use std::path::Path; @@ -14,16 +15,25 @@ pub struct RelayAggregator { impl RelayAggregator { pub fn new( - name: String, + index: usize, listen_port: u64, dial_to: Option, aggregator_endpoint: &str, work_dir: &Path, bin_dir: &Path, ) -> StdResult { + let name = Aggregator::name_suffix(index); let listen_port_str = format!("{listen_port}"); + let dmq_node_socket_path = work_dir.join(format!("dmq-aggregator-{index}.socket")); + let magic_id = DEVNET_MAGIC_ID.to_string(); let mut env = HashMap::from([ ("LISTEN_PORT", listen_port_str.as_str()), + ( + "DMQ_NODE_SOCKET_PATH", + dmq_node_socket_path.to_str().unwrap(), + ), + ("NETWORK", "devnet"), + ("NETWORK_MAGIC", &magic_id), ("AGGREGATOR_ENDPOINT", aggregator_endpoint), ]); if let Some(dial_to) = &dial_to { diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs index 55c87781924..8bddd6eaaca 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs @@ -1,3 +1,4 @@ +use crate::DEVNET_MAGIC_ID; use crate::utils::MithrilCommand; use mithril_common::StdResult; use mithril_common::entities::PartyId; @@ -6,6 +7,7 @@ use std::path::Path; use tokio::process::Child; pub struct RelaySignerConfiguration<'a> { + pub signer_number: usize, pub listen_port: u64, pub server_port: u64, pub dial_to: Option, @@ -28,8 +30,13 @@ pub struct RelaySigner { impl RelaySigner { pub fn new(configuration: &RelaySignerConfiguration) -> StdResult { + let party_id = configuration.party_id.to_owned(); let listen_port_str = format!("{}", configuration.listen_port); let server_port_str = format!("{}", configuration.server_port); + let dmq_node_socket_path = configuration + .work_dir + .join(format!("dmq-signer-{}.socket", configuration.signer_number)); + let magic_id = DEVNET_MAGIC_ID.to_string(); let relay_signer_registration_mode = configuration.relay_signer_registration_mode.to_string(); let relay_signature_registration_mode = @@ -37,6 +44,12 @@ impl RelaySigner { let mut env = HashMap::from([ ("LISTEN_PORT", listen_port_str.as_str()), ("SERVER_PORT", server_port_str.as_str()), + ( + "DMQ_NODE_SOCKET_PATH", + dmq_node_socket_path.to_str().unwrap(), + ), + ("NETWORK", "devnet"), + ("NETWORK_MAGIC", &magic_id), ("AGGREGATOR_ENDPOINT", configuration.aggregator_endpoint), ("SIGNER_REPEATER_DELAY", "100"), ( @@ -65,7 +78,7 @@ impl RelaySigner { Ok(Self { listen_port: configuration.listen_port, server_port: configuration.server_port, - party_id: configuration.party_id.to_owned(), + party_id, command, process: None, }) diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs index 496a5b4ede0..7e3f22eb5b6 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs @@ -54,6 +54,7 @@ impl Signer { let mithril_run_interval = format!("{}", signer_config.mithril_run_interval); let mut env = HashMap::from([ ("NETWORK", "devnet"), + ("NETWORK_MAGIC", &magic_id), ("RUN_INTERVAL", &mithril_run_interval), ("AGGREGATOR_ENDPOINT", &signer_config.aggregator_endpoint), ( @@ -65,7 +66,6 @@ impl Signer { signer_config.store_dir.to_str().unwrap(), ), ("STORE_RETENTION_LIMIT", "10"), - ("NETWORK_MAGIC", &magic_id), ( "CARDANO_NODE_SOCKET_PATH", signer_config.pool_node.socket_path.to_str().unwrap(), @@ -96,6 +96,16 @@ impl Signer { } else { env.insert("PARTY_ID", &party_id); } + // TODO: make this configurable + let dmq_node_socket_path = signer_config + .work_dir + .join(format!("dmq-signer-{}.socket", signer_config.signer_number)); + if true { + env.insert( + "DMQ_NODE_SOCKET_PATH", + dmq_node_socket_path.to_str().unwrap(), + ); + } let args = vec!["-vvv"]; let mut command = MithrilCommand::new(