Skip to content

Commit c7c2c63

Browse files
committed
fix(dmq): consumer client can't connect more that once to server
1 parent 1d34617 commit c7c2c63

File tree

4 files changed

+47
-30
lines changed

4 files changed

+47
-30
lines changed

internal/mithril-dmq/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ serde = { workspace = true }
3030
serde_bytes = "0.11.17"
3131
slog = { workspace = true }
3232
slog-scope = "4.4.0"
33-
tokio = { workspace = true, features = ["sync"] }
33+
tokio = { workspace = true, features = ["sync","rt-multi-thread"] }
3434

3535
[dev-dependencies]
3636
mockall = { workspace = true }

internal/mithril-dmq/src/consumer/client/pallas.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{fmt::Debug, marker::PhantomData, path::PathBuf};
22

33
use anyhow::{Context, anyhow};
44
use pallas_network::{facades::DmqClient, miniprotocols::localmsgnotification::State};
5-
use slog::{Logger, debug};
5+
use slog::{Logger, debug, error};
66
use tokio::sync::{Mutex, MutexGuard};
77

88
use mithril_common::{
@@ -138,6 +138,18 @@ impl<M: TryFromBytes + Debug + Sync + Send> DmqConsumerClient<M> for DmqConsumer
138138
}
139139
}
140140

141+
impl<M: TryFromBytes + Debug> Drop for DmqConsumerClientPallas<M> {
142+
fn drop(&mut self) {
143+
tokio::task::block_in_place(|| {
144+
tokio::runtime::Handle::current().block_on(async {
145+
if let Err(e) = self.drop_client().await {
146+
error!(self.logger, "Failed to drop DMQ consumer client: {}", e);
147+
}
148+
});
149+
});
150+
}
151+
}
152+
141153
#[cfg(all(test, unix))]
142154
mod tests {
143155

@@ -236,7 +248,7 @@ mod tests {
236248
})
237249
}
238250

239-
#[tokio::test]
251+
#[tokio::test(flavor = "multi_thread")]
240252
async fn pallas_dmq_consumer_publisher_succeeds_when_messages_are_available() {
241253
let socket_path = create_temp_dir(current_function!()).join("node.socket");
242254
let reply_messages = fake_msgs();
@@ -293,7 +305,7 @@ mod tests {
293305
result.expect_err("Should have timed out");
294306
}
295307

296-
#[tokio::test]
308+
#[tokio::test(flavor = "multi_thread")]
297309
async fn pallas_dmq_consumer_client_is_dropped_when_returning_error() {
298310
let socket_path = create_temp_dir(current_function!()).join("node.socket");
299311
let reply_messages = fake_msgs();

internal/mithril-dmq/src/consumer/server/pallas.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ impl DmqConsumerServerPallas {
9191
}
9292

9393
/// Drops the current `DmqServer`, if it exists.
94-
// TODO: remove allow dead code
95-
#[allow(dead_code)]
9694
async fn drop_server(&self) -> StdResult<()> {
9795
debug!(
9896
self.logger,
@@ -179,9 +177,9 @@ impl DmqConsumerServerPallas {
179177
}
180178
Err(err) => {
181179
error!(self.logger, "Failed to process message"; "error" => ?err);
182-
/* if let Err(drop_err) = self.drop_server().await {
180+
if let Err(drop_err) = self.drop_server().await {
183181
error!(self.logger, "Failed to drop DMQ consumer server"; "error" => ?drop_err);
184-
} */
182+
}
185183
}
186184
}
187185
}
@@ -197,15 +195,9 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
197195
self.logger,
198196
"Waiting for message received from the DMQ network"
199197
);
198+
200199
let mut server_guard = self.get_server().await?;
201200
let server = server_guard.as_mut().ok_or(anyhow!("DMQ server does not exist"))?;
202-
203-
debug!(
204-
self.logger,
205-
"DMQ Server state: {:?}",
206-
server.msg_notification().state()
207-
);
208-
209201
let request = server
210202
.msg_notification()
211203
.recv_next_request()
@@ -253,7 +245,6 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
253245
Ok(())
254246
}
255247

256-
/// Runs the DMQ publisher server, processing messages in a loop.
257248
async fn run(&self) -> StdResult<()> {
258249
info!(
259250
self.logger,
@@ -273,6 +264,18 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
273264
}
274265
}
275266

267+
impl Drop for DmqConsumerServerPallas {
268+
fn drop(&mut self) {
269+
tokio::task::block_in_place(|| {
270+
tokio::runtime::Handle::current().block_on(async {
271+
if let Err(e) = self.drop_server().await {
272+
error!(self.logger, "Failed to drop DMQ consumer server: {}", e);
273+
}
274+
});
275+
});
276+
}
277+
}
278+
276279
#[cfg(test)]
277280
mod tests {
278281
use std::{sync::Arc, time::Duration};
@@ -306,7 +309,7 @@ mod tests {
306309
}
307310
}
308311

