Skip to content

Commit df080cd

Browse files
authored
Merge pull request #2972 from input-output-hk/jpraynaud/2833-fix-dmq-publisher-connection-on-error
fix: DMQ publisher error on rejected local submission response
2 parents 78d4c0d + a0c32c6 commit df080cd

File tree

5 files changed

+50
-42
lines changed

5 files changed

+50
-42
lines changed

Cargo.lock

Lines changed: 14 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/cardano-node/mithril-cardano-node-chain/Cargo.toml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-cardano-node-chain"
3-
version = "0.1.18"
3+
version = "0.1.19"
44
authors.workspace = true
55
documentation.workspace = true
66
edition.workspace = true
@@ -14,11 +14,11 @@ async-trait = { workspace = true }
1414
hex = { workspace = true }
1515
mithril-common = { path = "../../../mithril-common" }
1616
nom = "8.0.0"
17-
pallas-addresses = { version = "0.34.0" }
18-
pallas-codec = { version = "0.34.0" }
19-
pallas-network = { version = "0.34.0" }
20-
pallas-primitives = { version = "0.34.0" }
21-
pallas-traverse = { version = "0.34.0" }
17+
pallas-addresses = { version = "0.35.0" }
18+
pallas-codec = { version = "0.35.0" }
19+
pallas-network = { version = "0.35.0" }
20+
pallas-primitives = { version = "0.35.0" }
21+
pallas-traverse = { version = "0.35.0" }
2222
rand_core = { workspace = true }
2323
serde = { workspace = true }
2424
serde_json = { workspace = true }
@@ -33,7 +33,7 @@ kes-summed-ed25519 = { version = "0.2.1", features = [
3333
"sk_clone_enabled",
3434
] }
3535
mockall = { workspace = true }
36-
pallas-crypto = "0.34.0"
36+
pallas-crypto = "0.35.0"
3737
slog-async = { workspace = true }
3838
slog-term = { workspace = true }
3939
tokio = { workspace = true, features = ["macros", "test-util"] }

internal/mithril-dmq/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "mithril-dmq"
33
description = "Mechanisms to publish and consume messages of a 'Decentralized Message Queue network' through a DMQ node"
4-
version = "0.1.22"
4+
version = "0.1.23"
55
authors.workspace = true
66
documentation.workspace = true
77
edition.workspace = true
@@ -22,8 +22,8 @@ blake2 = "0.10.6"
2222
hex = { workspace = true }
2323
mithril-cardano-node-chain = { path = "../cardano-node/mithril-cardano-node-chain" }
2424
mithril-common = { path = "../../mithril-common" }
25-
pallas-codec = { version = "0.34.0" }
26-
pallas-network = { version = "0.34.0" }
25+
pallas-codec = { version = "0.35.0" }
26+
pallas-network = { version = "0.35.0" }
2727
serde = { workspace = true }
2828
serde_bytes = "0.11.19"
2929
slog = { workspace = true }

internal/mithril-dmq/src/publisher/client/pallas.rs

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,12 @@ impl<M: TryToBytes + Debug + Sync + Send> DmqPublisherClient<M> for DmqPublisher
6565
.send_submit_tx(dmq_message.into())
6666
.await
6767
.with_context(|| "Failed to submit DMQ message")?;
68-
let response = client.msg_submission().recv_submit_tx_response().await?;
68+
let response = client.msg_submission().recv_submit_tx_response().await;
6969
if let Err(e) = client.msg_submission().terminate_gracefully().await {
7070
error!(self.logger, "Failed to send Done"; "error" => ?e);
71+
client.abort().await;
7172
}
72-
client.abort().await;
73+
let response = response?;
7374

