Skip to content

Commit 768268b

Browse files
authored
[ISSUE #6601]šŸš€DefaultLitePullConsumerImpl with message polling and offset management improvements (#6602)
1 parent 71ec071 commit 768268b

File tree

2 files changed

+215
-6
lines changed

2 files changed

+215
-6
lines changed

ā€Žrocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rsā€Ž

Lines changed: 206 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::time::Duration;
2424
use cheetah_string::CheetahString;
2525
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
2626
use rocketmq_common::common::message::message_enum::MessageRequestMode;
27+
use rocketmq_common::common::message::message_ext::MessageExt;
2728
use rocketmq_common::common::message::message_queue::MessageQueue;
2829
use rocketmq_common::TimeUtils::current_millis;
2930
use rocketmq_error::RocketMQResult;
@@ -736,6 +737,183 @@ impl DefaultLitePullConsumerImpl {
736737
async fn get_cached_message_size_in_mib(&self) -> i64 {
737738
self.assigned_message_queue.total_msg_size_in_mib().await
738739
}
740+
741+
/// Polls for messages with the specified timeout.
742+
pub async fn poll(&self, timeout_millis: u64) -> RocketMQResult<Vec<ArcMut<MessageExt>>> {
743+
self.make_sure_state_ok()?;
744+
745+
if self.consumer_config.auto_commit {
746+
self.maybe_auto_commit().await;
747+
}
748+
749+
let deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_millis);
750+
751+
loop {
752+
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
753+
if remaining.is_zero() {
754+
return Ok(Vec::new());
755+
}
756+
757+
// Only hold lock during receive, release before async operations.
758+
let request = {
759+
let mut rx = self.consume_request_rx.lock().await;
760+
match tokio::time::timeout(remaining, rx.recv()).await {
761+
Ok(Some(req)) => req,
762+
Ok(None) => return Ok(Vec::new()),
763+
Err(_) => return Ok(Vec::new()),
764+
}
765+
};
766+
767+
// Filter dropped queues without holding lock.
768+
if request.process_queue.is_dropped() {
769+
continue;
770+
}
771+
772+
let messages = request.messages;
773+
// Execute async operations without holding the lock.
774+
let offset = request.process_queue.remove_message(&messages).await;
775+
self.assigned_message_queue
776+
.update_consume_offset(&request.message_queue, offset)
777+
.await;
778+
779+
if !self.consume_message_hook_list.is_empty() {
780+
for hook in &self.consume_message_hook_list {
781+
// Execute consume message hooks
782+
}
783+
}
784+
785+
return Ok(messages);
786+
}
787+
}
788+
789+
/// Checks if auto-commit is needed and performs it.
790+
async fn maybe_auto_commit(&self) {
791+
let now = current_millis() as i64;
792+
let next_deadline = self.next_auto_commit_deadline.load(Ordering::Acquire);
793+
794+
if now >= next_deadline {
795+
let new_deadline = now + self.consumer_config.auto_commit_interval_millis as i64;
796+
797+
// Use CAS to ensure only one thread performs the commit.
798+
if self
799+
.next_auto_commit_deadline
800+
.compare_exchange(next_deadline, new_deadline, Ordering::AcqRel, Ordering::Acquire)
801+
.is_ok()
802+
{
803+
if let Err(e) = self.commit_all().await {
804+
tracing::warn!("Auto-commit failed: {}", e);
805+
// Revert deadline on failure to allow retry
806+
self.next_auto_commit_deadline.store(next_deadline, Ordering::Release);
807+
}
808+
}
809+
}
810+
}
811+
812+
/// Commits offsets for all assigned message queues.
813+
pub async fn commit_all(&self) -> RocketMQResult<()> {
814+
let queues = self.assigned_message_queue.message_queues().await;
815+
816+
for mq in queues {
817+
if let Err(e) = self.commit_sync(&mq, true).await {
818+
tracing::error!("Failed to commit offset for queue {:?}: {}", mq, e);
819+
}
820+
}
821+
822+
Ok(())
823+
}
824+
825+
/// Commits offset for a specific message queue.
826+
pub async fn commit_sync(&self, mq: &MessageQueue, persist: bool) -> RocketMQResult<()> {
827+
self.make_sure_state_ok()?;
828+
829+
let consume_offset = self.assigned_message_queue.get_consume_offset(mq).await;
830+
831+
if consume_offset == -1 {
832+
tracing::warn!("Consume offset is -1 for queue {:?}, skip commit", mq);
833+
return Ok(());
834+
}
835+
836+
if let Some(pq) = self.assigned_message_queue.get_process_queue(mq).await {
837+
if !pq.is_dropped() {
838+
self.update_consume_offset(mq, consume_offset).await?;
839+
840+
if persist {
841+
if let Some(ref offset_store) = self.offset_store {
842+
let mut mqs = HashSet::new();
843+
mqs.insert(mq.clone());
844+
offset_store.mut_from_ref().persist_all(&mqs).await;
845+
}
846+
}
847+
}
848+
}
849+
850+
Ok(())
851+
}
852+
853+
/// Commits offsets for multiple message queues.
854+
pub async fn commit(&self, offsets: HashMap<MessageQueue, i64>, persist: bool) -> RocketMQResult<()> {
855+
self.make_sure_state_ok()?;
856+
857+
if offsets.is_empty() {
858+
tracing::warn!("Offset map is empty, skip commit");
859+
return Ok(());
860+
}
861+
862+
let mut mqs_to_persist = HashSet::new();
863+
864+
for (mq, offset) in offsets {
865+
if offset == -1 {
866+
tracing::error!("Consume offset is -1 for queue {:?}", mq);
867+
continue;
868+
}
869+
870+
if let Some(pq) = self.assigned_message_queue.get_process_queue(&mq).await {
871+
if !pq.is_dropped() {
872+
// Continue on error to allow other queues to commit.
873+
if let Err(e) = self.update_consume_offset(&mq, offset).await {
874+
tracing::error!("Failed to update offset for queue {:?}: {}", mq, e);
875+
continue;
876+
}
877+
mqs_to_persist.insert(mq);
878+
}
879+
}
880+
}
881+
882+
if persist && !mqs_to_persist.is_empty() {
883+
if let Some(ref offset_store) = self.offset_store {
884+
offset_store.mut_from_ref().persist_all(&mqs_to_persist).await;
885+
}
886+
}
887+
888+
Ok(())
889+
}
890+
891+
/// Updates the consume offset for a message queue.
892+
async fn update_consume_offset(&self, mq: &MessageQueue, offset: i64) -> RocketMQResult<()> {
893+
if let Some(ref offset_store) = self.offset_store {
894+
offset_store.update_offset(mq, offset, false).await;
895+
}
896+
Ok(())
897+
}
898+
899+
/// Returns the set of assigned message queues.
900+
pub async fn assignment(&self) -> HashSet<MessageQueue> {
901+
self.assigned_message_queue.message_queues().await
902+
}
903+
904+
/// Pauses consumption for the specified message queues.
905+
pub async fn pause(&self, message_queues: &[MessageQueue]) {
906+
for mq in message_queues {
907+
self.assigned_message_queue.set_paused(mq, true).await;
908+
}
909+
}
910+
911+
/// Resumes consumption for the specified message queues.
912+
pub async fn resume(&self, message_queues: &[MessageQueue]) {
913+
for mq in message_queues {
914+
self.assigned_message_queue.set_paused(mq, false).await;
915+
}
916+
}
739917
}
740918

