Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::time::Duration;
use cheetah_string::CheetahString;
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
use rocketmq_common::common::message::message_enum::MessageRequestMode;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::TimeUtils::current_millis;
use rocketmq_error::RocketMQResult;
Expand Down Expand Up @@ -736,6 +737,183 @@ impl DefaultLitePullConsumerImpl {
async fn get_cached_message_size_in_mib(&self) -> i64 {
self.assigned_message_queue.total_msg_size_in_mib().await
}

/// Polls for messages with the specified timeout.
pub async fn poll(&self, timeout_millis: u64) -> RocketMQResult<Vec<ArcMut<MessageExt>>> {
self.make_sure_state_ok()?;

if self.consumer_config.auto_commit {
self.maybe_auto_commit().await;
}

let deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_millis);

loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Ok(Vec::new());
}

// Only hold lock during receive, release before async operations.
let request = {
let mut rx = self.consume_request_rx.lock().await;
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Some(req)) => req,
Ok(None) => return Ok(Vec::new()),
Err(_) => return Ok(Vec::new()),
}
};

// Filter dropped queues without holding lock.
if request.process_queue.is_dropped() {
continue;
}

let messages = request.messages;
// Execute async operations without holding the lock.
let offset = request.process_queue.remove_message(&messages).await;
self.assigned_message_queue
.update_consume_offset(&request.message_queue, offset)
.await;

if !self.consume_message_hook_list.is_empty() {
for hook in &self.consume_message_hook_list {
// Execute consume message hooks
}
}
Comment on lines +779 to +783
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Empty hook execution loop - hooks are not being invoked.

The loop iterates over consume_message_hook_list but the body is empty. Either invoke the hooks or remove the dead code.

🔧 Suggested fix

If hooks should be invoked, implement the call:

 if !self.consume_message_hook_list.is_empty() {
     for hook in &self.consume_message_hook_list {
-        // Execute consume message hooks
+        // TODO: Build ConsumeMessageContext and invoke hook
+        // hook.consume_message_before(&context);
     }
 }