7475
if response != Response::Accepted {
7576
anyhow::bail!("Failed to publish DMQ message: {:?}", response);
@@ -84,7 +85,8 @@ mod tests {
8485
use std::{fs, sync::Arc, time::Duration};
8586

8687
use pallas_network::miniprotocols::{
87-
localmsgsubmission::DmqMsgValidationError, localtxsubmission,
88+
localmsgsubmission::{DmqMsgRejectReason, DmqMsgValidationError},
89+
localtxsubmission,
8890
};
8991
use tokio::{net::UnixListener, task::JoinHandle};
9092

@@ -102,14 +104,14 @@ mod tests {
102104
TempDir::create_with_short_path("dmq_publisher", folder_name)
103105
}
104106

105-
fn setup_dmq_server(socket_path: PathBuf, reply_success: bool) -> JoinHandle<()> {
107+
fn setup_dmq_server(socket_path: PathBuf, reply_success: bool) -> JoinHandle<StdResult<()>> {
106108
tokio::spawn({
107109
async move {
108110
// server setup
109111
if socket_path.exists() {
110-
fs::remove_file(socket_path.clone()).unwrap();
112+
fs::remove_file(socket_path.clone())?;
111113
}
112-
let listener = UnixListener::bind(socket_path).unwrap();
114+
let listener = UnixListener::bind(socket_path)?;
113115
let mut server = pallas_network::facades::DmqServer::accept(&listener, 0)
114116
.await
115117
.unwrap();
@@ -118,7 +120,7 @@ mod tests {
118120
let server_msg = server.msg_submission();
119121

120122
// server waits for request from client and replies to it
121-
let request = server_msg.recv_next_request().await.unwrap();
123+
let request = server_msg.recv_next_request().await?;
122124
match &request {
123125
localtxsubmission::Request::Submit(_) => (),
124126
request => panic!("Expected a Submit request, but received: {request:?}"),
@@ -127,14 +129,18 @@ mod tests {
127129
localtxsubmission::Response::Accepted
128130
} else {
129131
localtxsubmission::Response::Rejected(DmqMsgValidationError(
130-
"fake error".to_string(),
132+
DmqMsgRejectReason::Other("fake error".to_string()),
131133
))
132134
};
133-
server_msg.send_submit_tx_response(response).await.unwrap();
135+
server_msg.send_submit_tx_response(response).await?;
134136

135137
// server receives done from client
136-
let request = server_msg.recv_next_request().await.unwrap();
137-
assert_eq!(localtxsubmission::Request::Done, request);
138+
let request = server_msg.recv_next_request().await?;
139+
if request != localtxsubmission::Request::Done {
140+
anyhow::bail!("Expected a Done request, but received: {request:?}");
141+
}
142+
143+
Ok(())
138144
}
139145
})
140146
}
@@ -173,9 +179,10 @@ mod tests {
173179
publisher.publish_message(DmqMessageTestPayload::dummy()).await
174180
});
175181

176-
let (_, res) = tokio::join!(server, client);
182+
let (res_server, res_client) = tokio::join!(server, client);
177183

178-
res.unwrap().unwrap();
184+
res_server.unwrap().unwrap();
185+
res_client.unwrap().unwrap();
179186
}
180187

181188
#[tokio::test(flavor = "multi_thread")]
@@ -211,8 +218,9 @@ mod tests {
211218
publisher.publish_message(DmqMessageTestPayload::dummy()).await
212219
});
213220

214-
let (_, res) = tokio::join!(server, client);
221+
let (res_server, res_client) = tokio::join!(server, client);
215222

216-
res.unwrap().expect_err("Publishing DMQ message should fail");
223+
res_server.unwrap().unwrap();
224+
res_client.unwrap().expect_err("Publishing DMQ message should fail");
217225
}
218226
}

internal/mithril-dmq/src/publisher/server/pallas.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use anyhow::{Context, anyhow};
44
use pallas_network::{
55
facades::DmqServer,
66
miniprotocols::{
7-
localmsgsubmission::DmqMsgValidationError,
7+
localmsgsubmission::{DmqMsgRejectReason, DmqMsgValidationError},
88
localtxsubmission::{Request, Response},
99
},
1010
};
@@ -148,8 +148,8 @@ impl DmqPublisherServer for DmqPublisherServerPallas {
148148
);
149149
(
150150
None,
151-
Response::Rejected(DmqMsgValidationError(format!(
152-
"Expected a Submit request, but received: {request:?}"
151+
Response::Rejected(DmqMsgValidationError(DmqMsgRejectReason::Invalid(
152+
format!("Expected a Submit request, but received: {request:?}"),
153153
))),
154154
)
155155
}

0 commit comments

Comments
 (0)