Skip to content

Commit 0e1f911

Browse files
committed
test(dmq): add integration test for publisher client/server
1 parent 07e1bfa commit 0e1f911

File tree

1 file changed

+123
-0
lines changed

1 file changed

+123
-0
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#![cfg(unix)]
2+
use std::sync::Arc;
3+
4+
use tokio::sync::{mpsc::unbounded_channel, watch};
5+
6+
use mithril_cardano_node_chain::test::double::FakeChainObserver;
7+
use mithril_common::{
8+
CardanoNetwork,
9+
crypto_helper::TryToBytes,
10+
current_function,
11+
test::{TempDir, crypto_helper::KesSignerFake},
12+
};
13+
use mithril_dmq::{
14+
DmqMessage, DmqMessageBuilder, DmqPublisherClient, DmqPublisherClientPallas,
15+
DmqPublisherServer, DmqPublisherServerPallas, test::payload::DmqMessageTestPayload,
16+
};
17+
18+
async fn create_fake_msg(bytes: &[u8]) -> DmqMessage {
19+
let dmq_builder = DmqMessageBuilder::new(
20+
{
21+
let (kes_signature, operational_certificate) = KesSignerFake::dummy_signature();
22+
let kes_signer = KesSignerFake::new(vec![
23+
Ok((kes_signature, operational_certificate.clone())),
24+
Ok((kes_signature, operational_certificate.clone())), // TODO: remove this line once the hack of KES signature is removed in DMQ message builder
25+
]);
26+
27+
Arc::new(kes_signer)
28+
},
29+
Arc::new(FakeChainObserver::default()),
30+
)
31+
.set_ttl(100);
32+
let message = DmqMessageTestPayload::new(bytes);
33+
dmq_builder.build(&message.to_bytes_vec().unwrap()).await.unwrap()
34+
}
35+
36+
#[tokio::test]
37+
async fn dmq_publisher_client_server() {
38+
let cardano_network = CardanoNetwork::TestNet(0);
39+
let socket_path =
40+
TempDir::create_with_short_path("dmq_publisher_client_server", current_function!())
41+
.join("node.socket");
42+
let (stop_tx, stop_rx) = watch::channel(());
43+
44+
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
45+
let server = tokio::spawn({
46+
let socket_path = socket_path.clone();
47+
async move {
48+
let dmq_publisher_server = Arc::new(DmqPublisherServerPallas::new(
49+
socket_path.to_path_buf(),
50+
cardano_network,
51+
stop_rx,
52+
slog_scope::logger(),
53+
));
54+
dmq_publisher_server
55+
.register_transmitter(signature_dmq_tx)
56+
.await
57+
.unwrap();
58+
dmq_publisher_server.run().await.unwrap();
59+
}
60+
});
61+
62+
let client = tokio::spawn({
63+
let socket_path = socket_path.clone();
64+
async move {
65+
let dmq_builder = DmqMessageBuilder::new(
66+
{
67+
let (kes_signature, operational_certificate) = KesSignerFake::dummy_signature();
68+
let kes_signer = KesSignerFake::new(vec![
69+
Ok((kes_signature, operational_certificate.clone())),
70+
Ok((kes_signature, operational_certificate.clone())),
71+
Ok((kes_signature, operational_certificate.clone())), // TODO: remove this line once the hack of KES signature is removed in DMQ message builder
72+
Ok((kes_signature, operational_certificate.clone())), // TODO: remove this line once the hack of KES signature is removed in DMQ message builder
73+
]);
74+
75+
Arc::new(kes_signer)
76+
},
77+
Arc::new(FakeChainObserver::default()),
78+
)
79+
.set_ttl(100);
80+
let publisher_client = DmqPublisherClientPallas::<DmqMessageTestPayload>::new(
81+
socket_path,
82+
cardano_network,
83+
dmq_builder,
84+
slog_scope::logger(),
85+
);
86+
87+
publisher_client
88+
.publish_message(DmqMessageTestPayload::new(b"msg_1"))
89+
.await
90+
.unwrap();
91+
// Sleep to avoid refused connection from the server
92+
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
93+
publisher_client
94+
.publish_message(DmqMessageTestPayload::new(b"msg_2"))
95+
.await
96+
.unwrap();
97+
98+
stop_tx
99+
.send(())
100+
.expect("Failed to send stop signal to DMQ publisher server");
101+
}
102+
});
103+
104+
let recorder = tokio::spawn(async move {
105+
let messages: Vec<DmqMessage> = {
106+
let mut messages = vec![];
107+
let mut signature_dmq_rx = signature_dmq_rx;
108+
while let Some(message) = signature_dmq_rx.recv().await {
109+
messages.push(message);
110+
}
111+
112+
messages
113+
};
114+
115+
messages
116+
});
117+
118+
let (_, _, messages) = tokio::try_join!(server, client, recorder).unwrap();
119+
assert_eq!(
120+
vec![create_fake_msg(b"msg_1").await, create_fake_msg(b"msg_2").await],
121+
messages
122+
);
123+
}

0 commit comments

Comments
 (0)