[ISSUE #6597]🚀Add total message count and size methods to AssignedMessageQueue; implement async pull task and flow control checks in DefaultLitePullConsumerImpl #6598
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.
Walkthrough
This PR adds message aggregation methods to AssignedMessageQueue and introduces an async pull task launcher with flow control enforcement to DefaultLitePullConsumerImpl, establishing infrastructure for message consumption and throttling management.
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~20–25 minutes
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rocketmq-client/src/consumer/consumer_impl/assigned_message_queue.rs`:
- Around line 185-190: The method total_msg_size_in_mib currently divides each
queue's byte size by (1024*1024) before summing, losing sub‑MiB precision;
change the logic to sum all process_queue.msg_size() values from queue_map
(using the existing read lock and map.values() / process_queue.msg_size()) into
a 64‑bit accumulator, then perform a single division by (1024*1024) and cast to
i64 for the return value so truncation only occurs once after aggregation.
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`:
- Around line 575-577: cached_msg_size is computed in MiB by flooring
pq.msg_size() / (1024*1024) and thus can miss up to ~1 MiB; change the
comparison to use byte-level values instead: use pq.msg_size() as an i64 for
cached_msg_size (or simply compare pq.msg_size() as i64) against
(config.pull_threshold_size_for_queue as i64 * 1024 * 1024) so the flow-control
check in default_lite_pull_consumer_impl.rs (symbols: cached_msg_size,
pq.msg_size(), config.pull_threshold_size_for_queue, queue_flow_control_times)
triggers precisely when the queue exceeds the configured byte threshold.
- Around line 541-548: The pull_inner function currently uses unimplemented!()
which panics when a pull task runs; replace that panic with a proper async
implementation or at minimum a non-panicking placeholder that returns a
RocketMQResult<u64> so pull tasks continue running. Locate pull_inner(&self,
_mq: &MessageQueue, _process_queue: &Arc<...::ProcessQueue>) and either
implement the core pull logic (perform pull, compute retry/delay, and return
Ok(delay_ms)) or, as a safe interim fix, return an Ok(<reasonable_delay>) and
propagate errors via RocketMQResult instead of panicking; ensure any error paths
map to Err(...) rather than calling panic macros.
- Around line 501-538: The code inserts a new JoinHandle into self.task_handles
without cancelling any previous handle, which leaves the old background task
running; update the logic in start_pull_task (the code that writes to
self.task_handles and calls .insert(mq, handle)) to first check for an existing
handle for that MessageQueue, call abort() on it, and await it (e.g.,
old_handle.abort(); let _ = old_handle.await; to reap the task) before inserting
the new handle so the previous pull task is properly cancelled and cleaned up.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-client/src/consumer/consumer_impl/assigned_message_queue.rs
rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs
    /// Returns the total cached message size in MiB across all queues.
    pub async fn total_msg_size_in_mib(&self) -> i64 {
        let map = self.queue_map.read().await;
        map.values()
            .map(|aq| (aq.process_queue.msg_size() / (1024 * 1024)) as i64)
            .sum()
Avoid per-queue MiB truncation before summing total size.
Line [189] divides each queue size by MiB before aggregation, which drops sub‑MiB bytes per queue and underreports total cached size.
💡 Proposed fix
     pub async fn total_msg_size_in_mib(&self) -> i64 {
         let map = self.queue_map.read().await;
-        map.values()
-            .map(|aq| (aq.process_queue.msg_size() / (1024 * 1024)) as i64)
-            .sum()
+        let total_bytes: i64 = map
+            .values()
+            .map(|aq| aq.process_queue.msg_size() as i64)
+            .sum();
+        total_bytes / (1024 * 1024)
     }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
    /// Returns the total cached message size in MiB across all queues.
    pub async fn total_msg_size_in_mib(&self) -> i64 {
        let map = self.queue_map.read().await;
        let total_bytes: i64 = map
            .values()
            .map(|aq| aq.process_queue.msg_size() as i64)
            .sum();
        total_bytes / (1024 * 1024)
    }
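To make the truncation effect concrete, here is a minimal standalone sketch. It assumes queue sizes are available as plain `u64` byte counts; the function names and the sample sizes are hypothetical and are not taken from the PR.

```rust
const MIB: u64 = 1024 * 1024;

/// Divides each queue's size by MiB before summing (truncates once per queue).
fn total_mib_per_queue(sizes_in_bytes: &[u64]) -> i64 {
    sizes_in_bytes.iter().map(|&b| (b / MIB) as i64).sum()
}

/// Sums bytes first, then divides once (truncates only after aggregation).
fn total_mib_aggregated(sizes_in_bytes: &[u64]) -> i64 {
    let total_bytes: u64 = sizes_in_bytes.iter().sum();
    (total_bytes / MIB) as i64
}

fn main() {
    // Three hypothetical queues, each caching 1.5 MiB (4.5 MiB in total).
    let sizes = [MIB + MIB / 2; 3];
    println!("per-queue truncation: {} MiB", total_mib_per_queue(&sizes));
    println!("single truncation:    {} MiB", total_mib_aggregated(&sizes));
}
```

With these inputs the per-queue variant reports 3 MiB while the aggregated variant reports 4 MiB, which is the underreporting the review comment describes.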
        let handle = tokio::spawn(async move {
            loop {
                if shutdown_signal.load(Ordering::Acquire) {
                    break;
                }

                let pq = match assigned_mq.get_process_queue(&mq_clone).await {
                    Some(pq) if !pq.is_dropped() => pq,
                    _ => break,
                };

                if assigned_mq.is_paused(&mq_clone).await {
                    tokio::time::sleep(Duration::from_millis(
                        default_impl.consumer_config.pull_time_delay_millis_when_exception,
                    ))
                    .await;
                    continue;
                }

                match default_impl.pull_inner(&mq_clone, &pq).await {
                    Ok(delay) => {
                        if delay > 0 {
                            tokio::time::sleep(Duration::from_millis(delay)).await;
                        }
                    }
                    Err(e) => {
                        tracing::error!("Pull error for {:?}: {}", mq_clone, e);
                        tokio::time::sleep(Duration::from_millis(
                            default_impl.consumer_config.pull_time_delay_millis_when_exception,
                        ))
                        .await;
                    }
                }
            }
        });

        self.task_handles.write().await.insert(mq, handle);
        Ok(())
🧩 Analysis chain
🌐 Web query:
Tokio 1.x JoinHandle drop behavior without abort await
💡 Result:
In Tokio 1.x, dropping a tokio::task::JoinHandle<T> does not abort/cancel the task.
- Drop = detach: when the JoinHandle is dropped, the task keeps running in the background; you just lose the ability to await/“join” it and retrieve its result. [1]
- What happens to the return value: once the task eventually finishes, if the JoinHandle is already gone, Tokio will drop the produced output automatically (there’s no longer any consumer). [2]
- How to actually cancel: cancellation is done explicitly via JoinHandle::abort() (or an AbortHandle). A task is signaled to stop the next time it yields at an .await point. [3]
- If you want “cancel on drop”: wrap the handle in tokio_util::task::AbortOnDropHandle, which does abort the task when dropped. [4]
- Runtime shutdown: when the Tokio runtime shuts down (e.g., returning from #[tokio::main]), it cancels remaining tasks on that runtime. [3]
Sources: JoinHandle docs [1], Tokio runtime task internals re: dropping output when handle is gone [2], Tokio task cancellation docs [3], AbortOnDropHandle docs [4].
Abort the previous pull task handle before overwriting.
Line 537 inserts a new task handle without aborting the previous one. In Tokio 1.x, dropping a JoinHandle does not cancel the task—it continues running in the background. If start_pull_task is called again for the same MessageQueue, both the old and new pull tasks will run concurrently, creating a race condition on message offsets.
Recommended fix
-        self.task_handles.write().await.insert(mq, handle);
+        let mut handles = self.task_handles.write().await;
+        if let Some(old_handle) = handles.insert(mq, handle) {
+            old_handle.abort();
+        }
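The replace-and-abort pattern relies on `HashMap::insert` returning the value it displaces. The sketch below shows that mechanism with the standard library only. `FakeHandle` is a hypothetical stand-in for a task handle that merely records the abort request, and the `String` key stands in for `MessageQueue`; neither is part of the PR or of Tokio.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a task handle; `abort` only records the request.
struct FakeHandle {
    aborted: Arc<AtomicBool>,
}

impl FakeHandle {
    fn abort(&self) {
        self.aborted.store(true, Ordering::Release);
    }
}

/// Registers `handle` under `key` and aborts any handle it displaces.
fn register(handles: &mut HashMap<String, FakeHandle>, key: &str, handle: FakeHandle) {
    // HashMap::insert returns the previous value for the key, if there was one.
    if let Some(old) = handles.insert(key.to_string(), handle) {
        old.abort();
    }
}

fn main() {
    let mut handles = HashMap::new();
    let first = Arc::new(AtomicBool::new(false));
    let second = Arc::new(AtomicBool::new(false));

    register(&mut handles, "queue-0", FakeHandle { aborted: first.clone() });
    register(&mut handles, "queue-0", FakeHandle { aborted: second.clone() });

    println!("first aborted:  {}", first.load(Ordering::Acquire));
    println!("second aborted: {}", second.load(Ordering::Acquire));
}
```

Only the displaced handle is aborted; the newly registered one stays active. With a real Tokio `JoinHandle`, the displaced handle could additionally be awaited after `abort()` if the caller needs to know the old task has finished.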
    /// Core pull logic (returns delay in milliseconds).
    async fn pull_inner(
        &self,
        _mq: &MessageQueue,
        _process_queue: &Arc<crate::consumer::consumer_impl::process_queue::ProcessQueue>,
    ) -> RocketMQResult<u64> {
        unimplemented!("pull_inner")
    }
pull_inner panics at runtime and blocks pull-task execution.
Line [547] uses unimplemented!, so any started pull task will panic on first execution of pull logic.
💡 Proposed fix
     async fn pull_inner(
         &self,
         _mq: &MessageQueue,
         _process_queue: &Arc<crate::consumer::consumer_impl::process_queue::ProcessQueue>,
     ) -> RocketMQResult<u64> {
-        unimplemented!("pull_inner")
+        if let Some(delay) = self.check_flow_control(_mq, _process_queue).await? {
+            return Ok(delay);
+        }
+        Err(crate::mq_client_err!("pull_inner is not implemented yet"))
     }
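The difference between panicking and returning an error can be sketched without any async runtime. In the pull loop shown earlier, an `Err` leads to a log line and a back-off sleep, whereas a panic ends the task. The names below (`pull_inner_stub`, `next_delay`) and the 1000 ms back-off are hypothetical, and `String` stands in for the crate's error type.

```rust
use std::time::Duration;

/// Hypothetical placeholder: reports "not implemented" as an error value
/// instead of panicking via `unimplemented!()`.
fn pull_inner_stub() -> Result<u64, String> {
    Err("pull_inner is not implemented yet".to_string())
}

/// Maps one pull outcome to the delay the loop would sleep before retrying.
fn next_delay(result: Result<u64, String>, delay_on_error_ms: u64) -> Duration {
    match result {
        // Success: the pull logic itself chose the delay.
        Ok(delay_ms) => Duration::from_millis(delay_ms),
        // Failure: fall back to the configured back-off and keep looping.
        Err(_) => Duration::from_millis(delay_on_error_ms),
    }
}

fn main() {
    let delay = next_delay(pull_inner_stub(), 1000);
    println!("retry after {:?}", delay);
}
```

Because the stub yields an `Err`, the caller stays in control of what happens next, which is the property the review comment asks for.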
        let cached_msg_size = (pq.msg_size() / (1024 * 1024)) as i64;
        if cached_msg_size > config.pull_threshold_size_for_queue as i64 {
            self.queue_flow_control_times.fetch_add(1, Ordering::Relaxed);
Use byte-level comparison for queue-size flow control.
Line [575] floors bytes to MiB before comparing at Line [576]. A queue can exceed the configured size threshold by up to almost 1 MiB without being throttled.
💡 Proposed fix
-        let cached_msg_size = (pq.msg_size() / (1024 * 1024)) as i64;
-        if cached_msg_size > config.pull_threshold_size_for_queue as i64 {
+        let cached_msg_size_bytes = pq.msg_size() as i64;
+        let threshold_size_bytes = (config.pull_threshold_size_for_queue as i64) * 1024 * 1024;
+        if cached_msg_size_bytes > threshold_size_bytes {
             self.queue_flow_control_times.fetch_add(1, Ordering::Relaxed);
             return Ok(Some(config.pull_time_delay_millis_when_cache_flow_control));
         }
📝 Committable suggestion
        let cached_msg_size_bytes = pq.msg_size() as i64;
        let threshold_size_bytes = (config.pull_threshold_size_for_queue as i64) * 1024 * 1024;
        if cached_msg_size_bytes > threshold_size_bytes {
            self.queue_flow_control_times.fetch_add(1, Ordering::Relaxed);
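A small standalone comparison of the two checks may help. It assumes the cached size is a `u64` byte count and the threshold is configured in MiB; the function names and the 100 MiB threshold are hypothetical.

```rust
const MIB: u64 = 1024 * 1024;

/// Floors the cached size to whole MiB before comparing (the original check).
fn over_threshold_floored(cached_bytes: u64, threshold_mib: u64) -> bool {
    (cached_bytes / MIB) as i64 > threshold_mib as i64
}

/// Compares in bytes, so any excess over the threshold is detected.
fn over_threshold_bytes(cached_bytes: u64, threshold_mib: u64) -> bool {
    cached_bytes as i64 > (threshold_mib as i64) * (MIB as i64)
}

fn main() {
    let threshold_mib = 100; // hypothetical per-queue threshold in MiB
    let cached = 100 * MIB + 512 * 1024; // 100.5 MiB cached

    println!("floored check throttles: {}", over_threshold_floored(cached, threshold_mib));
    println!("byte check throttles:    {}", over_threshold_bytes(cached, threshold_mib));
}
```

At 100.5 MiB the floored check does not throttle while the byte-level check does. The floored check only triggers once the queue reaches a full 101 MiB.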
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #6598      +/-   ##
==========================================
- Coverage   41.74%   41.71%   -0.04%
==========================================
  Files         959      959
  Lines      133927   134016      +89
==========================================
- Hits        55910    55905       -5
- Misses      78017    78111      +94
☔ View full report in Codecov by Sentry.
rocketmq-rust-bot left a comment
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Release Notes