Skip to content

Commit 1d34617

Browse files
committed
test(dmq): make consumer integration test check client/server deconnection
1 parent 80a6571 commit 1d34617

File tree

1 file changed

+30
-2
lines changed

1 file changed

+30
-2
lines changed

internal/mithril-dmq/tests/consumer_client_server.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ async fn dmq_consumer_client_server() {
4141
.join("node.socket");
4242
let (stop_tx, stop_rx) = watch::channel(());
4343

44+
// Start the server
4445
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
4546
let server = tokio::spawn({
4647
let socket_path = socket_path.clone();
@@ -56,8 +57,10 @@ async fn dmq_consumer_client_server() {
5657
}
5758
});
5859

60+
// Start a first client, receive messages and wait for its deconnection
5961
let client = tokio::spawn({
6062
let socket_path = socket_path.clone();
63+
let signature_dmq_tx = signature_dmq_tx.clone();
6164
async move {
6265
let consumer_client = DmqConsumerClientPallas::<DmqMessageTestPayload>::new(
6366
socket_path,
@@ -70,13 +73,12 @@ async fn dmq_consumer_client_server() {
7073
messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap());
7174
signature_dmq_tx.send(create_fake_msg(b"msg_3").await).unwrap();
7275
messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap());
73-
stop_tx.send(()).unwrap();
7476

7577
messages.into_iter().map(|(msg, _)| msg).collect::<Vec<_>>()
7678
}
7779
});
7880

79-
let (_, messages) = tokio::try_join!(server, client).unwrap();
81+
let messages = client.await.unwrap();
8082
assert_eq!(
8183
vec![
8284
DmqMessageTestPayload::new(b"msg_1"),
@@ -85,4 +87,30 @@ async fn dmq_consumer_client_server() {
8587
],
8688
messages
8789
);
90+
91+
// Sleep to avoid refused connection from the server
92+
tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
93+
94+
// Start a second client, receive messages
95+
let client = tokio::spawn({
96+
let socket_path = socket_path.clone();
97+
let signature_dmq_tx = signature_dmq_tx.clone();
98+
async move {
99+
let consumer_client = DmqConsumerClientPallas::<DmqMessageTestPayload>::new(
100+
socket_path,
101+
cardano_network,
102+
slog_scope::logger(),
103+
);
104+
let mut messages = vec![];
105+
signature_dmq_tx.send(create_fake_msg(b"msg_4").await).unwrap();
106+
messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap());
107+
stop_tx.send(()).unwrap();
108+
109+
messages.into_iter().map(|(msg, _)| msg).collect::<Vec<_>>()
110+
}
111+
});
112+
113+
// Check that all messages have been correctly received
114+
let (_, messages) = tokio::try_join!(server, client).unwrap();
115+
assert_eq!(vec![DmqMessageTestPayload::new(b"msg_4")], messages);
88116
}

0 commit comments

Comments
 (0)