[ISSUE #6450][Enhancement✨] Remove useless code from pull_message_processor.rs#6524
[ISSUE #6450][Enhancement✨] Remove useless code from pull_message_processor.rs#6524
Conversation
|
🔊@oopscompiled 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughSimplified end-offset handling in broker pull logic and minor refactors: iterator-pattern changes in utilities and map iteration, guarded match patterns for specific response codes, and extraction refactor in admin CLI. No public API signature changes. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 3❌ Failed checks (2 warnings, 1 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR removes a commented-out code fragment from pull_message_processor.rs that was checking if request_header.max_msg_nums.is_some() in a conditional statement. The commented-out condition is unnecessary because max_msg_nums is defined as a non-optional i32 field (marked with #[required]) in the PullMessageRequestHeader struct, meaning it cannot be None and the check would never compile.
Changes:
- Removed commented-out condition
/* && request_header.max_msg_nums.is_some() */from the if statement on line 230
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rocketmq-broker/src/processor/pull_message_processor.rs (1)
230-234:⚠️ Potential issue | 🟠 MajorPrevent
i64 -> i32truncation inmax_msg_numsclampLine 232 uses
as i32on an offset span without bounds checking. Bothend_offsetandstart_offsetarei64, so if the span exceedsi32::MAX(~2.1 billion), the cast silently truncates and produces an invalidmax_msg_nums(potentially negative), breaking pull behavior on large queues.🛠️ Suggested fix
if mapping_item.check_if_end_offset_decided() { - request_header.max_msg_nums = std::cmp::min( - (mapping_item.end_offset - mapping_item.start_offset) as i32, - request_header.max_msg_nums, - ); + let span = mapping_item + .end_offset + .saturating_sub(mapping_item.start_offset); + let span_i32 = i32::try_from(span).unwrap_or(i32::MAX); + request_header.max_msg_nums = span_i32.min(request_header.max_msg_nums); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-broker/src/processor/pull_message_processor.rs` around lines 230 - 234, The span calculation for max_msg_nums casts (mapping_item.end_offset - mapping_item.start_offset) as i32 which can overflow/truncate; instead compute the span in i64 using mapping_item.end_offset.saturating_sub(mapping_item.start_offset), clamp it to i32::MAX (and to zero) and then convert to i32 safely before taking the min with request_header.max_msg_nums; update the block around mapping_item.check_if_end_offset_decided() to use the saturating_sub result, cap it to i32::MAX, convert to i32, and then set request_header.max_msg_nums = std::cmp::min(clamped_span_i32, request_header.max_msg_nums) so no silent truncation or negative values occur (refer to mapping_item.end_offset, mapping_item.start_offset, and request_header.max_msg_nums).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@rocketmq-broker/src/processor/pull_message_processor.rs`:
- Around line 230-234: The span calculation for max_msg_nums casts
(mapping_item.end_offset - mapping_item.start_offset) as i32 which can
overflow/truncate; instead compute the span in i64 using
mapping_item.end_offset.saturating_sub(mapping_item.start_offset), clamp it to
i32::MAX (and to zero) and then convert to i32 safely before taking the min with
request_header.max_msg_nums; update the block around
mapping_item.check_if_end_offset_decided() to use the saturating_sub result, cap
it to i32::MAX, convert to i32, and then set request_header.max_msg_nums =
std::cmp::min(clamped_span_i32, request_header.max_msg_nums) so no silent
truncation or negative values occur (refer to mapping_item.end_offset,
mapping_item.start_offset, and request_header.max_msg_nums).
There was a problem hiding this comment.
🧹 Nitpick comments (1)
rocketmq-common/src/utils/util_all.rs (1)
214-220: Consider applying the same refactor towrite_shortfor consistency.The
write_shortfunction still uses the old pattern with mutablecurrent_pos. For consistency withwrite_int, consider refactoring it similarly.♻️ Suggested refactor for consistency
pub fn write_short(buffer: &mut [char], pos: usize, value: i16) { - let mut current_pos = pos; - for move_bits in (0..=12).rev().step_by(4) { - buffer[current_pos] = HEX_ARRAY[((value >> move_bits) & 0xF) as usize]; - current_pos += 1; + for (current_pos, move_bits) in (pos..).zip((0..=12).rev().step_by(4)) { + buffer[current_pos] = HEX_ARRAY[((value >> move_bits) & 0xF) as usize]; } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-common/src/utils/util_all.rs` around lines 214 - 220, The write_short function uses a mutable current_pos while write_int was refactored for immutability; update write_short (function write_short) to mirror write_int by removing current_pos and writing into buffer at pos + i (or equivalent) using an iterator/enumerate over the 4 nybbles (move_bits 12,8,4,0), computing HEX_ARRAY[((value >> move_bits) & 0xF) as usize] for each index so the function is consistent and avoids the mutable position variable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@rocketmq-common/src/utils/util_all.rs`:
- Around line 214-220: The write_short function uses a mutable current_pos while
write_int was refactored for immutability; update write_short (function
write_short) to mirror write_int by removing current_pos and writing into buffer
at pos + i (or equivalent) using an iterator/enumerate over the 4 nybbles
(move_bits 12,8,4,0), computing HEX_ARRAY[((value >> move_bits) & 0xF) as usize]
for each index so the function is consistent and avoids the mutable position
variable.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #6524 +/- ##
==========================================
- Coverage 42.16% 42.15% -0.01%
==========================================
Files 946 946
Lines 132145 132140 -5
==========================================
- Hits 55714 55708 -6
- Misses 76431 76432 +1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Bug Fixes
Refactor