Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions internal/mithril-dmq/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ bincode = { version = "2.0.1" }
blake2 = "0.10.6"
mithril-cardano-node-chain = { path = "../cardano-node/mithril-cardano-node-chain" }
mithril-common = { path = "../../mithril-common" }
pallas-codec = { git = "https://github.com/txpipe/pallas.git", branch = "main" }
pallas-network = { git = "https://github.com/txpipe/pallas.git", branch = "main" }
#pallas-codec = { git = "https://github.com/txpipe/pallas.git", branch = "main" }
#pallas-network = { git = "https://github.com/txpipe/pallas.git", branch = "main" }
pallas-codec = { path = "../../../pallas-fork-scaling/pallas-codec" }
pallas-network = { path = "../../../pallas-fork-scaling/pallas-network" }
serde = { workspace = true }
serde_bytes = "0.11.17"
slog = { workspace = true }
slog-scope = "4.4.0"
thiserror = { workspace = true }
tokio = { workspace = true, features = ["sync","rt-multi-thread"] }

[dev-dependencies]
Expand Down
72 changes: 43 additions & 29 deletions internal/mithril-dmq/src/consumer/client/pallas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,30 @@ use slog::{Logger, debug, error};
use tokio::sync::{Mutex, MutexGuard};

use mithril_common::{
CardanoNetwork, StdResult,
StdResult,
crypto_helper::{
OpCert, OpCertWithoutColdVerificationKey, TryFromBytes, ed25519::Ed25519VerificationKey,
},
entities::PartyId,
logging::LoggerExtensions,
};

use crate::DmqConsumerClient;
use crate::{DmqConsumerClient, model::DmqNetwork};

/// A DMQ client consumer implementation.
///
/// This implementation is built upon the n2c mini-protocols DMQ implementation in Pallas.
pub struct DmqConsumerClientPallas<M: TryFromBytes + Debug> {
socket: PathBuf,
network: CardanoNetwork,
network: DmqNetwork,
client: Mutex<Option<DmqClient>>,
logger: Logger,
phantom: PhantomData<M>,
}