Alternatively, if this is placeholder code, consider removing it entirely until ready to implement, or add a TODO comment outside the loop to clarify intent.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if !self.consume_message_hook_list.is_empty() {
for hook in &self.consume_message_hook_list {
// Execute consume message hooks
}
}
if !self.consume_message_hook_list.is_empty() {
for hook in &self.consume_message_hook_list {
// TODO: Build ConsumeMessageContext and invoke hook
// hook.consume_message_before(&context);
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`
around lines 779 - 783, The loop over consume_message_hook_list is a no-op so
hooks never run; either call the hook method or remove the dead loop: locate the
loop in default_lite_pull_consumer_impl.rs where consume_message_hook_list is
iterated and replace the empty body with invocations of the hook trait's method
(pass the current message(s) and any consumer/context needed per the hook trait,
e.g., call hook.consume_message(consumer, msgs, context) or the actual method
name defined by your ConsumeMessageHook trait), or if hooks are not yet
supported remove the loop and add a TODO explaining future intent.


return Ok(messages);
}
}

/// Checks if auto-commit is needed and performs it.
async fn maybe_auto_commit(&self) {
let now = current_millis() as i64;
let next_deadline = self.next_auto_commit_deadline.load(Ordering::Acquire);

if now >= next_deadline {
let new_deadline = now + self.consumer_config.auto_commit_interval_millis as i64;

// Use CAS to ensure only one thread performs the commit.
if self
.next_auto_commit_deadline
.compare_exchange(next_deadline, new_deadline, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
if let Err(e) = self.commit_all().await {
tracing::warn!("Auto-commit failed: {}", e);
// Revert deadline on failure to allow retry
self.next_auto_commit_deadline.store(next_deadline, Ordering::Release);
}
}
}
}

/// Commits offsets for all assigned message queues.
pub async fn commit_all(&self) -> RocketMQResult<()> {
let queues = self.assigned_message_queue.message_queues().await;

for mq in queues {
if let Err(e) = self.commit_sync(&mq, true).await {
tracing::error!("Failed to commit offset for queue {:?}: {}", mq, e);
}
}

Ok(())
}

/// Commits offset for a specific message queue.
pub async fn commit_sync(&self, mq: &MessageQueue, persist: bool) -> RocketMQResult<()> {
self.make_sure_state_ok()?;

let consume_offset = self.assigned_message_queue.get_consume_offset(mq).await;

if consume_offset == -1 {
tracing::warn!("Consume offset is -1 for queue {:?}, skip commit", mq);
return Ok(());
}

if let Some(pq) = self.assigned_message_queue.get_process_queue(mq).await {
if !pq.is_dropped() {
self.update_consume_offset(mq, consume_offset).await?;

if persist {
if let Some(ref offset_store) = self.offset_store {
let mut mqs = HashSet::new();
mqs.insert(mq.clone());
offset_store.mut_from_ref().persist_all(&mqs).await;
}
}
}
}

Ok(())
}

/// Commits offsets for multiple message queues.
pub async fn commit(&self, offsets: HashMap<MessageQueue, i64>, persist: bool) -> RocketMQResult<()> {
self.make_sure_state_ok()?;

if offsets.is_empty() {
tracing::warn!("Offset map is empty, skip commit");
return Ok(());
}

let mut mqs_to_persist = HashSet::new();

for (mq, offset) in offsets {
if offset == -1 {
tracing::error!("Consume offset is -1 for queue {:?}", mq);
continue;
}

if let Some(pq) = self.assigned_message_queue.get_process_queue(&mq).await {
if !pq.is_dropped() {
// Continue on error to allow other queues to commit.
if let Err(e) = self.update_consume_offset(&mq, offset).await {
tracing::error!("Failed to update offset for queue {:?}: {}", mq, e);
continue;
}
mqs_to_persist.insert(mq);
}
}
}

if persist && !mqs_to_persist.is_empty() {
if let Some(ref offset_store) = self.offset_store {
offset_store.mut_from_ref().persist_all(&mqs_to_persist).await;
}
}

Ok(())
}

/// Updates the consume offset for a message queue.
async fn update_consume_offset(&self, mq: &MessageQueue, offset: i64) -> RocketMQResult<()> {
if let Some(ref offset_store) = self.offset_store {
offset_store.update_offset(mq, offset, false).await;
}
Ok(())
}

/// Returns the set of assigned message queues.
pub async fn assignment(&self) -> HashSet<MessageQueue> {
self.assigned_message_queue.message_queues().await
}

/// Pauses consumption for the specified message queues.
pub async fn pause(&self, message_queues: &[MessageQueue]) {
for mq in message_queues {
self.assigned_message_queue.set_paused(mq, true).await;
}
}

/// Resumes consumption for the specified message queues.
pub async fn resume(&self, message_queues: &[MessageQueue]) {
for mq in message_queues {
self.assigned_message_queue.set_paused(mq, false).await;
}
}
}

impl MQConsumerInner for DefaultLitePullConsumerImpl {
Expand Down Expand Up @@ -773,8 +951,34 @@ impl MQConsumerInner for DefaultLitePullConsumerImpl {
}

async fn persist_consumer_offset(&self) {
// Offset persistence handled by commit operations
unimplemented!("persist_consumer_offset")
// Check service state before persisting
if let Err(e) = self.make_sure_state_ok() {
tracing::error!("Persist consumer offset error, service state invalid: {}", e);
return;
}

// Collect message queues quickly while holding locks, then release.
let subscription_type = *self.subscription_type.read().await;

let mqs = match subscription_type {
SubscriptionType::Subscribe => {
let process_queue_table = self
.rebalance_impl
.rebalance_impl_inner
.process_queue_table
.read()
.await;
process_queue_table.keys().cloned().collect::<HashSet<_>>()
}
SubscriptionType::Assign => self.assigned_message_queue.message_queues().await,
SubscriptionType::None => HashSet::new(),
};

if !mqs.is_empty() {
if let Some(ref offset_store) = self.offset_store {
offset_store.mut_from_ref().persist_all(&mqs).await;
}
}
}

async fn update_topic_subscribe_info(&self, topic: CheetahString, info: &HashSet<MessageQueue>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_rust::ArcMut;

use crate::consumer::consumer_impl::process_queue::ProcessQueue;

Expand All @@ -25,7 +26,7 @@ use crate::consumer::consumer_impl::process_queue::ProcessQueue;
#[derive(Clone)]
pub struct LitePullConsumeRequest {
/// Messages to be consumed.
pub(crate) messages: Vec<Arc<MessageExt>>,
pub(crate) messages: Vec<ArcMut<MessageExt>>,

/// The message queue these messages belong to.
pub(crate) message_queue: MessageQueue,
Expand All @@ -36,7 +37,11 @@ pub struct LitePullConsumeRequest {

impl LitePullConsumeRequest {
/// Creates a new consume request.
pub fn new(messages: Vec<Arc<MessageExt>>, message_queue: MessageQueue, process_queue: Arc<ProcessQueue>) -> Self {
pub fn new(
messages: Vec<ArcMut<MessageExt>>,
message_queue: MessageQueue,
process_queue: Arc<ProcessQueue>,
) -> Self {
Self {
messages,
message_queue,
Expand All @@ -45,7 +50,7 @@ impl LitePullConsumeRequest {
}

/// Returns the messages in this request.
pub fn messages(&self) -> &[Arc<MessageExt>] {
pub fn messages(&self) -> &[ArcMut<MessageExt>] {
&self.messages
}

Expand All @@ -60,7 +65,7 @@ impl LitePullConsumeRequest {
}

/// Consumes the request and returns its components.
pub fn into_parts(self) -> (Vec<Arc<MessageExt>>, MessageQueue, Arc<ProcessQueue>) {
pub fn into_parts(self) -> (Vec<ArcMut<MessageExt>>, MessageQueue, Arc<ProcessQueue>) {
(self.messages, self.message_queue, self.process_queue)
}
}
Loading