[ISSUE #3491]🚀Add group_commit_request module to log_file for enhanced commit handling✨ #3492

rocketmq-rust-bot merged 1 commit into main
Conversation
**Walkthrough**

A new module, `group_commit_request`, is added under `log_file`.

**Changes**
**Sequence Diagram(s)**

```mermaid
sequenceDiagram
    participant Client
    participant GroupCommitRequest
    participant StorageEngine
    Client->>GroupCommitRequest: Create new request (with offset, timeout, ack_nums)
    GroupCommitRequest->>StorageEngine: Submit commit request
    StorageEngine-->>GroupCommitRequest: Process commit, reach required acks
    GroupCommitRequest->>Client: Notify via channel (wakeup_customer)
    Client->>GroupCommitRequest: Await flush result (with/without timeout)
    GroupCommitRequest-->>Client: Return PutMessageStatus or timeout error
```
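The flow in the diagram can be sketched as a minimal, self-contained stand-in. This is not the PR's implementation: it substitutes `std::sync::mpsc` with `recv_timeout` for the crate's `tokio::sync::oneshot` plus `tokio::time::timeout`, and the field names (`sender`, `receiver`) are simplified from the `flush_ok_sender`/`flush_ok_receiver` shown in the review diff.

```rust
use std::sync::mpsc;
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum PutMessageStatus {
    PutOk,
    FlushDiskTimeout,
}

struct GroupCommitRequest {
    next_offset: i64,
    sender: Option<mpsc::Sender<PutMessageStatus>>,
    receiver: Option<mpsc::Receiver<PutMessageStatus>>,
    deadline: Instant,
}

impl GroupCommitRequest {
    fn new(next_offset: i64, timeout_millis: u64) -> Self {
        let (tx, rx) = mpsc::channel();
        Self {
            next_offset,
            sender: Some(tx),
            receiver: Some(rx),
            deadline: Instant::now() + Duration::from_millis(timeout_millis),
        }
    }

    /// Storage-engine side: signal the waiting client once enough acks arrived.
    fn wakeup_customer(&mut self, status: PutMessageStatus) {
        if let Some(tx) = self.sender.take() {
            let _ = tx.send(status);
        }
    }

    /// Client side: block until the flush result arrives or the deadline passes.
    fn wait_for_result(mut self) -> PutMessageStatus {
        let remaining = self.deadline.saturating_duration_since(Instant::now());
        match self.receiver.take() {
            Some(rx) => rx
                .recv_timeout(remaining)
                .unwrap_or(PutMessageStatus::FlushDiskTimeout),
            None => PutMessageStatus::FlushDiskTimeout,
        }
    }
}

fn main() {
    let mut request = GroupCommitRequest::new(12345, 5_000);
    println!("flush requested at offset {}", request.next_offset);

    // Engine reached the required acks and signals the waiter.
    request.wakeup_customer(PutMessageStatus::PutOk);
    assert_eq!(request.wait_for_result(), PutMessageStatus::PutOk);

    // A request whose deadline has already passed resolves to a timeout status.
    let expired = GroupCommitRequest::new(0, 0);
    assert_eq!(expired.wait_for_result(), PutMessageStatus::FlushDiskTimeout);
}
```

The one-shot behavior is approximated by `take()`-ing the sender on first use, mirroring how the real type's `Option`-wrapped channel halves can only be consumed once.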
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥!

Note: 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.
Pull Request Overview
Adds a new group_commit_request module to coordinate flush requests with configurable timeouts and acknowledgement counts.
- Introduced `GroupCommitRequest` for handling batched commit flushes, with timeout and ack number support.
- Exposed the new module in `log_file.rs`.
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| rocketmq-store/src/log_file/group_commit_request.rs | Added GroupCommitRequest implementation and tests |
| rocketmq-store/src/log_file.rs | Exported the group_commit_request module |
Comments suppressed due to low confidence (3)
rocketmq-store/src/log_file/group_commit_request.rs:179
- The test starts a `wait_for_result` future but never awaits it or asserts its outcome. To cover the `wakeup_customer` path, await `result_future` and assert it returns `PutMessageStatus::PutOk`.

```rust
let result_future = request.clone_with_new_channel().wait_for_result();
```
rocketmq-store/src/log_file/group_commit_request.rs:127
- There's no test verifying that `clone_with_new_channel` preserves the request data and creates an independent channel. Consider adding a test asserting the clone's fields match the original and that waking up one request doesn't affect the other.

```rust
pub fn clone_with_new_channel(&self) -> Self {
```
rocketmq-store/src/log_file/group_commit_request.rs:189
- The test `test_timeout` uses `Instant::now()` without importing `Instant`, causing a compilation error. Consider adding `use std::time::Instant;` in the tests module.

```rust
let start = Instant::now();
```
```rust
        let (sender, receiver) = oneshot::channel();
        Self {
            next_offset,
            flush_ok_sender: Some(sender),
            flush_ok_receiver: Some(receiver),
            ack_nums: 1,
            deadline: Instant::now() + Duration::from_millis(timeout_millis),
        }
    }

    /// Create a new GroupCommitRequest with timeout and ack numbers
    pub fn with_ack_nums(next_offset: i64, timeout_millis: u64, ack_nums: i32) -> Self {
```
[nitpick] There's duplicated logic between `new` and `with_ack_nums` for creating channels and setting the deadline. Consider refactoring by extracting a private constructor that accepts `ack_nums`, reducing code duplication.
Suggested change:

```rust
    /// Create a new GroupCommitRequest with timeout in milliseconds
    pub fn new(next_offset: i64, timeout_millis: u64) -> Self {
        Self::create_request(next_offset, timeout_millis, 1)
    }

    /// Create a new GroupCommitRequest with timeout and ack numbers
    pub fn with_ack_nums(next_offset: i64, timeout_millis: u64, ack_nums: i32) -> Self {
        Self::create_request(next_offset, timeout_millis, ack_nums)
    }

    /// Private constructor to encapsulate common logic
    fn create_request(next_offset: i64, timeout_millis: u64, ack_nums: i32) -> Self {
```
Actionable comments posted: 2
🧹 Nitpick comments (1)
rocketmq-store/src/log_file/group_commit_request.rs (1)
34-57: Eliminate constructor code duplication.

The two constructors contain identical logic except for the `ack_nums` parameter. Consider refactoring to improve maintainability:

```diff
 /// Create a new GroupCommitRequest with timeout in milliseconds
 pub fn new(next_offset: i64, timeout_millis: u64) -> Self {
-    let (sender, receiver) = oneshot::channel();
-    Self {
-        next_offset,
-        flush_ok_sender: Some(sender),
-        flush_ok_receiver: Some(receiver),
-        ack_nums: 1,
-        deadline: Instant::now() + Duration::from_millis(timeout_millis),
-    }
+    Self::with_ack_nums(next_offset, timeout_millis, 1)
 }
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `rocketmq-store/src/log_file.rs` (1 hunks)
- `rocketmq-store/src/log_file/group_commit_request.rs` (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: auto-approve
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: test
- GitHub Check: build
🔇 Additional comments (3)
rocketmq-store/src/log_file.rs (1)
21-21: LGTM! Clean module integration.

The module export follows standard Rust conventions and integrates the new `group_commit_request` module into the existing module hierarchy appropriately.

rocketmq-store/src/log_file/group_commit_request.rs (2)
139-149: Excellent Debug implementation.

The custom Debug implementation provides meaningful information while appropriately hiding internal channel implementation details. The boolean flags for channel presence are much more useful than attempting to debug the actual channels.
26-137: Solid architectural design for async group commit management.

The `GroupCommitRequest` struct demonstrates excellent async patterns:

- Proper use of oneshot channels for async communication
- Clean separation between request creation, waiting, and signaling
- Flexible API supporting both timed and untimed operations
- Clever `clone_with_new_channel()` method for multiple waiters

The implementation follows Rust async best practices and provides a robust foundation for group commit functionality.
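The "multiple waiters" point can be demonstrated in isolation: each waiter owns its own one-shot style channel, so completing one never affects another. Below is a dependency-free sketch using `std::sync::mpsc` in place of `tokio::sync::oneshot` (illustrative only, not the PR's code):

```rust
use std::sync::mpsc;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    PutOk,
}

fn main() {
    // One independent channel pair per waiter, analogous to what
    // clone_with_new_channel() would hand out for each caller.
    let (tx_a, rx_a) = mpsc::channel::<Status>();
    let (tx_b, rx_b) = mpsc::channel::<Status>();

    // Waking waiter A...
    tx_a.send(Status::PutOk).unwrap();
    assert_eq!(rx_a.recv().unwrap(), Status::PutOk);

    // ...leaves waiter B's channel untouched: it has no pending message.
    assert!(rx_b.try_recv().is_err());

    // B is only completed when its own sender fires.
    tx_b.send(Status::PutOk).unwrap();
    assert_eq!(rx_b.recv().unwrap(), Status::PutOk);

    println!("waiters completed independently");
}
```

This per-waiter channel design is what makes the independence test suggested in the Copilot review straightforward to write.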
```rust
    #[tokio::test]
    async fn test_wakeup_customer() {
        let mut request = GroupCommitRequest::new(12345, 5000);

        // Start waiting for result in background
        let result_future = request.clone_with_new_channel().wait_for_result();

        // Wakeup with success status
        request.wakeup_customer(PutMessageStatus::PutOk);
    }
```
Fix incomplete test - result future is never awaited.
The test creates `result_future` but never awaits it, so it doesn't verify that `wakeup_customer` actually works.
```diff
 #[tokio::test]
 async fn test_wakeup_customer() {
     let mut request = GroupCommitRequest::new(12345, 5000);

     // Start waiting for result in background
-    let result_future = request.clone_with_new_channel().wait_for_result();
+    let cloned_request = request.clone_with_new_channel();
+    let result_future = tokio::spawn(async move {
+        cloned_request.wait_for_result().await
+    });

     // Wakeup with success status
     request.wakeup_customer(PutMessageStatus::PutOk);
+
+    // Verify the result was received
+    let result = result_future.await.unwrap().unwrap();
+    assert!(matches!(result, PutMessageStatus::PutOk));
 }
```

🤖 Prompt for AI Agents
In rocketmq-store/src/log_file/group_commit_request.rs around lines 174 to 183,
the test_wakeup_customer function creates a future result_future but never
awaits it, so it does not verify the effect of wakeup_customer. Fix this by
awaiting result_future after calling wakeup_customer to ensure the test properly
waits for and checks the result.
```rust
    pub async fn wait_for_result_with_timeout(
        mut self,
    ) -> Result<PutMessageStatus, Box<dyn std::error::Error + Send + Sync>> {
        if let Some(receiver) = self.flush_ok_receiver.take() {
            let timeout_duration = if self.deadline > Instant::now() {
                self.deadline - Instant::now()
            } else {
                Duration::from_millis(0)
            };

            match tokio::time::timeout(timeout_duration, receiver).await {
                Ok(Ok(status)) => Ok(status),
                Ok(Err(_)) => Err("Sender was dropped before sending result".into()),
                Err(_) => Ok(PutMessageStatus::FlushDiskTimeout),
            }
        } else {
            Err("Receiver was already consumed".into())
        }
    }
```
🛠️ Refactor suggestion
Handle past deadline edge case and improve documentation.
Two observations:

1. **Edge case**: When `deadline` is in the past, `timeout_duration` becomes 0, causing an immediate timeout. Consider if this is the intended behavior or if it should return an error/expired status immediately.
2. **Documentation**: The one-time consumption nature of the `wait_for_result*` methods should be documented since they consume `self` and take the receiver.
```diff
 /// Get a future that resolves when the flush operation completes with timeout
+/// Note: This method consumes self and can only be called once per request instance.
 pub async fn wait_for_result_with_timeout(
     mut self,
 ) -> Result<PutMessageStatus, Box<dyn std::error::Error + Send + Sync>> {
     if let Some(receiver) = self.flush_ok_receiver.take() {
         let timeout_duration = if self.deadline > Instant::now() {
             self.deadline - Instant::now()
         } else {
-            Duration::from_millis(0)
+            return Ok(PutMessageStatus::FlushDiskTimeout);
         };
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
    /// Get a future that resolves when the flush operation completes with timeout
    /// Note: This method consumes self and can only be called once per request instance.
    pub async fn wait_for_result_with_timeout(
        mut self,
    ) -> Result<PutMessageStatus, Box<dyn std::error::Error + Send + Sync>> {
        if let Some(receiver) = self.flush_ok_receiver.take() {
            let timeout_duration = if self.deadline > Instant::now() {
                self.deadline - Instant::now()
            } else {
                return Ok(PutMessageStatus::FlushDiskTimeout);
            };

            match tokio::time::timeout(timeout_duration, receiver).await {
                Ok(Ok(status)) => Ok(status),
                Ok(Err(_)) => Err("Sender was dropped before sending result".into()),
                Err(_) => Ok(PutMessageStatus::FlushDiskTimeout),
            }
        } else {
            Err("Receiver was already consumed".into())
        }
    }
```
🤖 Prompt for AI Agents
In rocketmq-store/src/log_file/group_commit_request.rs around lines 105 to 123,
handle the edge case where the deadline is in the past by returning an immediate
error or expired status instead of setting a zero timeout that causes immediate
timeout. Also, add documentation comments to the wait_for_result_with_timeout
method explaining that it consumes self and the receiver can only be used once,
clarifying its one-time consumption behavior.
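The past-deadline edge case discussed above can be checked in a standalone snippet. Here `std::sync::mpsc::recv_timeout` stands in for `tokio::time::timeout`; both fire essentially immediately when handed a zero duration and nothing is ready:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let (_tx, rx) = mpsc::channel::<&str>();

    // Simulate a deadline that has already passed by the time we wait.
    let deadline = Instant::now();
    thread::sleep(Duration::from_millis(2));

    // saturating_duration_since clamps to zero instead of panicking...
    let remaining = deadline.saturating_duration_since(Instant::now());
    assert_eq!(remaining, Duration::ZERO);

    // ...so the wait returns a timeout error at once instead of blocking,
    // which a caller can then map to a FlushDiskTimeout-style status.
    assert!(rx.recv_timeout(remaining).is_err());
    println!("expired deadline -> immediate timeout");
}
```

Whether an already-expired deadline should short-circuit before touching the channel (as the suggested diff does) or go through a zero-length wait is the design choice the review is raising; the observable status is the same in both paths.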
**Codecov Report**

Attention: Patch coverage is …

Additional details and impacted files:

```
@@ Coverage Diff @@
##             main    #3492      +/-   ##
==========================================
+ Coverage   26.34%   26.41%   +0.06%
==========================================
  Files         547      548       +1
  Lines       77877    77981     +104
==========================================
+ Hits        20516    20596      +80
- Misses      57361    57385      +24
```

☔ View full report in Codecov by Sentry.
Which Issue(s) This PR Fixes (Closes)
Fixes #3491
Brief Description
How Did You Test This Change?
Summary by CodeRabbit