Skip to content

Commit 07e1bfa

Browse files
committed
fix(dmq): run receive/serve messages on separate threads
1 parent 71760f6 commit 07e1bfa

File tree

1 file changed

+95
-40
lines changed
  • internal/mithril-dmq/src/consumer/server

1 file changed

+95
-40
lines changed

internal/mithril-dmq/src/consumer/server/pallas.rs

Lines changed: 95 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
use std::{fs, path::PathBuf};
22

33
use anyhow::{Context, anyhow};
4-
use pallas_network::{facades::DmqServer, miniprotocols::localmsgnotification::Request};
4+
use pallas_network::{
5+
facades::DmqServer,
6+
miniprotocols::localmsgnotification::{Request, State},
7+
};
58
use tokio::{
9+
join,
610
net::UnixListener,
711
select,
812
sync::{Mutex, MutexGuard, mpsc::UnboundedReceiver, watch::Receiver},
@@ -118,6 +122,77 @@ impl DmqConsumerServerPallas {
118122

119123
Ok(())
120124
}
125+
126+
/// Receives incoming messages into the DMQ consumer server.
127+
async fn receive_incoming_messages(&self) -> StdResult<()> {
128+
info!(
129+
self.logger,
130+
"Receive incoming messages into DMQ consumer server...";
131+
"socket" => ?self.socket,
132+
"network" => ?self.network
133+
);
134+
135+
let mut stop_rx = self.stop_rx.clone();
136+
let mut receiver = self.messages_receiver.lock().await;
137+
match *receiver {
138+
Some(ref mut receiver) => loop {
139+
select! {
140+
_ = stop_rx.changed() => {
141+
warn!(self.logger, "Stopping DMQ consumer server...");
142+
143+
return Ok(());
144+
}
145+
message = receiver.recv() => {
146+
if let Some(message) = message {
147+
debug!(self.logger, "Received a message from the DMQ network"; "message" => ?message);
148+
self.messages_buffer.enqueue(message).await;
149+
} else {
150+
warn!(self.logger, "DMQ message receiver channel closed");
151+
return Ok(());
152+
}
153+
154+
}
155+
}
156+
},
157+
None => {
158+
return Err(anyhow!("DMQ message receiver is not registered"));
159+
}
160+
}
161+
}
162+
163+
/// Serves incoming messages from the DMQ consumer server.
164+
async fn serve_incoming_messages(&self) -> StdResult<()> {
165+
info!(
166+
self.logger,
167+
"Serve incoming messages from DMQ consumer server...";
168+
"socket" => ?self.socket,
169+
"network" => ?self.network
170+
);
171+
172+
let mut stop_rx = self.stop_rx.clone();
173+
loop {
174+
select! {
175+
_ = stop_rx.changed() => {
176+
warn!(self.logger, "Stopping DMQ consumer server...");
177+
178+
return Ok(());
179+
}
180+
res = self.process_message() => {
181+
match res {
182+
Ok(_) => {
183+
debug!(self.logger, "Processed a message successfully");
184+
}
185+
Err(err) => {
186+
error!(self.logger, "Failed to process message"; "error" => ?err);
187+
/* if let Err(drop_err) = self.drop_server().await {
188+
error!(self.logger, "Failed to drop DMQ consumer server"; "error" => ?drop_err);
189+
} */
190+
}
191+
}
192+
}
193+
}
194+
}
195+
}
121196
}
122197

123198
#[async_trait::async_trait]
@@ -130,6 +205,12 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
130205
let mut server_guard = self.get_server().await?;
131206
let server = server_guard.as_mut().ok_or(anyhow!("DMQ server does not exist"))?;
132207

208+
debug!(
209+
self.logger,
210+
"DMQ Server state: {:?}",
211+
server.msg_notification().state()
212+
);
213+
133214
let request = server
134215
.msg_notification()
135216
.recv_next_request()
@@ -147,8 +228,13 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
147228
reply_messages.into_iter().map(|msg| msg.into()).collect::<Vec<_>>();
148229
server
149230
.msg_notification()
150-
.send_reply_messages_blocking(reply_messages)
231+
.send_reply_messages_blocking(reply_messages.clone())
151232
.await?;
233+
debug!(
234+
self.logger,
235+
"Blocking notification replied to the DMQ notification client: {:?}",
236+
reply_messages
237+
);
152238
}
153239
Request::NonBlocking => {
154240
debug!(
@@ -179,45 +265,14 @@ impl DmqConsumerServer for DmqConsumerServerPallas {
179265
"network" => ?self.network
180266
);
181267

182-
let mut stop_rx = self.stop_rx.clone();
183-
let mut receiver = self.messages_receiver.lock().await;
184-
match *receiver {
185-
Some(ref mut receiver) => loop {
186-
select! {
187-
_ = stop_rx.changed() => {
188-
warn!(self.logger, "Stopping DMQ consumer server...");
189-
190-
return Ok(());
191-
}
192-
message = receiver.recv() => {
193-
if let Some(message) = message {
194-
debug!(self.logger, "Received a message from the DMQ network"; "message" => ?message);
195-
self.messages_buffer.enqueue(message).await;
196-
} else {
197-
warn!(self.logger, "DMQ message receiver channel closed");
198-
return Ok(());
199-
}
268+
let (receive_result, serve_result) = join!(
269+
self.receive_incoming_messages(),
270+
self.serve_incoming_messages()
271+
);
272+
receive_result?;
273+
serve_result?;
200274

201-
}
202-
res = self.process_message() => {
203-
match res {
204-
Ok(_) => {
205-
debug!(self.logger, "Processed a message successfully");
206-
}
207-
Err(err) => {
208-
error!(self.logger, "Failed to process message"; "error" => ?err);
209-
if let Err(drop_err) = self.drop_server().await {
210-
error!(self.logger, "Failed to drop DMQ consumer server"; "error" => ?drop_err);
211-
}
212-
}
213-
}
214-
}
215-
}
216-
},
217-
None => {
218-
return Err(anyhow!("DMQ message receiver is not registered"));
219-
}
220-
}
275+
Ok(())
221276
}
222277
}
223278

0 commit comments

Comments
 (0)