Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rocketmq-store/src/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
pub(crate) mod cold_data_check_service;
pub mod commit_log;
pub mod flush_manager_impl;
pub mod group_commit_request;
pub mod mapped_file;

pub const MAX_PULL_MSG_SIZE: i32 = 128 * 1024 * 1024;
196 changes: 196 additions & 0 deletions rocketmq-store/src/log_file/group_commit_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::time::Duration;
use std::time::Instant;

use tokio::sync::oneshot;
use tracing::warn;

use crate::base::message_status_enum::PutMessageStatus;

/// A request to group-commit (flush) the log up to a given offset.
///
/// Carries a oneshot channel pair used to hand the flush result back to the
/// waiting caller, plus a deadline after which the request is considered
/// expired.
pub struct GroupCommitRequest {
    // Offset the log must be flushed to before this request completes.
    next_offset: i64,
    // Sender half used to report the result; consumed by the first
    // `wakeup_customer` call.
    flush_ok_sender: Option<oneshot::Sender<PutMessageStatus>>,
    // Receiver half awaited by the caller; consumed by `wait_for_result*`.
    flush_ok_receiver: Option<oneshot::Receiver<PutMessageStatus>>,
    // Number of acknowledgments required (defaults to 1 in `new`).
    ack_nums: i32,
    // Instant after which the request is treated as timed out.
    deadline: Instant,
}

impl GroupCommitRequest {
/// Create a new GroupCommitRequest with timeout in milliseconds
pub fn new(next_offset: i64, timeout_millis: u64) -> Self {
let (sender, receiver) = oneshot::channel();
Self {
next_offset,
flush_ok_sender: Some(sender),
flush_ok_receiver: Some(receiver),
ack_nums: 1,
deadline: Instant::now() + Duration::from_millis(timeout_millis),
}
}

/// Create a new GroupCommitRequest with timeout and ack numbers
pub fn with_ack_nums(next_offset: i64, timeout_millis: u64, ack_nums: i32) -> Self {
Comment on lines +37 to +48
Copy link

Copilot AI Jun 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] There's duplicated logic between new and with_ack_nums for creating channels and setting the deadline. Consider refactoring by extracting a private constructor that accepts ack_nums, reducing code duplication.

Suggested change
let (sender, receiver) = oneshot::channel();
Self {
next_offset,
flush_ok_sender: Some(sender),
flush_ok_receiver: Some(receiver),
ack_nums: 1,
deadline: Instant::now() + Duration::from_millis(timeout_millis),
}
}
/// Create a new GroupCommitRequest with timeout and ack numbers
pub fn with_ack_nums(next_offset: i64, timeout_millis: u64, ack_nums: i32) -> Self {
Self::create_request(next_offset, timeout_millis, 1)
}
/// Create a new GroupCommitRequest with timeout and ack numbers
pub fn with_ack_nums(next_offset: i64, timeout_millis: u64, ack_nums: i32) -> Self {
Self::create_request(next_offset, timeout_millis, ack_nums)
}
/// Private constructor to encapsulate common logic
fn create_request(next_offset: i64, timeout_millis: u64, ack_nums: i32) -> Self {

Copilot uses AI. Check for mistakes.
let (sender, receiver) = oneshot::channel();
Self {
next_offset,
flush_ok_sender: Some(sender),
flush_ok_receiver: Some(receiver),
ack_nums,
deadline: Instant::now() + Duration::from_millis(timeout_millis),
}
}

/// Get the next offset
pub fn get_next_offset(&self) -> i64 {
self.next_offset
}

/// Get the number of acknowledgments needed
pub fn get_ack_nums(&self) -> i32 {
self.ack_nums
}

/// Get the deadline for this request
pub fn get_deadline(&self) -> Instant {
self.deadline
}

Check warning on line 72 in rocketmq-store/src/log_file/group_commit_request.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/group_commit_request.rs#L70-L72

Added lines #L70 - L72 were not covered by tests

/// Check if the request has expired
pub fn is_expired(&self) -> bool {
Instant::now() > self.deadline
}

