Skip to content

Commit f62ef03

Browse files
committed
feat(dmq): make consumer server queue drain expired messages
1 parent 68436e8 commit f62ef03

File tree

1 file changed

+114
-22
lines changed
  • internal/mithril-dmq/src/consumer/server

1 file changed

+114
-22
lines changed

internal/mithril-dmq/src/consumer/server/queue.rs

Lines changed: 114 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
1-
use std::collections::VecDeque;
1+
use std::{collections::VecDeque, sync::Arc};
22

3+
use mithril_common::StdResult;
34
use tokio::sync::{Mutex, Notify};
45

5-
use crate::DmqMessage;
6+
use crate::{
7+
DmqMessage,
8+
model::{SystemUnixTimestampProvider, UnixTimestampProvider},
9+
};
610

711
/// A queue for storing DMQ messages.
812
pub(crate) struct MessageQueue {
913
messages: Mutex<VecDeque<DmqMessage>>,
1014
new_message_notify: Notify,
15+
timestamp_provider: Arc<dyn UnixTimestampProvider>,
1116
max_size: usize,
1217
}
1318

@@ -16,33 +21,62 @@ impl MessageQueue {
1621
const MAX_SIZE_DEFAULT: usize = 10000;
1722

1823
/// Creates a new instance of [BlockingNonBlockingQueue].
19-
pub fn new(max_size: usize) -> Self {
24+
pub fn new(max_size: usize, timestamp_provider: Arc<dyn UnixTimestampProvider>) -> Self {
2025
Self {
2126
messages: Mutex::new(VecDeque::new()),
2227
new_message_notify: Notify::new(),
28+
timestamp_provider,
2329
max_size,
2430
}
2531
}
2632

27-
/// Enqueues a new message into the queue.
28-
pub async fn enqueue(&self, message: DmqMessage) {
33+
/// Cleans the queue
34+
///
35+
/// Removes expired messages and ensures the queue does not exceed the maximum size.
36+
async fn clean_queue(&self) {
2937
let mut message_queue_guard = self.messages.lock().await;
30-
(*message_queue_guard).push_back(message);
38+
// Remove expired messages from the front of the queue
39+
// There may be other expired messages in the queue, but they will be removed on dequeue
40+
// This avoids full scan of the queue.
41+
while let Some(message) = message_queue_guard.front()
42+
&& self.has_message_expired(message).unwrap_or(false)
43+
{
44+
message_queue_guard.pop_front();
45+
}
3146

3247
while message_queue_guard.len() > self.max_size {
3348
message_queue_guard.pop_front();
3449
}
50+
}
51+
52+
/// Checks if a message has expired.
53+
fn has_message_expired(&self, message: &DmqMessage) -> StdResult<bool> {
54+
let current_timestamp: u32 = self.timestamp_provider.current_timestamp()?.try_into()?;
55+
Ok(message.msg_payload.expires_at < current_timestamp)
56+
}
57+
58+
/// Enqueues a new message into the queue.
59+
pub async fn enqueue(&self, message: DmqMessage) {
60+
{
61+
// Run in a block to avoid Mutex deadlock in clean_queue
62+
let mut message_queue_guard = self.messages.lock().await;
63+
(*message_queue_guard).push_back(message);
64+
}
65+
self.clean_queue().await;
3566

3667
self.new_message_notify.notify_waiters();
3768
}
3869

3970
/// Returns the messages from the queue in a non blocking way, if available.
4071
pub async fn dequeue_non_blocking(&self, limit: Option<usize>) -> Vec<DmqMessage> {
72+
self.clean_queue().await;
4173
let mut message_queue_guard = self.messages.lock().await;
4274
let limit = limit.unwrap_or((*message_queue_guard).len());
4375
let mut messages = Vec::new();
4476
for _ in 0..limit {
45-
if let Some(message) = (*message_queue_guard).pop_front() {
77+
if let Some(message) = (*message_queue_guard).pop_front()
78+
&& !self.has_message_expired(&message).unwrap_or(false)
79+
{
4680
messages.push(message);
4781
}
4882
}
@@ -76,7 +110,10 @@ impl MessageQueue {
76110

77111
impl Default for MessageQueue {
78112
fn default() -> Self {
79-
Self::new(Self::MAX_SIZE_DEFAULT)
113+
Self::new(
114+
Self::MAX_SIZE_DEFAULT,
115+
Arc::new(SystemUnixTimestampProvider),
116+
)
80117
}
81118
}
82119

@@ -88,6 +125,8 @@ mod tests {
88125
use pallas_network::miniprotocols::localmsgsubmission::{DmqMsg, DmqMsgPayload};
89126
use tokio::time::sleep;
90127

128+
use crate::model::MockUnixTimestampProvider;
129+
91130
use super::*;
92131

93132
fn fake_msg() -> DmqMsg {
@@ -104,20 +143,38 @@ mod tests {
104143
}
105144
}
106145

107-
fn fake_messages(range: RangeInclusive<u8>) -> Vec<DmqMessage> {
146+
fn fake_messages(range: RangeInclusive<u8>, expires_at: u32) -> Vec<DmqMessage> {
108147
range
109148
.map(|i| {
110149
let mut message = fake_msg();
111150
message.msg_payload.msg_id = vec![i];
151+
message.msg_payload.expires_at = expires_at;
112152
message.into()
113153
})
114154
.collect::<Vec<_>>()
115155
}
116156

157+
fn create_queue(max_size: usize, current_timestamp: u64) -> MessageQueue {
158+
MessageQueue::new(
159+
max_size,
160+
Arc::new({
161+
let mut mock_timestamp_provider = MockUnixTimestampProvider::new();
162+
mock_timestamp_provider
163+
.expect_current_timestamp()
164+
.returning(move || Ok(current_timestamp));
165+
166+
mock_timestamp_provider
167+
}),
168+
)
169+
}
170+
117171
#[tokio::test]
118172
async fn enqueue_and_dequeue_non_blocking_no_limit() {
119-
let queue = MessageQueue::default();
120-
let messages = fake_messages(1..=5);
173+
let max_size = 100;
174+
let current_timestamp = 10;
175+
let expires_at = 100;
176+
let queue = create_queue(max_size, current_timestamp);
177+
let messages = fake_messages(1..=5, expires_at);
121178
for message in messages.clone() {
122179
queue.enqueue(message).await;
123180
}
@@ -130,8 +187,11 @@ mod tests {
130187

131188
#[tokio::test]
132189
async fn enqueue_and_dequeue_non_blocking_with_limit() {
133-
let queue = MessageQueue::default();
134-
let messages = fake_messages(1..=5);
190+
let max_size = 100;
191+
let current_timestamp = 10;
192+
let expires_at = 100;
193+
let queue = create_queue(max_size, current_timestamp);
194+
let messages = fake_messages(1..=5, expires_at);
135195
for message in messages.clone() {
136196
queue.enqueue(message).await;
137197
}
@@ -144,8 +204,11 @@ mod tests {
144204

145205
#[tokio::test]
146206
async fn enqueue_and_dequeue_blocking_no_limit() {
147-
let queue = MessageQueue::default();
148-
let messages = fake_messages(1..=5);
207+
let max_size = 100;
208+
let current_timestamp = 10;
209+
let expires_at = 100;
210+
let queue = create_queue(max_size, current_timestamp);
211+
let messages = fake_messages(1..=5, expires_at);
149212
for message in messages.clone() {
150213
queue.enqueue(message).await;
151214
}
@@ -158,8 +221,11 @@ mod tests {
158221

159222
#[tokio::test]
160223
async fn enqueue_and_dequeue_blocking_with_limit() {
161-
let queue = MessageQueue::default();
162-
let messages = fake_messages(1..=5);
224+
let max_size = 100;
225+
let current_timestamp = 10;
226+
let expires_at = 100;
227+
let queue = create_queue(max_size, current_timestamp);
228+
let messages = fake_messages(1..=5, expires_at);
163229
for message in messages.clone() {
164230
queue.enqueue(message).await;
165231
}
@@ -172,7 +238,9 @@ mod tests {
172238

173239
#[tokio::test]
174240
async fn dequeue_blocking_blocks_when_no_message_available() {
175-
let queue = MessageQueue::default();
241+
let max_size = 100;
242+
let current_timestamp = 10;
243+
let queue = create_queue(max_size, current_timestamp);
176244

177245
let result = tokio::select!(
178246
_res = sleep(Duration::from_millis(100)) => {Err(anyhow!("Timeout"))},
@@ -183,10 +251,12 @@ mod tests {
183251
}
184252

185253
#[tokio::test]
186-
async fn enqueue_blocks_over_max_size_drains_oldest_messages() {
187-
let max_queue_size = 3;
188-
let queue = MessageQueue::new(max_queue_size);
189-
let messages = fake_messages(1..=5);
254+
async fn queue_drains_oldest_messages_when_full() {
255+
let max_size = 3;
256+
let current_timestamp = 10;
257+
let expires_at = 100;
258+
let queue = create_queue(max_size, current_timestamp);
259+
let messages = fake_messages(1..=5, expires_at);
190260
for message in messages.clone() {
191261
queue.enqueue(message).await;
192262
}
@@ -196,4 +266,26 @@ mod tests {
196266

197267
assert_eq!(messages[2..=4].to_vec(), dequeued_messages);
198268
}
269+
270+
#[tokio::test]
271+
async fn queue_drains_expired_message() {
272+
let max_size = 3;
273+
let current_timestamp = 10;
274+
let expires_at_expired = 1;
275+
let expires_at_non_expired = 100;
276+
let queue = create_queue(max_size, current_timestamp);
277+
let mut messages = fake_messages(1..=10, expires_at_expired);
278+
for (index, mut message) in messages.clone().into_iter().enumerate() {
279+
if index >= 5 {
280+
message.msg_payload.expires_at = expires_at_non_expired;
281+
messages[index] = message.clone();
282+
}
283+
queue.enqueue(message).await;
284+
}
285+
let limit = None;
286+
287+
let dequeued_messages = queue.dequeue_blocking(limit).await;
288+
289+
assert_eq!(messages[7..=9].to_vec(), dequeued_messages);
290+
}
199291
}

0 commit comments

Comments
 (0)