Skip to content

Commit 036ab54

Browse files
committed
test(dmq): add integration test for consumer client/server
1 parent 0e1f911 commit 036ab54

File tree

6 files changed

+94
-8
lines changed

6 files changed

+94
-8
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/mithril-dmq/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ bincode = { version = "2.0.1" }
2424
blake2 = "0.10.6"
2525
mithril-cardano-node-chain = { path = "../cardano-node/mithril-cardano-node-chain" }
2626
mithril-common = { path = "../../mithril-common" }
27-
pallas-network = { git = "https://github.com/txpipe/pallas.git", branch = "main" }
2827
pallas-codec = { git = "https://github.com/txpipe/pallas.git", branch = "main" }
28+
pallas-network = { git = "https://github.com/txpipe/pallas.git", branch = "main" }
2929
serde = { workspace = true }
3030
serde_bytes = "0.11.17"
3131
slog = { workspace = true }
32+
slog-scope = "4.4.0"
3233
tokio = { workspace = true, features = ["sync"] }
3334

3435
[dev-dependencies]

internal/mithril-dmq/src/consumer/server/pallas.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
use std::{fs, path::PathBuf};
22

33
use anyhow::{Context, anyhow};
4-
use pallas_network::{
5-
facades::DmqServer,
6-
miniprotocols::localmsgnotification::{Request, State},
7-
};
4+
use pallas_network::{facades::DmqServer, miniprotocols::localmsgnotification::Request};
85
use tokio::{
96
join,
107
net::UnixListener,

internal/mithril-dmq/src/test/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@
66
77
pub mod double;
88

9-
#[cfg(test)]
10-
pub(crate) mod payload;
9+
pub mod payload;
1110

1211
#[cfg(test)]
1312
mithril_common::define_test_logger!();

internal/mithril-dmq/src/test/payload.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//! DmqMessageTestPayload module for tests only
2+
13
use std::fmt::Debug;
24

35
use mithril_common::{
@@ -7,7 +9,7 @@ use mithril_common::{
79
};
810

911
/// A test message payload for the DMQ.
10-
#[derive(PartialEq, Eq)]
12+
#[derive(PartialEq, Eq, Clone)]
1113
pub struct DmqMessageTestPayload {
1214
message: Vec<u8>,
1315
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#![cfg(unix)]
2+
use std::sync::Arc;
3+
4+
use tokio::sync::{mpsc::unbounded_channel, watch};
5+
6+
use mithril_cardano_node_chain::test::double::FakeChainObserver;
7+
use mithril_common::{
8+
CardanoNetwork,
9+
crypto_helper::TryToBytes,
10+
current_function,
11+
test::{TempDir, crypto_helper::KesSignerFake},
12+
};
13+
use mithril_dmq::{
14+
DmqConsumerClient, DmqConsumerClientPallas, DmqConsumerServer, DmqConsumerServerPallas,
15+
DmqMessage, DmqMessageBuilder, test::payload::DmqMessageTestPayload,
16+
};
17+
18+
async fn create_fake_msg(bytes: &[u8]) -> DmqMessage {
19+
let dmq_builder = DmqMessageBuilder::new(
20+
{
21+
let (kes_signature, operational_certificate) = KesSignerFake::dummy_signature();
22+
let kes_signer = KesSignerFake::new(vec![
23+
Ok((kes_signature, operational_certificate.clone())),
24+
Ok((kes_signature, operational_certificate.clone())), // TODO: remove this line once the hack of KES signature is removed in DMQ message builder
25+
]);
26+
27+
Arc::new(kes_signer)
28+
},
29+
Arc::new(FakeChainObserver::default()),
30+
)
31+
.set_ttl(100);
32+
let message = DmqMessageTestPayload::new(bytes);
33+
dmq_builder.build(&message.to_bytes_vec().unwrap()).await.unwrap()
34+
}
35+
36+
#[tokio::test]
37+
async fn dmq_consumer_client_server() {
38+
let cardano_network = CardanoNetwork::TestNet(0);
39+
let socket_path =
40+
TempDir::create_with_short_path("dmq_consumer_client_server", current_function!())
41+
.join("node.socket");
42+
let (stop_tx, stop_rx) = watch::channel(());
43+
44+
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
45+
let server = tokio::spawn({
46+
let socket_path = socket_path.clone();
47+
async move {
48+
let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new(
49+
socket_path.to_path_buf(),
50+
cardano_network.to_owned(),
51+
stop_rx,
52+
slog_scope::logger(),
53+
));
54+
dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap();
55+
dmq_consumer_server.run().await.unwrap();
56+
}
57+
});
58+
59+
let client = tokio::spawn({
60+
let socket_path = socket_path.clone();
61+
async move {
62+
let consumer_client = DmqConsumerClientPallas::<DmqMessageTestPayload>::new(
63+
socket_path,
64+
cardano_network.clone(),
65+
slog_scope::logger(),
66+
);
67+
let mut messages = vec![];
68+
signature_dmq_tx.send(create_fake_msg(b"msg_1").await.into()).unwrap();
69+
messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap());
70+
signature_dmq_tx.send(create_fake_msg(b"msg_2").await.into()).unwrap();
71+
messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap());
72+
stop_tx.send(()).unwrap();
73+
74+
messages.into_iter().map(|(msg, _)| msg).collect::<Vec<_>>()
75+
}
76+
});
77+
78+
let (_, messages) = tokio::try_join!(server, client).unwrap();
79+
assert_eq!(
80+
vec![
81+
DmqMessageTestPayload::new(b"msg_1"),
82+
DmqMessageTestPayload::new(b"msg_2")
83+
],
84+
messages
85+
);
86+
}

0 commit comments

Comments
 (0)