/// Wake up the customer/caller with the result
pub fn wakeup_customer(&mut self, status: PutMessageStatus) {
if let Some(sender) = self.flush_ok_sender.take() {
if sender.send(status).is_err() {
warn!("Failed to send flush result - receiver may have been dropped");

Check warning on line 83 in rocketmq-store/src/log_file/group_commit_request.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/group_commit_request.rs#L83

Added line #L83 was not covered by tests
}
} else {
warn!("Attempted to wakeup customer but sender was already consumed");

Check warning on line 86 in rocketmq-store/src/log_file/group_commit_request.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/group_commit_request.rs#L86

Added line #L86 was not covered by tests
}
}

/// Get a future that resolves when the flush operation completes
pub async fn wait_for_result(
mut self,
) -> Result<PutMessageStatus, Box<dyn std::error::Error + Send + Sync>> {
if let Some(receiver) = self.flush_ok_receiver.take() {
match receiver.await {
Ok(status) => Ok(status),
Err(_) => Err("Sender was dropped before sending result".into()),

Check warning on line 97 in rocketmq-store/src/log_file/group_commit_request.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/group_commit_request.rs#L94-L97

Added lines #L94 - L97 were not covered by tests
}
} else {
Err("Receiver was already consumed".into())

Check warning on line 100 in rocketmq-store/src/log_file/group_commit_request.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/group_commit_request.rs#L100

Added line #L100 was not covered by tests
}
}

Check warning on line 102 in rocketmq-store/src/log_file/group_commit_request.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/group_commit_request.rs#L102

Added line #L102 was not covered by tests

/// Get a future that resolves when the flush operation completes with timeout
pub async fn wait_for_result_with_timeout(
mut self,
) -> Result<PutMessageStatus, Box<dyn std::error::Error + Send + Sync>> {
if let Some(receiver) = self.flush_ok_receiver.take() {
let timeout_duration = if self.deadline > Instant::now() {
self.deadline - Instant::now()
} else {
Duration::from_millis(0)

Check warning on line 112 in rocketmq-store/src/log_file/group_commit_request.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/group_commit_request.rs#L112

Added line #L112 was not covered by tests
};

match tokio::time::timeout(timeout_duration, receiver).await {
Ok(Ok(status)) => Ok(status),
Ok(Err(_)) => Err("Sender was dropped before sending result".into()),

Check warning on line 117 in rocketmq-store/src/log_file/group_commit_request.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/group_commit_request.rs#L116-L117

Added lines #L116 - L117 were not covered by tests
Err(_) => Ok(PutMessageStatus::FlushDiskTimeout),
}
} else {
Err("Receiver was already consumed".into())

Check warning on line 121 in rocketmq-store/src/log_file/group_commit_request.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/group_commit_request.rs#L121

Added line #L121 was not covered by tests
}
}
Comment on lines +105 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle past deadline edge case and improve documentation.

Two observations:

  1. Edge case: When deadline is in the past, timeout_duration becomes 0, causing immediate timeout. Consider if this is the intended behavior or if it should return an error/expired status immediately.

  2. Documentation: The one-time consumption nature of wait_for_result* methods should be documented since they consume self and take the receiver.

 /// Get a future that resolves when the flush operation completes with timeout
