Skip to content

Commit 18da2a0

Browse files
committed
fix(dmq): consumer can not serve messages multiple times
1 parent 036ab54 commit 18da2a0

File tree

3 files changed

+23
-26
lines changed

3 files changed

+23
-26
lines changed

internal/mithril-dmq/src/consumer/client/pallas.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::{fmt::Debug, marker::PhantomData, path::PathBuf};
22

33
use anyhow::{Context, anyhow};
4-
use pallas_network::facades::DmqClient;
5-
use slog::{Logger, debug, error};
4+
use pallas_network::{facades::DmqClient, miniprotocols::localmsgnotification::State};
5+
use slog::{Logger, debug};
66
use tokio::sync::{Mutex, MutexGuard};
77

88
use mithril_common::{
@@ -94,21 +94,20 @@ impl<M: TryFromBytes + Debug> DmqConsumerClientPallas<M> {
9494
debug!(self.logger, "Waiting for messages from DMQ...");
9595
let mut client_guard = self.get_client().await?;
9696
let client = client_guard.as_mut().ok_or(anyhow!("DMQ client does not exist"))?;
97-
client
98-
.msg_notification()
99-
.send_request_messages_blocking()
100-
.await
101-
.with_context(|| "Failed to request notifications from DMQ server: {}")?;
97+
if *client.msg_notification().state() == State::Idle {
98+
client
99+
.msg_notification()
100+
.send_request_messages_blocking()
101+
.await
102+
.with_context(|| "Failed to request notifications from DMQ server: {}")?;
103+
}
102104

103105
let reply = client
104106
.msg_notification()
105107
.recv_next_reply()
106108
.await
107109
.with_context(|| "Failed to receive notifications from DMQ server")?;
108110
debug!(self.logger, "Received single signatures from DMQ"; "messages" => ?reply);
109-
if let Err(e) = client.msg_notification().send_done().await {
110-
error!(self.logger, "Failed to send Done"; "error" => ?e);
111-
}
112111

113112
reply
114113
.0
@@ -227,10 +226,6 @@ mod tests {
227226
// server replies with messages if any
228227
server_msg.send_reply_messages_blocking(reply_messages).await.unwrap();
229228
assert_eq!(*server_msg.state(), localmsgnotification::State::Idle);
230-
231-
// server receives done from client
232-
server_msg.recv_done().await.unwrap();
233-
assert_eq!(*server_msg.state(), localmsgnotification::State::Done);
234229
} else {
235230
// server waits if no message available
236231
future::pending().await

internal/mithril-dmq/src/consumer/server/pallas.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,7 @@ impl DmqConsumerServerPallas {
151151
}
152152
}
153153
},
154-
None => {
155-
return Err(anyhow!("DMQ message receiver is not registered"));
156-
}
154+
None => Err(anyhow!("DMQ message receiver is not registered")),
157155
}
158156
}
159157

@@ -229,8 +227,7 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
229227
.await?;
230228
debug!(
231229
self.logger,
232-
"Blocking notification replied to the DMQ notification client: {:?}",
233-
reply_messages
230+
"Messages replied to the DMQ notification client: {:?}", reply_messages
234231
);
235232
}
236233
Request::NonBlocking => {
@@ -244,9 +241,12 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
244241
let has_more = !self.messages_buffer.is_empty().await;
245242
server
246243
.msg_notification()
247-
.send_reply_messages_non_blocking(reply_messages, has_more)
244+
.send_reply_messages_non_blocking(reply_messages.clone(), has_more)
248245
.await?;
249-
server.msg_notification().recv_done().await?;
246+
debug!(
247+
self.logger,
248+
"Messages replied to the DMQ notification client: {:?}", reply_messages
249+
);
250250
}
251251
};
252252

internal/mithril-dmq/tests/consumer_client_server.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ async fn dmq_consumer_client_server() {
4747
async move {
4848
let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new(
4949
socket_path.to_path_buf(),
50-
cardano_network.to_owned(),
50+
cardano_network,
5151
stop_rx,
5252
slog_scope::logger(),
5353
));
@@ -61,13 +61,14 @@ async fn dmq_consumer_client_server() {
6161
async move {
6262
let consumer_client = DmqConsumerClientPallas::<DmqMessageTestPayload>::new(
6363
socket_path,
64-
cardano_network.clone(),
64+
cardano_network,
6565
slog_scope::logger(),
6666
);
6767
let mut messages = vec![];
68-
signature_dmq_tx.send(create_fake_msg(b"msg_1").await.into()).unwrap();
68+
signature_dmq_tx.send(create_fake_msg(b"msg_1").await).unwrap();
69+
signature_dmq_tx.send(create_fake_msg(b"msg_2").await).unwrap();
6970
messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap());
70-
signature_dmq_tx.send(create_fake_msg(b"msg_2").await.into()).unwrap();
71+
signature_dmq_tx.send(create_fake_msg(b"msg_3").await).unwrap();
7172
messages.extend_from_slice(&consumer_client.consume_messages().await.unwrap());
7273
stop_tx.send(()).unwrap();
7374

@@ -79,7 +80,8 @@ async fn dmq_consumer_client_server() {
7980
assert_eq!(
8081
vec![
8182
DmqMessageTestPayload::new(b"msg_1"),
82-
DmqMessageTestPayload::new(b"msg_2")
83+
DmqMessageTestPayload::new(b"msg_2"),
84+
DmqMessageTestPayload::new(b"msg_3")
8385
],
8486
messages
8587
);

0 commit comments

Comments
 (0)