impl<M: TryFromBytes + Debug> DmqConsumerClientPallas<M> {
/// Creates a new `DmqConsumerClientPallas` instance.
pub fn new(socket: PathBuf, network: CardanoNetwork, logger: Logger) -> Self {
pub fn new(socket: PathBuf, network: DmqNetwork, logger: Logger) -> Self {
Self {
socket,
network,
Expand Down Expand Up @@ -115,11 +115,13 @@ impl<M: TryFromBytes + Debug> DmqConsumerClientPallas<M> {
.0
.into_iter()
.map(|dmq_message| {
let opcert_without_verification_key =
OpCertWithoutColdVerificationKey::try_from_bytes(
&dmq_message.operational_certificate,
)
.with_context(|| "Failed to parse operational certificate")?;
let opcert_without_verification_key = OpCertWithoutColdVerificationKey::try_new(
&dmq_message.operational_certificate.kes_vk,
dmq_message.operational_certificate.issue_number,
dmq_message.operational_certificate.start_kes_period,
&dmq_message.operational_certificate.cert_sig,
)
.with_context(|| "Failed to parse operational certificate")?;
let cold_verification_key =
Ed25519VerificationKey::from_bytes(&dmq_message.cold_verification_key)
.with_context(|| "Failed to parse cold verification key")?
Expand Down Expand Up @@ -171,7 +173,7 @@ mod tests {
facades::DmqServer,
miniprotocols::{
localmsgnotification,
localmsgsubmission::{DmqMsg, DmqMsgPayload},
localmsgsubmission::{DmqMsg, DmqMsgOperationalCertificate, DmqMsgPayload},
},
};
use tokio::{net::UnixListener, task::JoinHandle, time::sleep};
Expand All @@ -194,14 +196,20 @@ mod tests {
expires_at: 100,
},
kes_signature: vec![0, 1, 2, 3],
operational_certificate: vec![
132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198,
142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, 203, 41,
0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, 171, 83, 19,
80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, 247, 4, 87,
170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, 37, 137,
188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5,
],
operational_certificate: DmqMsgOperationalCertificate {
kes_vk: vec![
50, 45, 160, 42, 80, 78, 184, 20, 210, 77, 140, 152, 63, 49, 165, 168, 5,
131, 101, 152, 110, 242, 144, 157, 176, 210, 5, 10, 166, 91, 196, 168,
],
issue_number: 0,
start_kes_period: 0,
cert_sig: vec![
207, 135, 144, 168, 238, 41, 179, 216, 245, 74, 164, 231, 4, 158, 234, 141,
5, 19, 166, 11, 78, 34, 210, 211, 183, 72, 127, 83, 185, 156, 107, 55, 160,
190, 73, 251, 204, 47, 197, 86, 174, 231, 13, 49, 7, 83, 173, 177, 27, 53,
209, 66, 24, 203, 226, 152, 3, 91, 66, 56, 244, 206, 79, 0,
],
},
cold_verification_key: vec![
32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105,
231, 150, 215, 30, 78, 212, 76, 16, 252, 180, 72, 134, 137, 247, 161, 68,
Expand All @@ -215,14 +223,20 @@ mod tests {
expires_at: 101,
},
kes_signature: vec![1, 2, 3, 4],
operational_certificate: vec![
132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198,
142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, 203, 41,
0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, 171, 83, 19,
80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, 247, 4, 87,
170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, 37, 137,
188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5,
],
operational_certificate: DmqMsgOperationalCertificate {
kes_vk: vec![
50, 45, 160, 42, 80, 78, 184, 20, 210, 77, 140, 152, 63, 49, 165, 168, 5,
131, 101, 152, 110, 242, 144, 157, 176, 210, 5, 10, 166, 91, 196, 168,
],
issue_number: 0,
start_kes_period: 0,
cert_sig: vec![
207, 135, 144, 168, 238, 41, 179, 216, 245, 74, 164, 231, 4, 158, 234, 141,
5, 19, 166, 11, 78, 34, 210, 211, 183, 72, 127, 83, 185, 156, 107, 55, 160,
190, 73, 251, 204, 47, 197, 86, 174, 231, 13, 49, 7, 83, 173, 177, 27, 53,
209, 66, 24, 203, 226, 152, 3, 91, 66, 56, 244, 206, 79, 0,
],
},
cold_verification_key: vec![
77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105, 240, 103, 245, 159,
147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65, 245, 200,
Expand Down Expand Up @@ -278,7 +292,7 @@ mod tests {

let consumer = DmqConsumerClientPallas::new(
socket_path,
CardanoNetwork::TestNet(0),
DmqNetwork::TestNet(0),
TestLogger::stdout(),
);

Expand Down Expand Up @@ -314,7 +328,7 @@ mod tests {

let consumer = DmqConsumerClientPallas::<DmqMessageTestPayload>::new(
socket_path,
CardanoNetwork::TestNet(0),
DmqNetwork::TestNet(0),
TestLogger::stdout(),
);

Expand All @@ -341,7 +355,7 @@ mod tests {

let consumer = DmqConsumerClientPallas::<DmqMessageTestPayload>::new(
socket_path,
CardanoNetwork::TestNet(0),
DmqNetwork::TestNet(0),
TestLogger::stdout(),
);

Expand Down
20 changes: 10 additions & 10 deletions internal/mithril-dmq/src/consumer/server/pallas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ use tokio::{

use slog::{Logger, debug, error, info, warn};

use mithril_common::{CardanoNetwork, StdResult, logging::LoggerExtensions};
use mithril_common::{StdResult, logging::LoggerExtensions};

use crate::{DmqConsumerServer, DmqMessage};
use crate::{DmqConsumerServer, DmqMessage, DmqNetwork};

use super::queue::MessageQueue;

/// A DMQ server implementation for messages notification from a DMQ node.
pub struct DmqConsumerServerPallas {
socket: PathBuf,
network: CardanoNetwork,
network: DmqNetwork,
server: Mutex<Option<DmqServer>>,
messages_receiver: Mutex<Option<UnboundedReceiver<DmqMessage>>>,
messages_buffer: MessageQueue,
Expand All @@ -32,7 +32,7 @@ impl DmqConsumerServerPallas {
/// Creates a new instance of [DmqConsumerServerPallas].
pub fn new(
socket: PathBuf,
network: CardanoNetwork,
network: DmqNetwork,
stop_rx: Receiver<()>,
logger: Logger,
) -> Self {
Expand Down Expand Up @@ -303,10 +303,10 @@ mod tests {
let (stop_tx, stop_rx) = watch::channel(());
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
let socket_path = create_temp_dir(current_function_name).join("node.socket");
let cardano_network = CardanoNetwork::TestNet(0);
let dmq_network = DmqNetwork::TestNet(0);
let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new(
socket_path.to_path_buf(),
cardano_network.to_owned(),
dmq_network.to_owned(),
stop_rx,
TestLogger::stdout(),
));
Expand Down Expand Up @@ -363,10 +363,10 @@ mod tests {
let (stop_tx, stop_rx) = watch::channel(());
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
let socket_path = create_temp_dir(current_function_name).join("node.socket");
let cardano_network = CardanoNetwork::TestNet(0);
let dmq_network = DmqNetwork::TestNet(0);
let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new(
socket_path.to_path_buf(),
cardano_network.to_owned(),
dmq_network.to_owned(),
stop_rx,
TestLogger::stdout(),
));
Expand Down Expand Up @@ -422,10 +422,10 @@ mod tests {
let (_stop_tx, stop_rx) = watch::channel(());
let (_signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
let socket_path = create_temp_dir(current_function!()).join("node.socket");
let cardano_network = CardanoNetwork::TestNet(0);
let dmq_network = DmqNetwork::TestNet(0);
let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new(
socket_path.to_path_buf(),
cardano_network.to_owned(),
dmq_network.to_owned(),
stop_rx,
TestLogger::stdout(),
));
Expand Down
11 changes: 9 additions & 2 deletions internal/mithril-dmq/src/consumer/server/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ mod tests {
use std::{ops::RangeInclusive, time::Duration};

use anyhow::anyhow;
use pallas_network::miniprotocols::localmsgsubmission::{DmqMsg, DmqMsgPayload};
use pallas_network::miniprotocols::localmsgsubmission::{
DmqMsg, DmqMsgOperationalCertificate, DmqMsgPayload,
};
use tokio::time::sleep;

use crate::model::MockUnixTimestampProvider;
Expand All @@ -138,7 +140,12 @@ mod tests {
expires_at: 100,
},
kes_signature: vec![0, 1, 2, 3],
operational_certificate: vec![0, 1, 2, 3, 4],
operational_certificate: DmqMsgOperationalCertificate {
kes_vk: vec![12, 13, 14],
issue_number: 15,
start_kes_period: 16,
cert_sig: vec![17],
},
cold_verification_key: vec![0, 1, 2, 3, 4, 5],
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/mithril-dmq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod test;
#[cfg(unix)]
pub use consumer::DmqConsumerServerPallas;
pub use consumer::{DmqConsumerClient, DmqConsumerClientPallas, DmqConsumerServer};
pub use model::{DmqMessage, DmqMessageBuilder};
pub use model::{DmqMessage, DmqMessageBuilder, DmqNetwork};
#[cfg(unix)]
pub use publisher::DmqPublisherServerPallas;
pub use publisher::{DmqPublisherClient, DmqPublisherClientPallas, DmqPublisherServer};
Expand Down
43 changes: 35 additions & 8 deletions internal/mithril-dmq/src/model/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use std::sync::Arc;

use anyhow::Context;
use blake2::{Blake2b, Digest, digest::consts::U64};
use pallas_network::miniprotocols::localmsgsubmission::{DmqMsg, DmqMsgPayload};
use pallas_network::miniprotocols::localmsgsubmission::{
DmqMsg, DmqMsgOperationalCertificate, DmqMsgPayload,
};

use mithril_cardano_node_chain::chain_observer::ChainObserver;
use mithril_common::{
StdResult,
crypto_helper::{KesSigner, SerDeShelleyFileFormat, TryToBytes},
crypto_helper::{KesSigner, TryToBytes},
};

use crate::model::{DmqMessage, SystemUnixTimestampProvider, UnixTimestampProvider};
Expand Down Expand Up @@ -99,8 +101,19 @@ impl DmqMessageBuilder {
let dmq_message = DmqMsg {
msg_payload: dmq_message_payload,
kes_signature: kes_signature.to_bytes_vec()?,
operational_certificate: operational_certificate_without_cold_verification_key
.to_cbor_bytes()?,
operational_certificate: DmqMsgOperationalCertificate {
kes_vk: operational_certificate_without_cold_verification_key
.kes_vk()
.as_bytes()
.to_vec(),
issue_number: operational_certificate_without_cold_verification_key.issue_number(),
start_kes_period: operational_certificate_without_cold_verification_key
.start_kes_period(),
cert_sig: operational_certificate_without_cold_verification_key
.cert_sig()
.to_bytes()
.to_vec(),
},
cold_verification_key: cold_verification_key.to_bytes().to_vec(),
};

Expand Down Expand Up @@ -177,10 +190,24 @@ mod tests {
DmqMsg {
msg_payload: expected_msg_payload.clone(),
kes_signature: kes_signature.to_bytes_vec().unwrap(),
operational_certificate: operational_certificate
.get_opcert_without_cold_verification_key()
.to_cbor_bytes()
.unwrap(),
operational_certificate: DmqMsgOperationalCertificate {
kes_vk: operational_certificate
.get_opcert_without_cold_verification_key()
.kes_vk()
.as_bytes()
.to_vec(),
issue_number: operational_certificate
.get_opcert_without_cold_verification_key()
.issue_number(),
start_kes_period: operational_certificate
.get_opcert_without_cold_verification_key()
.start_kes_period(),
cert_sig: operational_certificate
.get_opcert_without_cold_verification_key()
.cert_sig()
.to_bytes()
.to_vec(),
},
cold_verification_key: operational_certificate
.get_cold_verification_key()
.to_bytes()
Expand Down
11 changes: 9 additions & 2 deletions internal/mithril-dmq/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ impl<'de> Deserialize<'de> for DmqMessage {

#[cfg(test)]
mod tests {
use pallas_network::miniprotocols::localmsgsubmission::DmqMsgPayload;
use pallas_network::miniprotocols::localmsgsubmission::{
DmqMsgOperationalCertificate, DmqMsgPayload,
};

use super::*;

Expand All @@ -86,7 +88,12 @@ mod tests {
expires_at: 100,
},
kes_signature: vec![0, 1, 2, 3],
operational_certificate: vec![0, 1, 2, 3, 4],
operational_certificate: DmqMsgOperationalCertificate {
kes_vk: vec![12, 13, 14],
issue_number: 15,
start_kes_period: 16,
cert_sig: vec![17],
},
cold_verification_key: vec![0, 1, 2, 3, 4, 5],
};

Expand Down
Loading
Loading