Skip to content

Commit be9c001

Browse files
committed
feat(dmq): add 'MessageQueue' implementation for the consumer server
1 parent d2400b3 commit be9c001

File tree

2 files changed

+171
-0
lines changed

2 files changed

+171
-0
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
mod interface;
2+
mod pallas;
3+
mod queue;
24

35
pub use interface::*;
6+
pub use pallas::*;
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
use std::collections::VecDeque;
2+
3+
use tokio::sync::{Mutex, Notify};
4+
5+
use crate::DmqMessage;
6+
7+
/// A queue for storing DMQ messages.
8+
pub(crate) struct MessageQueue {
9+
messages: Mutex<VecDeque<DmqMessage>>,
10+
new_message_notify: Notify,
11+
}
12+
13+
impl MessageQueue {
14+
/// Creates a new instance of [BlockingNonBlockingQueue].
15+
pub fn new() -> Self {
16+
Self {
17+
messages: Mutex::new(VecDeque::new()),
18+
new_message_notify: Notify::new(),
19+
}
20+
}
21+
22+
/// Enqueues a new message into the queue.
23+
pub async fn enqueue(&self, message: DmqMessage) {
24+
let mut message_queue_guard = self.messages.lock().await;
25+
(*message_queue_guard).push_back(message);
26+
27+
self.new_message_notify.notify_waiters();
28+
}
29+
30+
/// Returns the messages from the queue in a non blocking way, if available.
31+
pub async fn dequeue_non_blocking(&self, limit: Option<usize>) -> Vec<DmqMessage> {
32+
let mut message_queue_guard = self.messages.lock().await;
33+
let limit = limit.unwrap_or((*message_queue_guard).len());
34+
let mut messages = Vec::new();
35+
for _ in 0..limit {
36+
if let Some(message) = (*message_queue_guard).pop_front() {
37+
messages.push(message);
38+
}
39+
}
40+
41+
messages
42+
}
43+
44+
/// Returns the messages from the queue in a blocking way, waiting for new messages if necessary.
45+
pub async fn dequeue_blocking(&self, limit: Option<usize>) -> Vec<DmqMessage> {
46+
loop {
47+
let messages = self.dequeue_non_blocking(limit).await;
48+
if !messages.is_empty() {
49+
return messages;
50+
}
51+
52+
self.new_message_notify.notified().await;
53+
}
54+
}
55+
56+
/// Checks if the message queue is empty.
57+
pub async fn is_empty(&self) -> bool {
58+
self.len().await == 0
59+
}
60+
61+
/// Get the length of the message queue.
62+
pub async fn len(&self) -> usize {
63+
let message_queue_guard = self.messages.lock().await;
64+
(*message_queue_guard).len()
65+
}
66+
}
67+
68+
#[cfg(test)]
69+
mod tests {
70+
use std::{ops::RangeInclusive, time::Duration};
71+
72+
use anyhow::anyhow;
73+
use pallas_network::miniprotocols::localmsgsubmission::DmqMsg;
74+
use tokio::time::sleep;
75+
76+
use super::*;
77+
78+
fn fake_msg() -> DmqMsg {
79+
DmqMsg {
80+
msg_id: vec![0, 1],
81+
msg_body: vec![0, 1, 2],
82+
block_number: 10,
83+
ttl: 100,
84+
kes_signature: vec![0, 1, 2, 3],
85+
operational_certificate: vec![0, 1, 2, 3, 4],
86+
}
87+
}
88+
89+
fn fake_messages(range: RangeInclusive<u8>) -> Vec<DmqMessage> {
90+
range
91+
.map(|i| {
92+
DmqMsg {
93+
msg_id: vec![i],
94+
..fake_msg()
95+
}
96+
.into()
97+
})
98+
.collect::<Vec<_>>()
99+
}
100+
101+
#[tokio::test]
102+
async fn enqueue_and_dequeue_non_blocking_no_limit() {
103+
let queue = MessageQueue::new();
104+
let messages = fake_messages(1..=5);
105+
for message in messages.clone() {
106+
queue.enqueue(message).await;
107+
}
108+
let limit = None;
109+
110+
let dequeued_messages = queue.dequeue_non_blocking(limit).await;
111+
112+
assert_eq!(messages, dequeued_messages);
113+
}
114+
115+
#[tokio::test]
116+
async fn enqueue_and_dequeue_non_blocking_with_limit() {
117+
let queue = MessageQueue::new();
118+
let messages = fake_messages(1..=5);
119+
for message in messages.clone() {
120+
queue.enqueue(message).await;
121+
}
122+
let limit = Some(2);
123+
124+
let dequeued_messages = queue.dequeue_non_blocking(limit).await;
125+
126+
assert_eq!(messages[0..=1].to_vec(), dequeued_messages);
127+
}
128+
129+
#[tokio::test]
130+
async fn enqueue_and_dequeue_blocking_no_limit() {
131+
let queue = MessageQueue::new();
132+
let messages = fake_messages(1..=5);
133+
for message in messages.clone() {
134+
queue.enqueue(message).await;
135+
}
136+
let limit = None;
137+
138+
let dequeued_messages = queue.dequeue_blocking(limit).await;
139+
140+
assert_eq!(messages, dequeued_messages);
141+
}
142+
143+
#[tokio::test]
144+
async fn enqueue_and_dequeue_blocking_with_limit() {
145+
let queue = MessageQueue::new();
146+
let messages = fake_messages(1..=5);
147+
for message in messages.clone() {
148+
queue.enqueue(message).await;
149+
}
150+
let limit = Some(2);
151+
152+
let dequeued_messages = queue.dequeue_blocking(limit).await;
153+
154+
assert_eq!(messages[0..=1].to_vec(), dequeued_messages);
155+
}
156+
157+
#[tokio::test]
158+
async fn dequeue_blocking_blocks_when_no_message_available() {
159+
let queue = MessageQueue::new();
160+
161+
let result = tokio::select!(
162+
_res = sleep(Duration::from_millis(100)) => {Err(anyhow!("Timeout"))},
163+
_res = queue.dequeue_blocking(None) => {Ok(())},
164+
);
165+
166+
result.expect_err("Should have timed out");
167+
}
168+
}

0 commit comments

Comments
 (0)