Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
use rocketmq_common::common::message::message_enum::MessageRequestMode;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::mix_all;
use rocketmq_common::TimeUtils::current_millis;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::protocol::filter::filter_api::FilterAPI;
Expand All @@ -51,6 +52,7 @@ use crate::consumer::default_mq_push_consumer::ConsumerConfig;
use crate::consumer::mq_consumer_inner::MQConsumerInner;
use crate::consumer::store::local_file_offset_store::LocalFileOffsetStore;
use crate::consumer::store::offset_store::OffsetStore;
use crate::consumer::store::read_offset_type::ReadOffsetType;
use crate::consumer::store::remote_broker_offset_store::RemoteBrokerOffsetStore;
use crate::consumer::topic_message_queue_change_listener::TopicMessageQueueChangeListener;
use crate::factory::mq_client_instance::MQClientInstance;
Expand Down Expand Up @@ -914,6 +916,179 @@ impl DefaultLitePullConsumerImpl {
self.assigned_message_queue.set_paused(mq, false).await;
}
}

/// Seeks to the specified offset for the given message queue.
pub async fn seek(&self, message_queue: &MessageQueue, offset: i64) -> RocketMQResult<()> {
self.make_sure_state_ok()?;
self.seek_internal(message_queue, offset, true).await
}

/// Seeks to the beginning of the message queue.
pub async fn seek_to_begin(&self, message_queue: &MessageQueue) -> RocketMQResult<()> {
self.make_sure_state_ok()?;
let begin = self.min_offset(message_queue).await?;
self.seek_internal(message_queue, begin, false).await
}

/// Seeks to the end of the message queue.
pub async fn seek_to_end(&self, message_queue: &MessageQueue) -> RocketMQResult<()> {
self.make_sure_state_ok()?;
let end = self.max_offset(message_queue).await?;
self.seek_internal(message_queue, end, false).await
}

/// Internal seek implementation with optional offset validation.
async fn seek_internal(
&self,
message_queue: &MessageQueue,
offset: i64,
validate_offset: bool,
) -> RocketMQResult<()> {
// Validate offset range if requested (skip for seek_to_begin/end to avoid duplicate queries)
if validate_offset {
let min_offset = self.min_offset(message_queue).await?;
let max_offset = self.max_offset(message_queue).await?;
if offset < min_offset || offset > max_offset {
return Err(crate::mq_client_err!(format!(
"Seek offset illegal, seek offset = {}, min offset = {}, max offset = {}",
offset, min_offset, max_offset
)));
}
}

// Get or create lock for this message queue
let lock = {
let mut locks = self.message_queue_locks.write().await;
locks
.entry(message_queue.clone())
.or_insert_with(|| Arc::new(Mutex::new(())))
.clone()
};

// Execute seek with lock to prevent race conditions
let _guard = lock.lock().await;

// Check if message queue is still assigned (rebalance protection)
let assigned = self.assigned_message_queue.message_queues().await;
if !assigned.contains(message_queue) {
let subscription_type = *self.subscription_type.read().await;
let error_msg = if subscription_type == SubscriptionType::Subscribe {
format!(
"The message queue is not in assigned list, may be rebalancing, message queue: {:?}",
message_queue
)
} else {
format!(
"The message queue is not in assigned list, message queue: {:?}",
message_queue
)
};
return Err(crate::mq_client_err!(error_msg));
}

// Check if ProcessQueue is dropped
if let Some(pq) = self.assigned_message_queue.get_process_queue(message_queue).await {
if pq.is_dropped() {
return Err(crate::mq_client_err!(format!(
"ProcessQueue is dropped for message queue: {:?}",
message_queue
)));
}
}

self.clear_message_queue_in_cache(message_queue).await;

// Stop old pull task
let mut task_handles = self.task_handles.write().await;
if let Some(handle) = task_handles.remove(message_queue) {
handle.abort();
}

// Set seek offset
self.assigned_message_queue.set_seek_offset(message_queue, offset).await;

// Start new pull task
if !task_handles.contains_key(message_queue) {
drop(task_handles); // Release write lock before starting pull task
self.start_pull_task(message_queue.clone()).await?;
}

Ok(())
}

/// Returns the committed offset for the message queue.
pub async fn committed(&self, message_queue: &MessageQueue) -> RocketMQResult<i64> {
self.make_sure_state_ok()?;

if let Some(ref offset_store) = self.offset_store {
let offset = offset_store
.read_offset(message_queue, ReadOffsetType::MemoryFirstThenStore)
.await;
if offset == -2 {
return Err(crate::mq_client_err!("Fetch consume offset from broker exception"));
}
if offset == -1 {
tracing::warn!("No offset found for message queue: {:?}, returning -1", message_queue);
}
return Ok(offset);
}

Err(crate::mq_client_err!("Offset store is not initialized"))
}

