Skip to content

Commit 4ce1016

Browse files
committed
fix(dmq): run receive/serve messages on separate threads
1 parent d3d6033 commit 4ce1016

File tree

1 file changed

+95
-40
lines changed
  • internal/mithril-dmq/src/consumer/server

1 file changed

+95
-40
lines changed

internal/mithril-dmq/src/consumer/server/pallas.rs

Lines changed: 95 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
use std::{fs, path::PathBuf};
22

33
use anyhow::{Context, anyhow};
4-
use pallas_network::{facades::DmqServer, miniprotocols::localmsgnotification::Request};
4+
use pallas_network::{
5+
facades::DmqServer,
6+
miniprotocols::localmsgnotification::{Request, State},
7+
};
58
use tokio::{
9+
join,
610
net::UnixListener,
711
select,
812
sync::{Mutex, MutexGuard, mpsc::UnboundedReceiver, watch::Receiver},
@@ -116,6 +120,77 @@ impl DmqConsumerServerPallas {
116120

117121
Ok(())
118122
}
123+
124+
/// Receives incoming messages into the DMQ consumer server.
125+
async fn receive_incoming_messages(&self) -> StdResult<()> {
126+
info!(
127+
self.logger,
128+
"Receive incoming messages into DMQ consumer server...";
129+
"socket" => ?self.socket,
130+
"network" => ?self.network
131+
);
132+
133+
let mut stop_rx = self.stop_rx.clone();
134+
let mut receiver = self.messages_receiver.lock().await;
135+
match *receiver {
136+
Some(ref mut receiver) => loop {
137+
select! {
138+
_ = stop_rx.changed() => {
139+
warn!(self.logger, "Stopping DMQ consumer server...");
140+
141+
return Ok(());
142+
}
143+
message = receiver.recv() => {
144+
if let Some(message) = message {
145+
debug!(self.logger, "Received a message from the DMQ network"; "message" => ?message);
146+
self.messages_buffer.enqueue(message).await;
147+
} else {
148+
warn!(self.logger, "DMQ message receiver channel closed");
149+
return Ok(());
150+
}
151+
152+
}
153+
}
154+
},
155+
None => {
156+
return Err(anyhow!("DMQ message receiver is not registered"));
157+
}
158+
}
159+
}
160+
161+
/// Serves incoming messages from the DMQ consumer server.
162+
async fn serve_incoming_messages(&self) -> StdResult<()> {
163+
info!(
164+
self.logger,
165+
"Serve incoming messages from DMQ consumer server...";
166+
"socket" => ?self.socket,
167+
"network" => ?self.network
168+
);
169+
170+
let mut stop_rx = self.stop_rx.clone();
171+
loop {
172+
select! {
173+
_ = stop_rx.changed() => {
174+
warn!(self.logger, "Stopping DMQ consumer server...");
175+
176+
return Ok(());
177+
}
178+
res = self.process_message() => {
179+
match res {
180+
Ok(_) => {
181+
debug!(self.logger, "Processed a message successfully");
182+
}
183+
Err(err) => {
184+
error!(self.logger, "Failed to process message"; "error" => ?err);
185+
/* if let Err(drop_err) = self.drop_server().await {
186+
error!(self.logger, "Failed to drop DMQ consumer server"; "error" => ?drop_err);
187+
} */
188+
}
189+
}
190+
}
191+
}
192+
}
193+
}
119194
}
120195

121196
#[async_trait::async_trait]
@@ -128,6 +203,12 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
128203
let mut server_guard = self.get_server().await?;
129204
let server = server_guard.as_mut().ok_or(anyhow!("DMQ server does not exist"))?;
130205

206+
debug!(
207+
self.logger,
208+
"DMQ Server state: {:?}",
209+
server.msg_notification().state()
210+
);
211+
131212
let request = server
132213
.msg_notification()
133214
.recv_next_request()
@@ -145,8 +226,13 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
145226
reply_messages.into_iter().map(|msg| msg.into()).collect::<Vec<_>>();
146227
server
147228
.msg_notification()
148-
.send_reply_messages_blocking(reply_messages)
229+
.send_reply_messages_blocking(reply_messages.clone())
149230
.await?;
231+
debug!(
232+
self.logger,
233+
"Blocking notification replied to the DMQ notification client: {:?}",
234+
reply_messages
235+
);
150236
}
151237
Request::NonBlocking => {
152238
debug!(
@@ -177,45 +263,14 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
177263
"network" => ?self.network
178264
);
179265

180-
let mut stop_rx = self.stop_rx.clone();
181-
let mut receiver = self.messages_receiver.lock().await;
182-
match *receiver {
183-
Some(ref mut receiver) => loop {
184-
select! {
185-
_ = stop_rx.changed() => {
186-
warn!(self.logger, "Stopping DMQ consumer server...");
187-
188-
return Ok(());
189-
}
190-
message = receiver.recv() => {
191-
if let Some(message) = message {
192-
debug!(self.logger, "Received a message from the DMQ network"; "message" => ?message);
193-
self.messages_buffer.enqueue(message).await;
194-
} else {
195-
warn!(self.logger, "DMQ message receiver channel closed");
196-
return Ok(());
197-
}
266+
let (receive_result, serve_result) = join!(
267+
self.receive_incoming_messages(),
268+
self.serve_incoming_messages()
269+
);
270+
receive_result?;
271+
serve_result?;
198272

199-
}
200-
res = self.process_message() => {
201-
match res {
202-
Ok(_) => {
203-
debug!(self.logger, "Processed a message successfully");
204-
}
205-
Err(err) => {
206-
error!(self.logger, "Failed to process message"; "error" => ?err);
207-
if let Err(drop_err) = self.drop_server().await {
208-
error!(self.logger, "Failed to drop DMQ consumer server"; "error" => ?drop_err);
209-
}
210-
}
211-
}
212-
}
213-
}
214-
},
215-
None => {
216-
return Err(anyhow!("DMQ message receiver is not registered"));
217-
}
218-
}
273+
Ok(())
219274
}
220275
}
221276

0 commit comments

Comments
 (0)