
[ISSUE #6594]🚀Add lite pull consumer with configuration validation and graceful shutdown #6595

Merged
rocketmq-rust-bot merged 1 commit into main from feat-6594 on Mar 1, 2026

Conversation

@mxsm (Owner) commented Mar 1, 2026

Which Issue(s) This PR Fixes(Closes)

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Added explicit start() and shutdown() lifecycle methods to properly manage DefaultLitePullConsumer initialization and cleanup.
    • Enhanced configuration validation to verify consumer parameters at startup.
  • Improvements

    • Refined offset persistence and rebalancing message logging for better operational clarity.

@rocketmq-rust-robot rocketmq-rust-robot added the feature🚀 Suggest an idea for this project. label Mar 1, 2026
@rocketmq-rust-bot (Collaborator)

🔊@mxsm 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@coderabbitai (bot, Contributor) commented Mar 1, 2026

Walkthrough

Added lifecycle and configuration validation methods (start, shutdown, check_config) to DefaultLitePullConsumerImpl, implementing guarded startup/shutdown state machines with offset store initialization based on message model. Updated RebalanceLitePullImpl to use shared ownership semantics for offset store via ArcMut wrapper.
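The guarded startup/shutdown state machine described above can be sketched roughly as follows. This is a synchronous, std-only illustration, not the code from the PR: `ServiceState::CreateJust`, `LifecycleError`, `ConsumerConfig`, and its two fields are assumptions made for the example, and the real `DefaultLitePullConsumerImpl` methods are async and may validate different parameters.

```rust
/// Lifecycle states. `Running` and `ShutdownAlready` appear in the sequence
/// diagram; `CreateJust` is an assumed name for the initial state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceState {
    CreateJust,
    Running,
    ShutdownAlready,
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum LifecycleError {
    InvalidConfig(String),
    InvalidState(ServiceState),
}

/// Hypothetical configuration; the field names are assumptions.
pub struct ConsumerConfig {
    pub consumer_group: String,
    pub pull_batch_size: u32,
}

pub struct LitePullConsumer {
    config: ConsumerConfig,
    state: ServiceState,
}

impl LitePullConsumer {
    pub fn new(config: ConsumerConfig) -> Self {
        Self { config, state: ServiceState::CreateJust }
    }

    pub fn state(&self) -> ServiceState {
        self.state
    }

    /// Validates parameters before any resources are created.
    fn check_config(&self) -> Result<(), LifecycleError> {
        if self.config.consumer_group.trim().is_empty() {
            return Err(LifecycleError::InvalidConfig(
                "consumer_group must not be empty".to_string(),
            ));
        }
        if self.config.pull_batch_size == 0 {
            return Err(LifecycleError::InvalidConfig(
                "pull_batch_size must be at least 1".to_string(),
            ));
        }
        Ok(())
    }

    /// Only a freshly created consumer may start; a failed validation
    /// leaves the state untouched.
    pub fn start(&mut self) -> Result<(), LifecycleError> {
        if self.state != ServiceState::CreateJust {
            return Err(LifecycleError::InvalidState(self.state));
        }
        self.check_config()?;
        self.state = ServiceState::Running;
        Ok(())
    }

    /// Shutdown is idempotent once the consumer has run; shutting down a
    /// consumer that never started is reported as an error in this sketch.
    pub fn shutdown(&mut self) -> Result<(), LifecycleError> {
        match self.state {
            ServiceState::Running => {
                self.state = ServiceState::ShutdownAlready;
                Ok(())
            }
            ServiceState::ShutdownAlready => Ok(()),
            other => Err(LifecycleError::InvalidState(other)),
        }
    }
}
```

Calling `start()` twice returns `InvalidState(Running)` on the second call, which is the kind of guard the walkthrough refers to.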

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Consumer Lifecycle Management (rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs) | Added start() and shutdown() async methods with state machine guards, configuration validation via check_config(), and offset store initialization (LocalFileOffsetStore for Broadcasting, RemoteBrokerOffsetStore for Clustering). Imported timing and logging utilities (Ordering, Duration, current_millis, info, warn). Updated MQConsumerInner trait implementation methods with explicit unimplemented placeholders. |
| Rebalance Offset Store Ownership (rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_lite_pull_impl.rs) | Changed offset_store field from Option<OffsetStore> to Option<ArcMut<OffsetStore>> and updated set_offset_store() signature accordingly for shared ownership. Refined log messages in cleanup methods to use "Rebalance cleanup for..." terminology instead of "Fix Offset...". |
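To illustrate why the rebalance component needs shared ownership of the offset store, here is a small sketch that uses `Arc<Mutex<_>>` from the standard library as a stand-in. `ArcMut` is the project's own wrapper and its API is not shown in this PR page, so nothing below should be read as its actual interface; `OffsetStore`, `Rebalance`, and their methods are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical in-memory offset store keyed by queue id.
#[derive(Default)]
pub struct OffsetStore {
    offsets: HashMap<u32, i64>,
}

impl OffsetStore {
    pub fn update(&mut self, queue_id: u32, offset: i64) {
        self.offsets.insert(queue_id, offset);
    }

    pub fn read(&self, queue_id: u32) -> Option<i64> {
        self.offsets.get(&queue_id).copied()
    }
}

/// Stand-in for `ArcMut<OffsetStore>` (assumption: std types only).
pub type SharedOffsetStore = Arc<Mutex<OffsetStore>>;

/// Hypothetical rebalance component that holds a handle to the same
/// store the consumer writes to, instead of owning a separate copy.
#[derive(Default)]
pub struct Rebalance {
    offset_store: Option<SharedOffsetStore>,
}

impl Rebalance {
    pub fn set_offset_store(&mut self, store: SharedOffsetStore) {
        self.offset_store = Some(store);
    }

    /// Returns the stored offset, or `None` if no store is set, the lock
    /// is poisoned, or the queue has no recorded offset.
    pub fn stored_offset(&self, queue_id: u32) -> Option<i64> {
        let store = self.offset_store.as_ref()?;
        let guard = store.lock().ok()?;
        guard.read(queue_id)
    }
}
```

With an owned `Option<OffsetStore>`, an update made by the consumer would not be visible to the rebalance component; with a shared handle, both sides observe the same offsets.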

Sequence Diagram(s)

sequenceDiagram
    actor User
    participant Consumer as DefaultLitePullConsumerImpl
    participant Config as Configuration
    participant MQClient as MQ Client
    participant Rebalance as Rebalance Manager
    participant OffsetStore as Offset Store
    
    User->>Consumer: start()
    Consumer->>Config: check_config()
    Config-->>Consumer: validation passed
    Consumer->>MQClient: initialize
    MQClient-->>Consumer: initialized
    alt MessageModel == Broadcasting
        Consumer->>OffsetStore: create LocalFileOffsetStore
    else MessageModel == Clustering
        Consumer->>OffsetStore: create RemoteBrokerOffsetStore
    end
    Consumer->>OffsetStore: load offsets
    Consumer->>Rebalance: do_rebalance
    Rebalance-->>Consumer: rebalanced
    Consumer->>Consumer: state = Running
    Consumer-->>User: started successfully
    
    User->>Consumer: shutdown()
    Consumer->>Consumer: signal shutdown
    Consumer->>Consumer: await pull tasks
    Consumer->>OffsetStore: persist offsets
    OffsetStore-->>Consumer: persisted
    Consumer->>MQClient: unregister and shutdown
    MQClient-->>Consumer: shutdown complete
    Consumer->>Consumer: state = ShutdownAlready
    Consumer-->>User: shutdown complete
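The `alt` branch in the diagram (choose an offset store by message model, then load offsets) might look like the sketch below. The type names `LocalFileOffsetStore` and `RemoteBrokerOffsetStore` come from the review summary; the `OffsetStore` trait, its methods, and `init_offset_store` are placeholders invented for this example.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageModel {
    Broadcasting,
    Clustering,
}

/// Hypothetical minimal interface; the real trait is not shown in this PR page.
pub trait OffsetStore {
    fn kind(&self) -> &'static str;
    fn load(&mut self);
    fn is_loaded(&self) -> bool;
}

#[derive(Default)]
pub struct LocalFileOffsetStore {
    loaded: bool,
}

#[derive(Default)]
pub struct RemoteBrokerOffsetStore {
    loaded: bool,
}

impl OffsetStore for LocalFileOffsetStore {
    fn kind(&self) -> &'static str {
        "local-file"
    }
    fn load(&mut self) {
        // Placeholder: a real store would read offsets from disk here.
        self.loaded = true;
    }
    fn is_loaded(&self) -> bool {
        self.loaded
    }
}

impl OffsetStore for RemoteBrokerOffsetStore {
    fn kind(&self) -> &'static str {
        "remote-broker"
    }
    fn load(&mut self) {
        // Placeholder: a real store would query the broker here.
        self.loaded = true;
    }
    fn is_loaded(&self) -> bool {
        self.loaded
    }
}

/// Picks the store for the message model and loads it before returning.
pub fn init_offset_store(model: MessageModel) -> Box<dyn OffsetStore> {
    let mut store: Box<dyn OffsetStore> = match model {
        MessageModel::Broadcasting => Box::new(LocalFileOffsetStore::default()),
        MessageModel::Clustering => Box::new(RemoteBrokerOffsetStore::default()),
    };
    store.load();
    store
}
```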

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Suggested labels

feature🚀, ready to review

Suggested reviewers

  • TeslaRustor
  • SpaceXCN
  • rocketmq-rust-bot

Poem

🐰 A startup spring and graceful fall,
Configuration checks before it all,
Offset stores dance with shared embrace,
State machines run their steady pace,
From birth to rest, the consumer's tale,
Through rebalance winds and offset trails!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 72.73%, which is below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main changes: adding a lite pull consumer with configuration validation and graceful shutdown mechanisms. |
| Linked Issues check | ✅ Passed | The PR implements all core requirements from issue #6594: lite pull consumer with configuration validation (check_config method) and graceful shutdown (shutdown method). |
| Out of Scope Changes check | ✅ Passed | All changes are within scope: consumer implementation, offset store handling, and rebalance adjustments directly support the lite pull consumer feature. |


@coderabbitai (bot, Contributor) left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs (1)

515-535: ⚠️ Potential issue | 🟠 Major

Multiple unimplemented!() macros pose runtime panic risks.

These trait methods contain unimplemented!() which will panic if invoked:

| Method | Risk |
| --- | --- |
| do_rebalance (line 518) | Panics when SubscriptionType::Subscribe is used |
| persist_consumer_offset (line 529) | Called during shutdown; will panic |
| update_topic_subscribe_info (line 534) | May be invoked by MQClientInstance during topic updates |

Consider either:

  1. Implementing these methods before merging
  2. Replacing unimplemented!() with todo!() for clearer intent (note that todo!() also panics when reached, so this documents the gap rather than removing the risk)
  3. Adding early runtime checks to prevent users from triggering these code paths

Would you like me to help implement these methods or create tracking issues for them?
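As one possible shape for the third option (an early runtime check instead of a panic), the sketch below reports an error when an unsupported mode is used. It assumes the method could return a `Result`, which may not match the real `MQConsumerInner` trait signature; `ConsumerError`, `Consumer`, and the `SubscriptionType` variants other than `Subscribe` are hypothetical.

```rust
/// `Subscribe` is named in the review; the other variants are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionType {
    None,
    Subscribe,
    Assign,
}

/// Hypothetical error type for unsupported code paths.
#[derive(Debug, PartialEq, Eq)]
pub enum ConsumerError {
    Unsupported(&'static str),
}

pub struct Consumer {
    subscription_type: SubscriptionType,
}

impl Consumer {
    pub fn new(subscription_type: SubscriptionType) -> Self {
        Self { subscription_type }
    }

    /// Returns an error for the not-yet-implemented path rather than
    /// panicking through `unimplemented!()`.
    pub fn do_rebalance(&self) -> Result<(), ConsumerError> {
        match self.subscription_type {
            SubscriptionType::Subscribe => {
                Err(ConsumerError::Unsupported("do_rebalance in Subscribe mode"))
            }
            // Nothing to rebalance in the other modes in this sketch.
            SubscriptionType::None | SubscriptionType::Assign => Ok(()),
        }
    }
}
```

The caller can then log the error and continue, so an unfinished code path degrades to a visible failure instead of aborting the task that invoked it.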

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`
around lines 515 - 535, The methods do_rebalance, persist_consumer_offset, and
update_topic_subscribe_info currently use unimplemented!() which will panic at
runtime; replace those panics with safe, non-panicking implementations or
guarded early-returns: in do_rebalance() (and thus try_rebalance()) check the
subscription type (SubscriptionType::Subscribe) and delegate to the actual
rebalance implementation or perform a no-op with a debug/warn log instead of
panicking; in persist_consumer_offset() make it a safe no-op or persist offsets
via the existing commit logic while guarding with any shutdown flag if present
so it won’t panic during shutdown; in update_topic_subscribe_info(topic, info)
apply the topic updates via the rebalance/topic management path or log and
return if MQClientInstance is not ready—do not use unimplemented!(), use todo!()
only if you intend an immediate visible placeholder, but prefer safe
no-op/delegation to avoid runtime panics.
🧹 Nitpick comments (1)
rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs (1)

448-455: Cumulative task timeout could cause extended shutdown delays.

Each pull task gets an individual 5-second timeout. With many message queues assigned, shutdown time could become excessive (e.g., 100 queues × 5s = 500s worst case).

Consider using tokio::time::timeout around the entire drain loop with a global shutdown deadline, or aborting tasks that exceed the timeout:

♻️ Suggested improvement with global timeout and task abort
-                // Wait for all pull tasks to complete (5s timeout)
+                // Wait for all pull tasks to complete (30s shared deadline)
+                let shutdown_deadline = tokio::time::Instant::now() + Duration::from_secs(30);
                 let mut handles = self.task_handles.write().await;
-                for (mq, handle) in handles.drain() {
-                    if let Err(e) = tokio::time::timeout(Duration::from_secs(5), handle).await {
-                        warn!("Pull task for {:?} did not finish in time: {}", mq, e);
+                // Bind the handle mutably and pass `&mut handle` to timeout so it can still be aborted afterwards
+                for (mq, mut handle) in handles.drain() {
+                    let remaining = shutdown_deadline.saturating_duration_since(tokio::time::Instant::now());
+                    if remaining.is_zero() {
+                        handle.abort();
+                        warn!("Aborting pull task for {:?} - shutdown deadline exceeded", mq);
+                    } else if let Err(e) = tokio::time::timeout(remaining, &mut handle).await {
+                        handle.abort();
+                        warn!("Pull task for {:?} did not finish in time: {}", mq, e);
                     }
                 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`
around lines 448 - 455, The shutdown loop currently applies tokio::time::timeout
per task via self.task_handles.write().await and handles.drain(), which can
cumulatively delay shutdown; wrap the entire drain/await sequence in a single
global timeout (e.g., call tokio::time::timeout(Duration::from_secs(N), async {
for (mq, handle) in handles.drain() { let _ = handle.await; } }) around the
drain) or, alternatively, after a short global wait iterate remaining handles
and call handle.abort() to force-stop tasks (use the task_handles write lock and
the same handle identifiers) so that task_handles, handle.await/join, and mq are
all handled under one global deadline rather than N separate 5s timeouts.
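The difference between N separate timeouts and one shared deadline can be shown without tokio. The following std-only sketch uses threads and a channel as stand-ins for the pull tasks; `wait_with_deadline` and `spawn_workers` are hypothetical helpers, and unlike `JoinHandle::abort()` in tokio, std threads cannot be force-stopped, so stragglers are simply left behind here.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Waits for up to `expected` completion signals, sharing ONE deadline
/// across all of them. Returns the ids of workers that finished in time.
pub fn wait_with_deadline(
    rx: &mpsc::Receiver<usize>,
    expected: usize,
    budget: Duration,
) -> Vec<usize> {
    let deadline = Instant::now() + budget;
    let mut finished = Vec::with_capacity(expected);
    while finished.len() < expected {
        // Each wait only gets whatever is left of the global budget.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match rx.recv_timeout(remaining) {
            Ok(id) => finished.push(id),
            // Timed out, or every sender is gone: stop waiting.
            Err(_) => break,
        }
    }
    finished
}

/// Spawns one worker per delay; each sleeps, then reports its index.
pub fn spawn_workers(delays_ms: &[u64]) -> mpsc::Receiver<usize> {
    let (tx, rx) = mpsc::channel();
    for (id, &ms) in delays_ms.iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(ms));
            // The receiver may already be gone; ignore the send error.
            let _ = tx.send(id);
        });
    }
    rx
}
```

With this shape the total wait is bounded by `budget` regardless of how many workers exist, whereas a per-worker timeout scales linearly with the number of queues.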

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1453509 and 1ba4aa2.

📒 Files selected for processing (2)
  • rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs
  • rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_lite_pull_impl.rs

Comment on lines +410 to +414

// Consumer registration integrated through MQClientInstance rebalance mechanism

let cloned = self.client_instance.as_mut().cloned().unwrap();
self.client_instance.as_mut().unwrap().start(cloned).await?;

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for register_consumer patterns
echo "=== Searching for register_consumer patterns ==="
rg -i "register.*consumer" --type rust -n -B2 -A2

echo ""
echo "=== Searching for how client_instance is used ==="
rg "client_instance.*start" --type rust -n -B2 -A2

echo ""
echo "=== Checking MQClientInstance structure ==="
fd ".*client_instance.*" --type rust | head -20

Repository: mxsm/rocketmq-rust

Length of output: 20664


🏁 Script executed:

#!/bin/bash
# Look at the full context of the start method
echo "=== Examining default_lite_pull_consumer_impl.rs around lines 410-414 ==="
wc -l rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs

Repository: mxsm/rocketmq-rust

Length of output: 217


🏁 Script executed:

#!/bin/bash
# Get more context from the start method
echo "=== Reading the start method context ==="
sed -n '350,435p' rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs | cat -n

Repository: mxsm/rocketmq-rust

Length of output: 4664


🏁 Script executed:

#!/bin/bash
# Search for rebalance_impl registration or consumer registration
echo "=== Searching for rebalance setup or consumer group registration ==="
rg "rebalance_impl.*register\|register.*consumer\|consumer_group" rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs -n -B1 -A1

Repository: mxsm/rocketmq-rust

Length of output: 130


🏁 Script executed:

#!/bin/bash
# Check MQClientInstance to see what start method expects
echo "=== Checking MQClientInstance definition ==="
fd "client_instance.rs\|client_impl.rs" --type rust | grep -v test

echo ""
echo "=== Looking for MQClientInstance start method signature ==="
rg "fn start\(" --type rust -B2 -A8 | grep -A8 "MQClientInstance"

Repository: mxsm/rocketmq-rust

Length of output: 2321


🏁 Script executed:

#!/bin/bash
# Check if rebalance automatically registers consumer
echo "=== Checking RebalanceImpl for consumer registration ==="
rg "register_consumer" rocketmq-client/src/consumer/consumer_impl/re_balance/ -n -B2 -A2

echo ""
echo "=== Checking MQClientInstance start method ==="
sed -n '1,150p' rocketmq-client/src/factory/mq_client_instance.rs | grep -n "pub async fn start" -A20

Repository: mxsm/rocketmq-rust

Length of output: 166


🏁 Script executed:

#!/bin/bash
# Compare what happens in default_mq_push_consumer_impl around registration
echo "=== Push consumer registration pattern ==="
sed -n '300,330p' rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs | cat -n

Repository: mxsm/rocketmq-rust

Length of output: 1762


Consumer registration is missing before starting the client instance.

The lite pull consumer should call register_consumer() before start(), consistent with the push consumer implementation. Without explicit registration, the consumer won't be properly tracked by the broker for rebalancing and coordination.

The pattern should match default_mq_push_consumer_impl which calls:

client_instance.register_consumer(group, MQConsumerInnerImpl { ... }).await;
client_instance.start(cloned).await?;

Additionally, the double unwrap pattern on lines 413-414 can be simplified to reduce redundant calls.
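A rough sketch of the suggested ordering (register first, then start) with a single borrow of the optional client instance is shown below. `ClientInstance` is a hypothetical stand-in for `MQClientInstance`; its synchronous `register_consumer` and `start` methods, the `ClientError` type, and the `"lite-pull"` identifier are assumptions for illustration, not the project's API.

```rust
use std::collections::HashMap;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum ClientError {
    NotInitialized,
    DuplicateGroup(String),
}

/// Hypothetical stand-in for MQClientInstance: a consumer table plus a flag.
#[derive(Default)]
pub struct ClientInstance {
    consumers: HashMap<String, String>,
    started: bool,
}

impl ClientInstance {
    /// Returns false if the group is already registered.
    pub fn register_consumer(&mut self, group: &str, consumer_id: &str) -> bool {
        if self.consumers.contains_key(group) {
            return false;
        }
        self.consumers.insert(group.to_string(), consumer_id.to_string());
        true
    }

    pub fn start(&mut self) {
        self.started = true;
    }

    pub fn is_started(&self) -> bool {
        self.started
    }

    pub fn has_consumer(&self, group: &str) -> bool {
        self.consumers.contains_key(group)
    }
}

pub struct LitePullConsumer {
    group: String,
    client_instance: Option<ClientInstance>,
}

impl LitePullConsumer {
    pub fn new(group: &str, client_instance: Option<ClientInstance>) -> Self {
        Self { group: group.to_string(), client_instance }
    }

    pub fn client(&self) -> Option<&ClientInstance> {
        self.client_instance.as_ref()
    }

    pub fn start(&mut self) -> Result<(), ClientError> {
        // Borrow the client once and turn a missing instance into an error
        // instead of calling unwrap() twice.
        let client = self
            .client_instance
            .as_mut()
            .ok_or(ClientError::NotInitialized)?;
        // Register before starting so the instance knows about this group.
        if !client.register_consumer(&self.group, "lite-pull") {
            return Err(ClientError::DuplicateGroup(self.group.clone()));
        }
        client.start();
        Ok(())
    }
}
```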

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`
around lines 410 - 414, The consumer isn't registered before starting the client
instance and uses redundant unwraps; update the startup sequence in
default_lite_pull_consumer_impl to call client_instance.register_consumer(group,
MQConsumerInnerImpl { ... }).await (mirroring default_mq_push_consumer_impl)
before calling client_instance.start(...). Also simplify the unwrap pattern by
taking a single mutable reference or cloning the client_instance once (reuse the
existing cloned variable) and pass that to start rather than calling
as_mut().cloned().unwrap() and as_mut().unwrap() twice.

}
drop(handles);

self.persist_consumer_offset().await;

⚠️ Potential issue | 🔴 Critical

Shutdown will panic due to calling unimplemented persist_consumer_offset.

Line 457 calls self.persist_consumer_offset().await, but the MQConsumerInner trait implementation at line 529 contains unimplemented!("persist_consumer_offset"). This will cause a runtime panic during shutdown, preventing graceful termination and potentially losing uncommitted offsets.

🐛 Proposed fix: implement offset persistence or guard the call

Option 1: Implement the method using the stored offset_store:

     async fn persist_consumer_offset(&self) {
-        // Offset persistence handled by commit operations
-        unimplemented!("persist_consumer_offset")
+        if let Some(offset_store) = &self.offset_store {
+            let mqs: Vec<MessageQueue> = self.assigned_message_queue.get_assigned_message_queues();
+            offset_store.persist_all(&mqs).await;
+        }
     }

Option 2: Guard the shutdown call until implementation is ready:

-                self.persist_consumer_offset().await;
+                if let Some(offset_store) = &self.offset_store {
+                    // TODO: Implement proper offset persistence
+                    let _ = offset_store;
+                }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- self.persist_consumer_offset().await;
+ if let Some(offset_store) = &self.offset_store {
+     // TODO: Implement proper offset persistence
+     let _ = offset_store;
+ }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`
at line 457, The shutdown path calls self.persist_consumer_offset().await but
MQConsumerInner's persist_consumer_offset is currently unimplemented
(unimplemented!("persist_consumer_offset")), which will panic at runtime; either
implement persist_consumer_offset to flush offsets from the consumer's
offset_store (use the existing offset_store field/API to collect and persist
offsets, update any remote broker/store and handle errors) or change the
shutdown call site to conditionally skip or safely await a no-op when
persistence is not available (e.g., replace the direct call with a safe wrapper
like try_persist_consumer_offset or check a flag/Option before awaiting) so
shutdown won't call the unimplemented method; focus changes on the
persist_consumer_offset implementation in the MQConsumerInner impl and the
shutdown caller where self.persist_consumer_offset().await is invoked.
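For comparison with the first proposed fix, a non-panicking `persist_consumer_offset` could follow the pattern below: persist offsets for the assigned queues when a store exists, and do nothing otherwise. Everything here is a simplified, synchronous assumption (queue ids as `u32`, an in-memory `OffsetStore`, a `persist_all` that returns a count); the real offset store API is not shown in this PR page.

```rust
use std::collections::HashMap;

/// Hypothetical store: `pending` holds in-memory offsets, `persisted`
/// stands in for whatever durable target a real store would write to.
#[derive(Default)]
pub struct OffsetStore {
    pending: HashMap<u32, i64>,
    persisted: HashMap<u32, i64>,
}

impl OffsetStore {
    pub fn update(&mut self, queue_id: u32, offset: i64) {
        self.pending.insert(queue_id, offset);
    }

    /// Persists offsets for the given queues; returns how many were written.
    pub fn persist_all(&mut self, queues: &[u32]) -> usize {
        let mut written = 0;
        for queue_id in queues {
            if let Some(&offset) = self.pending.get(queue_id) {
                self.persisted.insert(*queue_id, offset);
                written += 1;
            }
        }
        written
    }
}

pub struct Consumer {
    offset_store: Option<OffsetStore>,
    assigned: Vec<u32>,
}

impl Consumer {
    pub fn new(offset_store: Option<OffsetStore>, assigned: Vec<u32>) -> Self {
        Self { offset_store, assigned }
    }

    /// Safe to call during shutdown: a missing store is a no-op, not a panic.
    pub fn persist_consumer_offset(&mut self) -> usize {
        match self.offset_store.as_mut() {
            Some(store) => store.persist_all(&self.assigned),
            None => 0,
        }
    }

    pub fn persisted_offset(&self, queue_id: u32) -> Option<i64> {
        self.offset_store.as_ref()?.persisted.get(&queue_id).copied()
    }
}
```

Only queues currently assigned to the consumer are persisted, which mirrors the `get_assigned_message_queues()` call in the proposed fix.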

@codecov (bot) commented Mar 1, 2026

Codecov Report

❌ Patch coverage is 0% with 107 lines in your changes missing coverage. Please review.
✅ Project coverage is 41.74%. Comparing base (1453509) to head (1ba4aa2).
⚠️ Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...r/consumer_impl/default_lite_pull_consumer_impl.rs | 0.00% | 106 Missing ⚠️ |
| ...nsumer_impl/re_balance/rebalance_lite_pull_impl.rs | 0.00% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #6595      +/-   ##
==========================================
- Coverage   41.77%   41.74%   -0.04%     
==========================================
  Files         959      959              
  Lines      133824   133927     +103     
==========================================
- Hits        55911    55908       -3     
- Misses      77913    78019     +106     


@rocketmq-rust-bot rocketmq-rust-bot merged commit 74ab87d into main Mar 1, 2026
17 of 20 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added the approved label and removed the ready to review and waiting-review labels Mar 1, 2026

Labels

AI review first (Ai review pr first), approved (PR has approved), auto merge, feature🚀 (Suggest an idea for this project.)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Add lite pull consumer with configuration validation and graceful shutdown

3 participants