Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use cheetah_string::CheetahString;
use dashmap::DashSet;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::MessageConst;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::MessageAccessor::MessageAccessor;
use rocketmq_remoting::protocol::body::cm_result::CMResult;
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
use rocketmq_rust::ArcMut;
Expand Down Expand Up @@ -170,15 +172,19 @@ impl ConsumeMessagePopOrderlyService {
let max_times = self.get_max_reconsume_times();

for msg in msgs {
let msg_mut = msg.mut_from_ref();
if msg_mut.reconsume_times >= max_times {
let reconsume_times = msg.reconsume_times;
if reconsume_times >= max_times {
MessageAccessor::set_reconsume_time(
msg.mut_from_ref(),
CheetahString::from_string(reconsume_times.to_string()),
);
if !self.send_message_back(msg.as_ref()).await {
suspend = true;
msg_mut.reconsume_times += 1;
msg.mut_from_ref().reconsume_times = reconsume_times + 1;
}
} else {
suspend = true;
msg_mut.reconsume_times += 1;
msg.mut_from_ref().reconsume_times = reconsume_times + 1;
}
}

Expand All @@ -196,16 +202,27 @@ impl ConsumeMessagePopOrderlyService {
Vec::new()
};

let mut new_msg = Message::builder()
.topic(&retry_topic)
.body(body.clone())
.build_unchecked();
let mut new_msg = Message::builder().topic(&retry_topic).body(body).build_unchecked();

MessageAccessor::set_properties(&mut new_msg, msg.properties().clone());
let origin_msg_id = MessageAccessor::get_origin_message_id(msg).unwrap_or(msg.msg_id.clone());
MessageAccessor::set_origin_message_id(&mut new_msg, origin_msg_id);
new_msg.set_flag(msg.get_flag());
MessageAccessor::put_property(
&mut new_msg,
CheetahString::from_static_str(MessageConst::PROPERTY_RETRY_TOPIC),
msg.topic().to_owned(),
);
MessageAccessor::set_reconsume_time(
&mut new_msg,
CheetahString::from_string(msg.reconsume_times.to_string()),
);
MessageAccessor::set_max_reconsume_times(
&mut new_msg,
CheetahString::from_string(self.get_max_reconsume_times().to_string()),
);
new_msg.set_delay_time_level(3 + msg.reconsume_times);

let properties = msg.properties();
new_msg.set_properties(properties.clone());

if let Some(ref impl_) = self.default_mqpush_consumer_impl {
if let Some(client_factory) = impl_.client_instance.as_ref() {
if let Some(producer_impl) = client_factory.default_producer.default_mqproducer_impl.as_ref() {
Expand Down Expand Up @@ -263,7 +280,7 @@ impl ConsumeMessagePopOrderlyService {
&self,
msgs: &[ArcMut<MessageExt>],
status: Result<ConsumeOrderlyStatus, rocketmq_error::RocketMQError>,
_context: &ConsumeOrderlyContext,
context: &ConsumeOrderlyContext,
) -> bool {
let status = match status {
Ok(s) => s,
Expand All @@ -281,14 +298,29 @@ impl ConsumeMessagePopOrderlyService {
true
}
ConsumeOrderlyStatus::SuspendCurrentQueueAMoment => {
let suspend_time = if context.get_suspend_current_queue_time_millis() > 0 {
context.get_suspend_current_queue_time_millis() as u64
} else {
1000
};
for msg in msgs {
self.change_invisible_time(msg.as_ref(), 1000).await;
self.change_invisible_time(msg.as_ref(), suspend_time).await;
}
false
}
#[allow(deprecated)]
ConsumeOrderlyStatus::Rollback | ConsumeOrderlyStatus::Commit => {
warn!("Deprecated status: {:?}, treating as NACK", status);
ConsumeOrderlyStatus::Commit => {
for msg in msgs {
self.ack_message(msg.as_ref()).await;
}
true
}
#[allow(deprecated)]
ConsumeOrderlyStatus::Rollback => {
warn!(
"Consumer group {} received deprecated Rollback status, reverting messages",
self.consumer_group
);
for msg in msgs {
self.change_invisible_time(msg.as_ref(), 1000).await;
}
Expand Down Expand Up @@ -386,7 +418,6 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService {
info!("consumeMessageDirectly receive new message: {}", msg);
let mq = MessageQueue::from_parts(msg.topic().clone(), broker_name.unwrap_or_default(), msg.queue_id());
let mut msgs = vec![ArcMut::new(msg)];
let mut context = ConsumeOrderlyContext::new(mq);
self.default_mqpush_consumer_impl
.as_ref()
.unwrap()
Expand All @@ -395,31 +426,48 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService {

let begin_timestamp = Instant::now();

let status = self.message_listener.consume_message(
&msgs.iter().map(|msg| msg.as_ref()).collect::<Vec<&MessageExt>>()[..],
&mut context,
);
let listener = self.message_listener.clone();
let msgs_cloned: Vec<MessageExt> = msgs.iter().map(|m| m.as_ref().clone()).collect();
let blocking_result = tokio::task::spawn_blocking(move || {
let msg_refs: Vec<&MessageExt> = msgs_cloned.iter().collect();
let mut ctx = ConsumeOrderlyContext::new(mq);
let result = listener.consume_message(&msg_refs, &mut ctx);
(result, ctx)
})
.await;

let mut result = ConsumeMessageDirectlyResult::default();
result.set_order(true);
result.set_auto_commit(context.is_auto_commit());
match status {
Ok(status) => match status {
ConsumeOrderlyStatus::Success => {
result.set_consume_result(CMResult::CRSuccess);
}
ConsumeOrderlyStatus::Rollback => {
result.set_consume_result(CMResult::CRRollback);
}
ConsumeOrderlyStatus::Commit => {
result.set_consume_result(CMResult::CRCommit);
}
ConsumeOrderlyStatus::SuspendCurrentQueueAMoment => {
result.set_consume_result(CMResult::CRLater);
match blocking_result {
Ok((status, context)) => {
result.set_auto_commit(context.is_auto_commit());
match status {
Ok(status) => match status {
ConsumeOrderlyStatus::Success => {
result.set_consume_result(CMResult::CRSuccess);
}
ConsumeOrderlyStatus::Rollback => {
result.set_consume_result(CMResult::CRRollback);
}
ConsumeOrderlyStatus::Commit => {
result.set_consume_result(CMResult::CRCommit);
}
ConsumeOrderlyStatus::SuspendCurrentQueueAMoment => {
result.set_consume_result(CMResult::CRLater);
}
},
Err(e) => {
result.set_consume_result(CMResult::CRThrowException);
result.set_remark(CheetahString::from_string(e.to_string()))
}
}
},
Err(e) => {
}
Err(join_err) => {
result.set_auto_commit(true);
result.set_consume_result(CMResult::CRThrowException);
result.set_remark(CheetahString::from_string(e.to_string()))
result.set_remark(CheetahString::from_string(format!(
"consume_message panicked: {join_err}"
)));
}
}
result.set_spent_time_mills(begin_timestamp.elapsed().as_millis() as u64);
Expand Down Expand Up @@ -521,11 +569,30 @@ impl ConsumeRequest {
return;
}

let mut context = ConsumeOrderlyContext::new(self.message_queue.clone());
let msg_refs: Vec<&MessageExt> = msgs.iter().map(|msg| msg.as_ref()).collect();
let status = consume_message_pop_orderly_service
.message_listener
.consume_message(&msg_refs, &mut context);
let listener = consume_message_pop_orderly_service.message_listener.clone();
let msgs_cloned: Vec<MessageExt> = msgs.iter().map(|m| m.as_ref().clone()).collect();
let mq = self.message_queue.clone();
let mq_for_fallback = self.message_queue.clone();
let blocking_result = tokio::task::spawn_blocking(move || {
let msg_refs: Vec<&MessageExt> = msgs_cloned.iter().collect();
let mut ctx = ConsumeOrderlyContext::new(mq);
let result = listener.consume_message(&msg_refs, &mut ctx);
(result, ctx)
})
.await;
let (status, context) = match blocking_result {
Ok(pair) => pair,
Err(e) => {
error!(
"consume_message task panicked: {:?}, Group: {}, MQ: {}",
e, consume_message_pop_orderly_service.consumer_group, self.message_queue
);
(
Ok(ConsumeOrderlyStatus::SuspendCurrentQueueAMoment),
ConsumeOrderlyContext::new(mq_for_fallback),
)
}
};

let continue_consume = consume_message_pop_orderly_service
.process_consume_result(msgs, status, &context)
Expand Down
Loading