+/// Note: This method consumes self and can only be called once per request instance.
 pub async fn wait_for_result_with_timeout(
     mut self,
 ) -> Result<PutMessageStatus, Box<dyn std::error::Error + Send + Sync>> {
     if let Some(receiver) = self.flush_ok_receiver.take() {
         let timeout_duration = if self.deadline > Instant::now() {
             self.deadline - Instant::now()
         } else {
-            Duration::from_millis(0)
+            return Ok(PutMessageStatus::FlushDiskTimeout);
         };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn wait_for_result_with_timeout(
mut self,
) -> Result<PutMessageStatus, Box<dyn std::error::Error + Send + Sync>> {
if let Some(receiver) = self.flush_ok_receiver.take() {
let timeout_duration = if self.deadline > Instant::now() {
self.deadline - Instant::now()
} else {
Duration::from_millis(0)
};
match tokio::time::timeout(timeout_duration, receiver).await {
Ok(Ok(status)) => Ok(status),
Ok(Err(_)) => Err("Sender was dropped before sending result".into()),
Err(_) => Ok(PutMessageStatus::FlushDiskTimeout),
}
} else {
Err("Receiver was already consumed".into())
}
}
/// Get a future that resolves when the flush operation completes with timeout
/// Note: This method consumes self and can only be called once per request instance.
pub async fn wait_for_result_with_timeout(
mut self,
) -> Result<PutMessageStatus, Box<dyn std::error::Error + Send + Sync>> {
if let Some(receiver) = self.flush_ok_receiver.take() {
let timeout_duration = if self.deadline > Instant::now() {
self.deadline - Instant::now()
} else {
return Ok(PutMessageStatus::FlushDiskTimeout);
};
match tokio::time::timeout(timeout_duration, receiver).await {
Ok(Ok(status)) => Ok(status),
Ok(Err(_)) => Err("Sender was dropped before sending result".into()),
Err(_) => Ok(PutMessageStatus::FlushDiskTimeout),
}
} else {
Err("Receiver was already consumed".into())
}
}
🤖 Prompt for AI Agents
In rocketmq-store/src/log_file/group_commit_request.rs around lines 105 to 123,
handle the edge case where the deadline is in the past by returning an immediate
error or expired status instead of setting a zero timeout that causes immediate
timeout. Also, add documentation comments to the wait_for_result_with_timeout
method explaining that it consumes self and the receiver can only be used once,
clarifying its one-time consumption behavior.


/// Create a clone of this request with a new channel for the result
/// This is useful when you need to share the request data but want separate result channels
pub fn clone_with_new_channel(&self) -> Self {
let (sender, receiver) = oneshot::channel();
Self {
next_offset: self.next_offset,
flush_ok_sender: Some(sender),
flush_ok_receiver: Some(receiver),
ack_nums: self.ack_nums,
deadline: self.deadline,
}
}
}

impl std::fmt::Debug for GroupCommitRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GroupCommitRequest")
.field("next_offset", &self.next_offset)
.field("ack_nums", &self.ack_nums)
.field("deadline", &self.deadline)
.field("has_sender", &self.flush_ok_sender.is_some())
.field("has_receiver", &self.flush_ok_receiver.is_some())
.finish()
}

Check warning on line 148 in rocketmq-store/src/log_file/group_commit_request.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/log_file/group_commit_request.rs#L140-L148

Added lines #L140 - L148 were not covered by tests
}

#[cfg(test)]
mod tests {
    use tokio::time::Duration;

    use super::*;

    #[tokio::test]
    async fn test_group_commit_request_creation() {
        let request = GroupCommitRequest::new(12345, 5000);

        assert_eq!(request.get_next_offset(), 12345);
        assert_eq!(request.get_ack_nums(), 1);
        assert!(!request.is_expired());
    }

    #[tokio::test]
    async fn test_group_commit_request_with_ack_nums() {
        let request = GroupCommitRequest::with_ack_nums(67890, 3000, 3);

        assert_eq!(request.get_next_offset(), 67890);
        assert_eq!(request.get_ack_nums(), 3);
    }

    #[tokio::test]
    async fn test_wakeup_customer() {
        let mut request = GroupCommitRequest::new(12345, 5000);

        // Send first: a oneshot channel buffers the value, so the await
        // below observes it. (The previous version waited on a clone's
        // *independent* channel and never awaited the future, so it
        // verified nothing.)
        request.wakeup_customer(PutMessageStatus::PutOk);

        let result = request.wait_for_result().await;
        assert!(matches!(result, Ok(PutMessageStatus::PutOk)));
    }

    #[tokio::test]
    async fn test_timeout() {
        let request = GroupCommitRequest::new(12345, 100); // 100ms timeout

        let start = Instant::now();
        let result = request.wait_for_result_with_timeout().await;
        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(90)); // Allow some tolerance
        assert!(matches!(result, Ok(PutMessageStatus::FlushDiskTimeout)));
    }
}
Loading