309-
#[tokio::test]
312+
#[tokio::test(flavor = "multi_thread")]
310313
async fn pallas_dmq_consumer_server_non_blocking_success() {
311314
let (stop_tx, stop_rx) = watch::channel(());
312315
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
@@ -322,7 +325,8 @@ mod tests {
322325
let message = fake_msg();
323326
let client = tokio::spawn({
324327
async move {
325-
tokio::time::sleep(Duration::from_secs(1)).await;
328+
// sleep to avoid refused connection from the server
329+
tokio::time::sleep(Duration::from_millis(10)).await;
326330

327331
// client setup
328332
let mut client = DmqClient::connect(socket_path.clone(), 0).await.unwrap();
@@ -364,7 +368,7 @@ mod tests {
364368
assert_eq!(vec![message], messages_received);
365369
}
366370

367-
#[tokio::test]
371+
#[tokio::test(flavor = "multi_thread")]
368372
async fn pallas_dmq_consumer_server_blocking_success() {
369373
let (stop_tx, stop_rx) = watch::channel(());
370374
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
@@ -380,7 +384,8 @@ mod tests {
380384
let message = fake_msg();
381385
let client = tokio::spawn({
382386
async move {
383-
tokio::time::sleep(Duration::from_secs(1)).await;
387+
// sleep to avoid refused connection from the server
388+
tokio::time::sleep(Duration::from_millis(10)).await;
384389

385390
// client setup
386391
let mut client = DmqClient::connect(socket_path.clone(), 0).await.unwrap();
@@ -422,10 +427,10 @@ mod tests {
422427
assert_eq!(vec![message], messages_received);
423428
}
424429

425-
#[tokio::test]
430+
#[tokio::test(flavor = "multi_thread")]
426431
async fn pallas_dmq_consumer_server_blocking_blocks_when_no_message_available() {
427432
let (_stop_tx, stop_rx) = watch::channel(());
428-
let (signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
433+
let (_signature_dmq_tx, signature_dmq_rx) = unbounded_channel::<DmqMessage>();
429434
let socket_path = create_temp_dir(current_function!()).join("node.socket");
430435
let cardano_network = CardanoNetwork::TestNet(0);
431436
let dmq_consumer_server = Arc::new(DmqConsumerServerPallas::new(
@@ -437,6 +442,9 @@ mod tests {
437442
dmq_consumer_server.register_receiver(signature_dmq_rx).await.unwrap();
438443
let client = tokio::spawn({
439444
async move {
445+
// sleep to avoid refused connection from the server
446+
tokio::time::sleep(Duration::from_millis(10)).await;
447+
440448
// client setup
441449
let mut client = DmqClient::connect(socket_path.clone(), 0).await.unwrap();
442450

@@ -451,10 +459,9 @@ mod tests {
451459
localmsgnotification::State::BusyBlocking
452460
);
453461

454-
client_msg.recv_next_reply().await.unwrap();
462+
let _ = client_msg.recv_next_reply().await;
455463
}
456464
});
457-
let _signature_dmq_tx_clone = signature_dmq_tx.clone();
458465

459466
let result = tokio::select!(
460467
_res = sleep(Duration::from_millis(1000)) => {Err(anyhow!("Timeout"))},

internal/mithril-dmq/tests/consumer_client_server.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@ async fn create_fake_msg(bytes: &[u8]) -> DmqMessage {
1919
let dmq_builder = DmqMessageBuilder::new(
2020
{
2121
let (kes_signature, operational_certificate) = KesSignerFake::dummy_signature();
22-
let kes_signer = KesSignerFake::new(vec![
23-
Ok((kes_signature, operational_certificate.clone())),
24-
Ok((kes_signature, operational_certificate.clone())), // TODO: remove this line once the hack of KES signature is removed in DMQ message builder
25-
]);
22+
let kes_signer =
23+
KesSignerFake::new(vec![Ok((kes_signature, operational_certificate.clone()))]);
2624

2725
Arc::new(kes_signer)
2826
},
@@ -33,7 +31,7 @@ async fn create_fake_msg(bytes: &[u8]) -> DmqMessage {
3331
dmq_builder.build(&message.to_bytes_vec().unwrap()).await.unwrap()
3432
}
3533

36-
#[tokio::test]
34+
#[tokio::test(flavor = "multi_thread")]
3735
async fn dmq_consumer_client_server() {
3836
let cardano_network = CardanoNetwork::TestNet(0);
3937
let socket_path =
@@ -89,7 +87,7 @@ async fn dmq_consumer_client_server() {
8987
);
9088

9189
// Sleep to avoid refused connection from the server
92-
tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
90+
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
9391

9492
// Start a second client, receive messages
9593
let client = tokio::spawn({

0 commit comments

Comments
 (0)