Skip to content

Commit 67b18cd

Browse files
authored
Merge pull request #2672 from input-output-hk/jpraynaud/2670-update-dmq-message
refactor: update DMQ message structure
2 parents 27753f4 + fee0ce1 commit 67b18cd

File tree

26 files changed

+567
-259
lines changed

26 files changed

+567
-259
lines changed

Cargo.lock

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/mithril-dmq/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "mithril-dmq"
33
description = "Mechanisms to publish and consume messages of a 'Decentralized Message Queue network' through a DMQ node"
4-
version = "0.1.10"
4+
version = "0.1.11"
55
authors.workspace = true
66
documentation.workspace = true
77
edition.workspace = true

internal/mithril-dmq/src/consumer/client/pallas.rs

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use tokio::sync::{Mutex, MutexGuard};
77

88
use mithril_common::{
99
CardanoNetwork, StdResult,
10-
crypto_helper::{OpCert, TryFromBytes},
10+
crypto_helper::{
11+
OpCert, OpCertWithoutColdVerificationKey, TryFromBytes, ed25519::Ed25519VerificationKey,
12+
},
1113
entities::PartyId,
1214
logging::LoggerExtensions,
1315
};
@@ -113,10 +115,19 @@ impl<M: TryFromBytes + Debug> DmqConsumerClientPallas<M> {
113115
.0
114116
.into_iter()
115117
.map(|dmq_message| {
116-
let opcert = OpCert::try_from_bytes(&dmq_message.operational_certificate)
118+
let opcert_without_verification_key =
119+
OpCertWithoutColdVerificationKey::try_from_bytes(
120+
&dmq_message.operational_certificate,
121+
)
117122
.with_context(|| "Failed to parse operational certificate")?;
123+
let cold_verification_key =
124+
Ed25519VerificationKey::from_bytes(&dmq_message.cold_verification_key)
125+
.with_context(|| "Failed to parse cold verification key")?
126+
.into_inner();
127+
let opcert: OpCert =
128+
(opcert_without_verification_key, cold_verification_key).into();
118129
let party_id = opcert.compute_protocol_party_id()?;
119-
let payload = M::try_from_bytes(&dmq_message.msg_body)
130+
let payload = M::try_from_bytes(&dmq_message.msg_payload.msg_body)
120131
.with_context(|| "Failed to parse DMQ message body")?;
121132

122133
Ok((payload, party_id))
@@ -158,7 +169,10 @@ mod tests {
158169
use mithril_common::{crypto_helper::TryToBytes, current_function, test::TempDir};
159170
use pallas_network::{
160171
facades::DmqServer,
161-
miniprotocols::{localmsgnotification, localmsgsubmission::DmqMsg},
172+
miniprotocols::{
173+
localmsgnotification,
174+
localmsgsubmission::{DmqMsg, DmqMsgPayload},
175+
},
162176
};
163177
use tokio::{net::UnixListener, task::JoinHandle, time::sleep};
164178

@@ -173,41 +187,46 @@ mod tests {
173187
fn fake_msgs() -> Vec<DmqMsg> {
174188
vec![
175189
DmqMsg {
176-
msg_id: vec![0, 1],
177-
msg_body: DmqMessageTestPayload::new(b"msg_1").to_bytes_vec().unwrap(),
178-
block_number: 10,
179-
ttl: 100,
190+
msg_payload: DmqMsgPayload {
191+
msg_id: vec![0, 1],
192+
msg_body: DmqMessageTestPayload::new(b"msg_1").to_bytes_vec().unwrap(),
193+
kes_period: 10,
194+
expires_at: 100,
195+
},
180196
kes_signature: vec![0, 1, 2, 3],
181197
operational_certificate: vec![
182-
130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40,
183-
198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125,
184-
203, 41, 0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198,
185-
171, 83, 19, 80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9,
186-
247, 4, 87, 170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86,
187-
37, 137, 188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5, 88,
188-
32, 32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105,
198+
132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198,
199+
142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, 203, 41,
200+
0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, 171, 83, 19,
201+
80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, 247, 4, 87,
202+
170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, 37, 137,
203+
188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5,
204+
],
205+
cold_verification_key: vec![
206+
32, 253, 186, 201, 177, 11, 117, 135, 187, 167, 181, 188, 22, 59, 206, 105,
189207
231, 150, 215, 30, 78, 212, 76, 16, 252, 180, 72, 134, 137, 247, 161, 68,
190208
],
191-
kes_period: 10,
192209
},
193210
DmqMsg {
194-
msg_id: vec![1, 2],
195-
msg_body: DmqMessageTestPayload::new(b"msg_2").to_bytes_vec().unwrap(),
196-
block_number: 11,
197-
ttl: 100,
211+
msg_payload: DmqMsgPayload {
212+
msg_id: vec![1, 2],
213+
msg_body: DmqMessageTestPayload::new(b"msg_2").to_bytes_vec().unwrap(),
214+
kes_period: 11,
215+
expires_at: 101,
216+
},
198217
kes_signature: vec![1, 2, 3, 4],
199218
operational_certificate: vec![
200-
130, 132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40,
201-
198, 142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125,
202-
203, 41, 0, 0, 88, 64, 132, 4, 199, 39, 190, 173, 88, 102, 121, 117, 55, 62,
203-
39, 189, 113, 96, 175, 24, 171, 240, 74, 42, 139, 202, 128, 185, 44, 130, 209,
204-
77, 191, 122, 196, 224, 33, 158, 187, 156, 203, 190, 173, 150, 247, 87, 172,
205-
58, 153, 185, 157, 87, 128, 14, 187, 107, 187, 215, 105, 195, 107, 135, 172,
206-
43, 173, 9, 88, 32, 77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105,
207-
240, 103, 245, 159, 147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65,
208-
245, 200,
219+
132, 88, 32, 230, 80, 215, 83, 21, 9, 187, 108, 255, 215, 153, 140, 40, 198,
220+
142, 78, 200, 250, 98, 26, 9, 82, 32, 110, 161, 30, 176, 63, 205, 125, 203, 41,
221+
0, 0, 88, 64, 212, 171, 206, 39, 218, 5, 255, 3, 193, 52, 44, 198, 171, 83, 19,
222+
80, 114, 225, 186, 191, 156, 192, 84, 146, 245, 159, 31, 240, 9, 247, 4, 87,
223+
170, 168, 98, 199, 21, 139, 19, 190, 12, 251, 65, 215, 169, 26, 86, 37, 137,
224+
188, 17, 14, 178, 205, 175, 93, 39, 86, 4, 138, 187, 234, 95, 5,
225+
],
226+
cold_verification_key: vec![
227+
77, 75, 24, 6, 47, 133, 2, 89, 141, 224, 69, 202, 123, 105, 240, 103, 245, 159,
228+
147, 177, 110, 58, 248, 115, 58, 152, 138, 220, 35, 65, 245, 200,
209229
],
210-
kes_period: 11,
211230
},
212231
]
213232
}

internal/mithril-dmq/src/consumer/server/pallas.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -289,31 +289,20 @@ mod tests {
289289

290290
use mithril_common::{current_function, test::TempDir};
291291

292-
use crate::test_tools::TestLogger;
292+
use crate::{test::fake_message::compute_fake_msg, test_tools::TestLogger};
293293

294294
use super::*;
295295

296296
fn create_temp_dir(folder_name: &str) -> PathBuf {
297297
TempDir::create_with_short_path("dmq_consumer_server", folder_name)
298298
}
299299

300-
fn fake_msg() -> DmqMsg {
301-
DmqMsg {
302-
msg_id: vec![0, 1],
303-
msg_body: vec![0, 1, 2],
304-
block_number: 10,
305-
ttl: 100,
306-
kes_signature: vec![0, 1, 2, 3],
307-
operational_certificate: vec![0, 1, 2, 3, 4],
308-
kes_period: 10,
309-
}
310-
}
311-
312300
#[tokio::test(flavor = "multi_thread")]
313301
async fn pallas_dmq_consumer_server_non_blocking_success() {
302+
let current_function_name = current_function!();
314303
let (stop_tx, stop_rx) = watch::channel(());
315304
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
316-
let socket_path = create_temp_dir(current_function!()).join("node.socket");
305+
let socket_path = create_temp_dir(current_function_name).join("node.socket");
317306
let cardano_network = CardanoNetwork::TestNet(0);
318307
let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new(
319308
socket_path.to_path_buf(),
@@ -322,7 +311,7 @@ mod tests {
322311
TestLogger::stdout(),
323312
));
324313
dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap();
325-
let message = fake_msg();
314+
let message: DmqMsg = compute_fake_msg(b"test", current_function_name).await.into();
326315
let client = tokio::spawn({
327316
async move {
328317
// sleep to avoid refused connection from the server
@@ -370,9 +359,10 @@ mod tests {
370359

371360
#[tokio::test(flavor = "multi_thread")]
372361
async fn pallas_dmq_consumer_server_blocking_success() {
362+
let current_function_name = current_function!();
373363
let (stop_tx, stop_rx) = watch::channel(());
374364
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
375-
let socket_path = create_temp_dir(current_function!()).join("node.socket");
365+
let socket_path = create_temp_dir(current_function_name).join("node.socket");
376366
let cardano_network = CardanoNetwork::TestNet(0);
377367
let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new(
378368
socket_path.to_path_buf(),
@@ -381,7 +371,7 @@ mod tests {
381371
TestLogger::stdout(),
382372
));
383373
dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap();
384-
let message = fake_msg();
374+
let message: DmqMsg = compute_fake_msg(b"test", current_function_name).await.into();
385375
let client = tokio::spawn({
386376
async move {
387377
// sleep to avoid refused connection from the server

0 commit comments

Comments
 (0)