741919
impl MQConsumerInner for DefaultLitePullConsumerImpl {
@@ -773,8 +951,34 @@ impl MQConsumerInner for DefaultLitePullConsumerImpl {
773951
}
774952

775953
async fn persist_consumer_offset(&self) {
776-
// Offset persistence handled by commit operations
777-
unimplemented!("persist_consumer_offset")
954+
// Check service state before persisting
955+
if let Err(e) = self.make_sure_state_ok() {
956+
tracing::error!("Persist consumer offset error, service state invalid: {}", e);
957+
return;
958+
}
959+
960+
// Collect message queues quickly while holding locks, then release.
961+
let subscription_type = *self.subscription_type.read().await;
962+
963+
let mqs = match subscription_type {
964+
SubscriptionType::Subscribe => {
965+
let process_queue_table = self
966+
.rebalance_impl
967+
.rebalance_impl_inner
968+
.process_queue_table
969+
.read()
970+
.await;
971+
process_queue_table.keys().cloned().collect::<HashSet<_>>()
972+
}
973+
SubscriptionType::Assign => self.assigned_message_queue.message_queues().await,
974+
SubscriptionType::None => HashSet::new(),
975+
};
976+
977+
if !mqs.is_empty() {
978+
if let Some(ref offset_store) = self.offset_store {
979+
offset_store.mut_from_ref().persist_all(&mqs).await;
980+
}
981+
}
778982
}
779983

780984
async fn update_topic_subscribe_info(&self, topic: CheetahString, info: &HashSet<MessageQueue>) {

ā€Žrocketmq-client/src/consumer/consumer_impl/lite_pull_consume_request.rsā€Ž

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::sync::Arc;
1616

1717
use rocketmq_common::common::message::message_ext::MessageExt;
1818
use rocketmq_common::common::message::message_queue::MessageQueue;
19+
use rocketmq_rust::ArcMut;
1920

2021
use crate::consumer::consumer_impl::process_queue::ProcessQueue;
2122

@@ -25,7 +26,7 @@ use crate::consumer::consumer_impl::process_queue::ProcessQueue;
2526
#[derive(Clone)]
2627
pub struct LitePullConsumeRequest {
2728
/// Messages to be consumed.
28-
pub(crate) messages: Vec<Arc<MessageExt>>,
29+
pub(crate) messages: Vec<ArcMut<MessageExt>>,
2930

3031
/// The message queue these messages belong to.
3132
pub(crate) message_queue: MessageQueue,
@@ -36,7 +37,11 @@ pub struct LitePullConsumeRequest {
3637

3738
impl LitePullConsumeRequest {
3839
/// Creates a new consume request.
39-
pub fn new(messages: Vec<Arc<MessageExt>>, message_queue: MessageQueue, process_queue: Arc<ProcessQueue>) -> Self {
40+
pub fn new(
41+
messages: Vec<ArcMut<MessageExt>>,
42+
message_queue: MessageQueue,
43+
process_queue: Arc<ProcessQueue>,
44+
) -> Self {
4045
Self {
4146
messages,
4247
message_queue,
@@ -45,7 +50,7 @@ impl LitePullConsumeRequest {
4550
}
4651

4752
/// Returns the messages in this request.
48-
pub fn messages(&self) -> &[Arc<MessageExt>] {
53+
pub fn messages(&self) -> &[ArcMut<MessageExt>] {
4954
&self.messages
5055
}
5156

@@ -60,7 +65,7 @@ impl LitePullConsumeRequest {
6065
}
6166

6267
/// Consumes the request and returns its components.
63-
pub fn into_parts(self) -> (Vec<Arc<MessageExt>>, MessageQueue, Arc<ProcessQueue>) {
68+
pub fn into_parts(self) -> (Vec<ArcMut<MessageExt>>, MessageQueue, Arc<ProcessQueue>) {
6469
(self.messages, self.message_queue, self.process_queue)
6570
}
6671
}

0 commit comments

Comments
Ā (0)