Skip to content

Commit 1453509

Browse files
authored
[ISSUE #6592]🚀Add lite pull consumer implementation with assigned message queue management (#6593)
1 parent c5b1a29 commit 1453509

File tree

4 files changed

+629
-0
lines changed

4 files changed

+629
-0
lines changed

rocketmq-client/src/consumer/consumer_impl.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@
1414

1515
use std::sync::LazyLock;
1616

17+
pub(crate) mod assigned_message_queue;
1718
pub(crate) mod consume_message_concurrently_service;
1819
pub(crate) mod consume_message_orderly_service;
1920
pub(crate) mod consume_message_pop_concurrently_service;
2021
pub(crate) mod consume_message_pop_orderly_service;
2122
pub(crate) mod consume_message_service;
23+
pub(crate) mod default_lite_pull_consumer_impl;
2224
pub(crate) mod default_mq_push_consumer_impl;
25+
pub(crate) mod lite_pull_consume_request;
2326
pub(crate) mod message_request;
2427
pub(crate) mod pop_process_queue;
2528
pub(crate) mod pop_request;
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
// Copyright 2023 The RocketMQ Rust Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
use std::collections::HashSet;
17+
use std::sync::atomic::AtomicBool;
18+
use std::sync::atomic::AtomicI64;
19+
use std::sync::atomic::Ordering;
20+
use std::sync::Arc;
21+
22+
use rocketmq_common::common::message::message_queue::MessageQueue;
23+
use tokio::sync::RwLock;
24+
25+
use crate::consumer::consumer_impl::process_queue::ProcessQueue;
26+
27+
/// Represents a message queue with its associated state for lite pull consumer.
28+
/// Contains process queue reference, pause flag, and offset tracking for pull, consume, and seek
29+
/// operations.
30+
#[derive(Clone)]
31+
pub(crate) struct AssignedQueue {
32+
pub(crate) message_queue: MessageQueue,
33+
pub(crate) process_queue: Arc<ProcessQueue>,
34+
pub(crate) paused: Arc<AtomicBool>,
35+
pub(crate) pull_offset: Arc<AtomicI64>,
36+
pub(crate) consume_offset: Arc<AtomicI64>,
37+
pub(crate) seek_offset: Arc<AtomicI64>,
38+
}
39+
40+
impl AssignedQueue {
41+
pub fn new(message_queue: MessageQueue) -> Self {
42+
Self {
43+
message_queue,
44+
process_queue: Arc::new(ProcessQueue::new()),
45+
paused: Arc::new(AtomicBool::new(false)),
46+
pull_offset: Arc::new(AtomicI64::new(-1)),
47+
consume_offset: Arc::new(AtomicI64::new(-1)),
48+
seek_offset: Arc::new(AtomicI64::new(-1)),
49+
}
50+
}
51+
}
52+
53+
/// Manages assigned message queues and their process queues for lite pull consumer.
54+
#[derive(Clone)]
55+
pub struct AssignedMessageQueue {
56+
queue_map: Arc<RwLock<HashMap<MessageQueue, AssignedQueue>>>,
57+
}
58+
59+
impl AssignedMessageQueue {
60+
/// Creates a new empty AssignedMessageQueue.
61+
pub fn new() -> Self {
62+
Self {
63+
queue_map: Arc::new(RwLock::new(HashMap::new())),
64+
}
65+
}
66+
67+
/// Assigns a new message queue.
68+
pub async fn put(&self, mq: MessageQueue) {
69+
let mut map = self.queue_map.write().await;
70+
map.entry(mq.clone()).or_insert_with(|| AssignedQueue::new(mq));
71+
}
72+
73+
/// Removes a message queue and returns its process queue.
74+
pub async fn remove(&self, mq: &MessageQueue) -> Option<Arc<ProcessQueue>> {
75+
let mut map = self.queue_map.write().await;
76+
map.remove(mq).map(|aq| {
77+
aq.process_queue.set_dropped(true);
78+
aq.process_queue
79+
})
80+
}
81+
82+
/// Returns the process queue for the specified message queue.
83+
pub async fn get_process_queue(&self, mq: &MessageQueue) -> Option<Arc<ProcessQueue>> {
84+
let map = self.queue_map.read().await;
85+
map.get(mq).map(|aq| aq.process_queue.clone())
86+
}
87+
88+
/// Checks if the specified queue is paused.
89+
pub async fn is_paused(&self, mq: &MessageQueue) -> bool {
90+
let map = self.queue_map.read().await;
91+
map.get(mq).map(|aq| aq.paused.load(Ordering::Acquire)).unwrap_or(false)
92+
}
93+
94+
/// Sets the pause state for the specified queue.
95+
pub async fn set_paused(&self, mq: &MessageQueue, paused: bool) {
96+
let map = self.queue_map.read().await;
97+
if let Some(aq) = map.get(mq) {
98+
aq.paused.store(paused, Ordering::Release);
99+
}
100+
}
101+
102+
/// Returns the seek offset for the specified queue (-1 if not seeking).
103+
pub async fn get_seek_offset(&self, mq: &MessageQueue) -> i64 {
104+
let map = self.queue_map.read().await;
105+
map.get(mq)
106+
.map(|aq| aq.seek_offset.load(Ordering::Acquire))
107+
.unwrap_or(-1)
108+
}
109+
110+
/// Sets the seek offset for the specified queue.
111+
pub async fn set_seek_offset(&self, mq: &MessageQueue, offset: i64) {
112+
let map = self.queue_map.read().await;
113+
if let Some(aq) = map.get(mq) {
114+
aq.seek_offset.store(offset, Ordering::Release);
115+
}
116+
}
117+
118+
/// Clears the seek offset for the specified queue (sets to -1).
119+
pub async fn clear_seek_offset(&self, mq: &MessageQueue) {
120+
self.set_seek_offset(mq, -1).await;
121+
}
122+
123+
/// Returns the pull offset for the specified queue.
124+
pub async fn get_pull_offset(&self, mq: &MessageQueue) -> i64 {
125+
let map = self.queue_map.read().await;
126+
map.get(mq)
127+
.map(|aq| aq.pull_offset.load(Ordering::Acquire))
128+
.unwrap_or(-1)
129+
}
130+
131+
/// Updates the pull offset for the specified queue.
132+
pub async fn update_pull_offset(&self, mq: &MessageQueue, offset: i64, process_queue: &Arc<ProcessQueue>) {
133+
let map = self.queue_map.read().await;
134+
if let Some(aq) = map.get(mq) {
135+
// Only update if the process queue instance matches to prevent race conditions
136+
if Arc::ptr_eq(&aq.process_queue, process_queue) {
137+
aq.pull_offset.store(offset, Ordering::Release);
138+
}
139+
}
140+
}
141+
142+
/// Returns the consume offset for the specified queue.
143+
pub async fn get_consume_offset(&self, mq: &MessageQueue) -> i64 {
144+
let map = self.queue_map.read().await;
145+
map.get(mq)
146+
.map(|aq| aq.consume_offset.load(Ordering::Acquire))
147+
.unwrap_or(-1)
148+
}
149+
150+
/// Updates the consume offset for the specified queue.
151+
pub async fn update_consume_offset(&self, mq: &MessageQueue, offset: i64) {
152+
let map = self.queue_map.read().await;
153+
if let Some(aq) = map.get(mq) {
154+
aq.consume_offset.store(offset, Ordering::Release);
155+
}
156+
}
157+
158+
/// Returns all assigned message queues.
159+
pub async fn message_queues(&self) -> HashSet<MessageQueue> {
160+
let map = self.queue_map.read().await;
161+
map.keys().cloned().collect()
162+
}
163+
164+
/// Returns the number of assigned queues.
165+
pub async fn size(&self) -> usize {
166+
let map = self.queue_map.read().await;
167+
map.len()
168+
}
169+
170+
/// Clears all assigned queues.
171+
pub async fn clear(&self) {
172+
let mut map = self.queue_map.write().await;
173+
for (_, aq) in map.iter() {
174+
aq.process_queue.set_dropped(true);
175+
}
176+
map.clear();
177+
}
178+
}
179+
180+
impl Default for AssignedMessageQueue {
181+
fn default() -> Self {
182+
Self::new()
183+
}
184+
}

0 commit comments

Comments
 (0)