Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,13 @@ impl ConsumeMessageOrderlyService {
) -> bool {
let (continue_consume, commit_offset) = if context.is_auto_commit() {
match status {
ConsumeOrderlyStatus::Success | ConsumeOrderlyStatus::Rollback | ConsumeOrderlyStatus::Commit => {
ConsumeOrderlyStatus::Success => (true, consume_request.process_queue.commit().await),
ConsumeOrderlyStatus::Rollback | ConsumeOrderlyStatus::Commit => {
warn!(
"the message queue consume result is illegal, we think you want to ack these messages, so we \
will ack them: {}",
consume_request.message_queue
);
(true, consume_request.process_queue.commit().await)
}
ConsumeOrderlyStatus::SuspendCurrentQueueAMoment => {
Expand All @@ -307,8 +313,8 @@ impl ConsumeMessageOrderlyService {
} else {
match status {
ConsumeOrderlyStatus::Success => (true, -1),
ConsumeOrderlyStatus::Rollback => (true, consume_request.process_queue.commit().await),
ConsumeOrderlyStatus::Commit => {
ConsumeOrderlyStatus::Commit => (true, consume_request.process_queue.commit().await),
ConsumeOrderlyStatus::Rollback => {
consume_request.process_queue.rollback().await;
self.submit_consume_request_later(
consume_request.process_queue.clone(),
Expand Down Expand Up @@ -524,7 +530,7 @@ impl ConsumeRequest {

consume_message_orderly_service
.active_tasks
.fetch_add(1, Ordering::SeqCst);
.fetch_add(1, Ordering::AcqRel);
let active_tasks = consume_message_orderly_service.active_tasks.clone();

struct TaskGuard {
Expand All @@ -533,13 +539,11 @@ impl ConsumeRequest {

impl Drop for TaskGuard {
fn drop(&mut self) {
self.active_tasks.fetch_sub(1, Ordering::SeqCst);
self.active_tasks.fetch_sub(1, Ordering::AcqRel);
}
}

let _guard = TaskGuard {
active_tasks: active_tasks.clone(),
};
let _guard = TaskGuard { active_tasks };

let mut consume_message_orderly_service_inner = consume_message_orderly_service.clone();
let lock = consume_message_orderly_service_inner
Expand All @@ -548,9 +552,13 @@ impl ConsumeRequest {
.await;
let locked = lock.lock().await;
let mut default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.as_mut().unwrap().clone();
if MessageModel::Broadcasting == default_mqpush_consumer_impl.message_model()
let message_model = default_mqpush_consumer_impl.message_model();
if MessageModel::Broadcasting == message_model
|| self.process_queue.is_locked() && !self.process_queue.is_lock_expired()
{
let consume_batch_size = consume_message_orderly_service_inner
.consumer_config
.consume_message_batch_max_size;
let begin_time = Instant::now();
loop {
if self.process_queue.is_dropped() {
Expand All @@ -560,9 +568,7 @@ impl ConsumeRequest {
);
break;
}
if MessageModel::Clustering == default_mqpush_consumer_impl.message_model()
&& !self.process_queue.is_locked()
{
if MessageModel::Clustering == message_model && !self.process_queue.is_locked() {
warn!(
"the message queue not be able to consume, because it's not locked. {}",
self.message_queue
Expand All @@ -578,9 +584,7 @@ impl ConsumeRequest {
break;
}

if MessageModel::Clustering == default_mqpush_consumer_impl.message_model()
&& self.process_queue.is_lock_expired()
{
if MessageModel::Clustering == message_model && self.process_queue.is_lock_expired() {
warn!(
"the message queue lock expired, so consume later {}",
self.message_queue
Expand All @@ -607,9 +611,6 @@ impl ConsumeRequest {
.await;
break;
}
let consume_batch_size = consume_message_orderly_service_inner
.consumer_config
.consume_message_batch_max_size;
let mut msgs = self.process_queue.take_messages(consume_batch_size).await;
default_mqpush_consumer_impl.reset_retry_and_namespace(
&mut msgs,
Expand All @@ -618,7 +619,6 @@ impl ConsumeRequest {
if msgs.is_empty() {
break;
}
let mut context = ConsumeOrderlyContext::new(self.message_queue.clone());
let mut consume_message_context = None;
let mut status = None;
if default_mqpush_consumer_impl.has_hook() {
Expand All @@ -641,20 +641,34 @@ impl ConsumeRequest {
}
let begin_timestamp = Instant::now();
let mut has_exception = false;
let consume_lock = self.process_queue.consume_lock.write().await;
let consume_lock = self.process_queue.consume_lock.clone().read_owned().await;
if self.process_queue.is_dropped() {
warn!(
"consumeMessage, the message queue not be able to consume, because it's dropped. {}",
self.message_queue
);
break;
}
let vec = &msgs.iter().map(|msg| msg.as_ref()).collect::<Vec<&MessageExt>>()[..];

match consume_message_orderly_service_inner
.message_listener
.consume_message(vec, &mut context)
{
let msgs_owned: Vec<MessageExt> = msgs.iter().map(|m| m.as_ref().clone()).collect();
let listener = consume_message_orderly_service_inner.message_listener.clone();
let mq_for_spawn = self.message_queue.clone();
let (consume_result, context) = tokio::task::spawn_blocking(move || {
let mut ctx = ConsumeOrderlyContext::new(mq_for_spawn);
let vec: Vec<&MessageExt> = msgs_owned.iter().collect();
let result = listener.consume_message(&vec, &mut ctx);
drop(consume_lock);
(result, ctx)
})
.await
.unwrap_or_else(|e| {
(
Err(rocketmq_error::RocketMQError::InvalidProperty(format!(
"orderly consume task panicked: {e}"
))),
ConsumeOrderlyContext::new(self.message_queue.clone()),
)
});
match consume_result {
Ok(value) => {
status = Some(value);
}
Expand All @@ -669,7 +683,6 @@ impl ConsumeRequest {
);
}
}
drop(consume_lock);
if status.is_none()
|| *status.as_ref().unwrap() == ConsumeOrderlyStatus::Rollback
|| *status.as_ref().unwrap() == ConsumeOrderlyStatus::SuspendCurrentQueueAMoment
Expand All @@ -695,10 +708,7 @@ impl ConsumeRequest {
ConsumeReturnType::TimeOut
} else if status_value == ConsumeOrderlyStatus::SuspendCurrentQueueAMoment {
ConsumeReturnType::Failed
} else if status_value == ConsumeOrderlyStatus::Success {
ConsumeReturnType::Success
} else {
// Handle other status cases
ConsumeReturnType::Success
}
}
Expand Down
5 changes: 2 additions & 3 deletions rocketmq-client/src/consumer/consumer_impl/process_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use rocketmq_common::MessageAccessor::MessageAccessor;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_remoting::protocol::body::process_queue_info::ProcessQueueInfo;
use rocketmq_rust::ArcMut;
use rocketmq_rust::RocketMQTokioRwLock;
use tokio::sync::RwLock;
use tracing::info;

Expand Down Expand Up @@ -67,7 +66,7 @@ impl ProcessQueueStore {

pub struct ProcessQueue {
store: RwLock<ProcessQueueStore>,
pub(crate) consume_lock: Arc<RocketMQTokioRwLock<()>>,
pub(crate) consume_lock: Arc<RwLock<()>>,

msg_count: AtomicI64,
msg_size: AtomicU64,
Expand All @@ -94,7 +93,7 @@ impl ProcessQueue {
let now = get_current_millis();
ProcessQueue {
store: RwLock::new(ProcessQueueStore::new()),
consume_lock: Arc::new(RocketMQTokioRwLock::new(())),
consume_lock: Arc::new(RwLock::new(())),

msg_count: AtomicI64::new(0),
msg_size: AtomicU64::new(0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ impl RebalancePushImpl {

let force_unlock =
pq.is_dropped() && (get_current_millis() > pq.get_last_lock_timestamp() + *UNLOCK_DELAY_TIME_MILLS);
let consume_lock = pq.consume_lock.try_write_timeout(Duration::from_millis(500)).await;
let consume_lock = tokio::time::timeout(Duration::from_millis(500), pq.consume_lock.write())
.await
.ok();
if force_unlock || consume_lock.is_some() {
let Some(offset_store) = default_mqpush_consumer_impl.offset_store.as_mut() else {
error!("Offset store not initialized");
Expand Down
Loading