async fn max_offset(&self, message_queue: &MessageQueue) -> RocketMQResult<i64> {
self.make_sure_state_ok()?;

let mut client_instance = self
.client_instance
.as_ref()
.ok_or_else(|| crate::mq_client_err!("Client instance not initialized"))?
.clone();

let broker_result = client_instance
.find_broker_address_in_subscribe(message_queue.broker_name(), mix_all::MASTER_ID, true)
.await
.ok_or_else(|| {
crate::mq_client_err!(format!("Broker address not found for: {}", message_queue.broker_name()))
})?;

client_instance
.mq_client_api_impl
.as_mut()
.unwrap()
.get_max_offset(broker_result.broker_addr.as_str(), message_queue, 3000)
.await
}

async fn min_offset(&self, message_queue: &MessageQueue) -> RocketMQResult<i64> {
self.make_sure_state_ok()?;

let mut client_instance = self
.client_instance
.as_ref()
.ok_or_else(|| crate::mq_client_err!("Client instance not initialized"))?
.clone();

let broker_result = client_instance
.find_broker_address_in_subscribe(message_queue.broker_name(), mix_all::MASTER_ID, true)
.await
.ok_or_else(|| {
crate::mq_client_err!(format!("Broker address not found for: {}", message_queue.broker_name()))
})?;

client_instance
.mq_client_api_impl
.as_mut()
.unwrap()
.get_min_offset(broker_result.broker_addr.as_str(), message_queue, 3000)
.await
}

async fn clear_message_queue_in_cache(&self, message_queue: &MessageQueue) {
if let Some(process_queue) = self.assigned_message_queue.get_process_queue(message_queue).await {
process_queue.clear().await;
}
}
}

impl MQConsumerInner for DefaultLitePullConsumerImpl {
Expand Down
46 changes: 46 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ use rocketmq_remoting::protocol::header::get_lite_topic_info_request_header::Get
use rocketmq_remoting::protocol::header::get_max_offset_request_header::GetMaxOffsetRequestHeader;
use rocketmq_remoting::protocol::header::get_max_offset_response_header::GetMaxOffsetResponseHeader;
use rocketmq_remoting::protocol::header::get_meta_data_response_header::GetMetaDataResponseHeader;
use rocketmq_remoting::protocol::header::get_min_offset_request_header::GetMinOffsetRequestHeader;
use rocketmq_remoting::protocol::header::get_min_offset_response_header::GetMinOffsetResponseHeader;
use rocketmq_remoting::protocol::header::get_parent_topic_info_request_header::GetParentTopicInfoRequestHeader;
use rocketmq_remoting::protocol::header::get_user_request_headers::GetUserRequestHeader;
use rocketmq_remoting::protocol::header::heartbeat_request_header::HeartbeatRequestHeader;
Expand Down Expand Up @@ -2133,6 +2135,50 @@ impl MQClientAPIImpl {
))
}

pub async fn get_min_offset(
&mut self,
addr: &str,
message_queue: &MessageQueue,
timeout_millis: u64,
) -> rocketmq_error::RocketMQResult<i64> {
let request_header = GetMinOffsetRequestHeader {
topic: CheetahString::from_slice(message_queue.topic_str()),
queue_id: message_queue.queue_id(),
topic_request_header: Some(TopicRequestHeader {
rpc_request_header: Some(RpcRequestHeader {
broker_name: Some(CheetahString::from_slice(message_queue.broker_name())),
..Default::default()
}),
lo: None,
}),
};

let request = RemotingCommand::create_request_command(RequestCode::GetMinOffset, request_header);

let response = self
.remoting_client
.invoke_request(
Some(&mix_all::broker_vip_channel(
self.client_config.vip_channel_enabled,
addr,
)),
request,
timeout_millis,
)
.await?;
if ResponseCode::from(response.code()) == ResponseCode::Success {
let response_header = response
.decode_command_custom_header::<GetMinOffsetResponseHeader>()
.expect("decode error");
return Ok(response_header.offset);
}
Err(client_broker_err!(
response.code(),
response.remark().map_or("".to_string(), |s| s.to_string()),
addr.to_string()
))
}

/// Searches for the queue offset whose store timestamp is closest to `timestamp`.
///
/// When `boundary_type` is [`BoundaryType::Lower`], the returned offset is the earliest one
Expand Down
Loading