Skip to content

Commit d59dc56

Browse files
authored
[ISSUE #6529]♻️Refactor pop order consume message handling for improved error handling and clarity (#6530)
1 parent 25d5c5b commit d59dc56

File tree

1 file changed

+109
-42
lines changed

1 file changed

+109
-42
lines changed

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs

Lines changed: 109 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use cheetah_string::CheetahString;
2525
use dashmap::DashSet;
2626
use rocketmq_common::common::message::message_ext::MessageExt;
2727
use rocketmq_common::common::message::message_queue::MessageQueue;
28+
use rocketmq_common::common::message::MessageConst;
2829
use rocketmq_common::common::message::MessageTrait;
30+
use rocketmq_common::MessageAccessor::MessageAccessor;
2931
use rocketmq_remoting::protocol::body::cm_result::CMResult;
3032
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
3133
use rocketmq_rust::ArcMut;
@@ -170,15 +172,19 @@ impl ConsumeMessagePopOrderlyService {
170172
let max_times = self.get_max_reconsume_times();
171173

172174
for msg in msgs {
173-
let msg_mut = msg.mut_from_ref();
174-
if msg_mut.reconsume_times >= max_times {
175+
let reconsume_times = msg.reconsume_times;
176+
if reconsume_times >= max_times {
177+
MessageAccessor::set_reconsume_time(
178+
msg.mut_from_ref(),
179+
CheetahString::from_string(reconsume_times.to_string()),
180+
);
175181
if !self.send_message_back(msg.as_ref()).await {
176182
suspend = true;
177-
msg_mut.reconsume_times += 1;
183+
msg.mut_from_ref().reconsume_times = reconsume_times + 1;
178184
}
179185
} else {
180186
suspend = true;
181-
msg_mut.reconsume_times += 1;
187+
msg.mut_from_ref().reconsume_times = reconsume_times + 1;
182188
}
183189
}
184190

@@ -196,16 +202,27 @@ impl ConsumeMessagePopOrderlyService {
196202
Vec::new()
197203
};
198204

199-
let mut new_msg = Message::builder()
200-
.topic(&retry_topic)
201-
.body(body.clone())
202-
.build_unchecked();
205+
let mut new_msg = Message::builder().topic(&retry_topic).body(body).build_unchecked();
203206

207+
MessageAccessor::set_properties(&mut new_msg, msg.properties().clone());
208+
let origin_msg_id = MessageAccessor::get_origin_message_id(msg).unwrap_or(msg.msg_id.clone());
209+
MessageAccessor::set_origin_message_id(&mut new_msg, origin_msg_id);
210+
new_msg.set_flag(msg.get_flag());
211+
MessageAccessor::put_property(
212+
&mut new_msg,
213+
CheetahString::from_static_str(MessageConst::PROPERTY_RETRY_TOPIC),
214+
msg.topic().to_owned(),
215+
);
216+
MessageAccessor::set_reconsume_time(
217+
&mut new_msg,
218+
CheetahString::from_string(msg.reconsume_times.to_string()),
219+
);
220+
MessageAccessor::set_max_reconsume_times(
221+
&mut new_msg,
222+
CheetahString::from_string(self.get_max_reconsume_times().to_string()),
223+
);
204224
new_msg.set_delay_time_level(3 + msg.reconsume_times);
205225

206-
let properties = msg.properties();
207-
new_msg.set_properties(properties.clone());
208-
209226
if let Some(ref impl_) = self.default_mqpush_consumer_impl {
210227
if let Some(client_factory) = impl_.client_instance.as_ref() {
211228
if let Some(producer_impl) = client_factory.default_producer.default_mqproducer_impl.as_ref() {
@@ -263,7 +280,7 @@ impl ConsumeMessagePopOrderlyService {
263280
&self,
264281
msgs: &[ArcMut<MessageExt>],
265282
status: Result<ConsumeOrderlyStatus, rocketmq_error::RocketMQError>,
266-
_context: &ConsumeOrderlyContext,
283+
context: &ConsumeOrderlyContext,
267284
) -> bool {
268285
let status = match status {
269286
Ok(s) => s,
@@ -281,14 +298,29 @@ impl ConsumeMessagePopOrderlyService {
281298
true
282299
}
283300
ConsumeOrderlyStatus::SuspendCurrentQueueAMoment => {
301+
let suspend_time = if context.get_suspend_current_queue_time_millis() > 0 {
302+
context.get_suspend_current_queue_time_millis() as u64
303+
} else {
304+
1000
305+
};
284306
for msg in msgs {
285-
self.change_invisible_time(msg.as_ref(), 1000).await;
307+
self.change_invisible_time(msg.as_ref(), suspend_time).await;
286308
}
287309
false
288310
}
289311
#[allow(deprecated)]
290-
ConsumeOrderlyStatus::Rollback | ConsumeOrderlyStatus::Commit => {
291-
warn!("Deprecated status: {:?}, treating as NACK", status);
312+
ConsumeOrderlyStatus::Commit => {
313+
for msg in msgs {
314+
self.ack_message(msg.as_ref()).await;
315+
}
316+
true
317+
}
318+
#[allow(deprecated)]
319+
ConsumeOrderlyStatus::Rollback => {
320+
warn!(
321+
"Consumer group {} received deprecated Rollback status, reverting messages",
322+
self.consumer_group
323+
);
292324
for msg in msgs {
293325
self.change_invisible_time(msg.as_ref(), 1000).await;
294326
}
@@ -386,7 +418,6 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService {
386418
info!("consumeMessageDirectly receive new message: {}", msg);
387419
let mq = MessageQueue::from_parts(msg.topic().clone(), broker_name.unwrap_or_default(), msg.queue_id());
388420
let mut msgs = vec![ArcMut::new(msg)];
389-
let mut context = ConsumeOrderlyContext::new(mq);
390421
self.default_mqpush_consumer_impl
391422
.as_ref()
392423
.unwrap()
@@ -395,31 +426,48 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService {
395426

396427
let begin_timestamp = Instant::now();
397428

398-
let status = self.message_listener.consume_message(
399-
&msgs.iter().map(|msg| msg.as_ref()).collect::<Vec<&MessageExt>>()[..],
400-
&mut context,
401-
);
429+
let listener = self.message_listener.clone();
430+
let msgs_cloned: Vec<MessageExt> = msgs.iter().map(|m| m.as_ref().clone()).collect();
431+
let blocking_result = tokio::task::spawn_blocking(move || {
432+
let msg_refs: Vec<&MessageExt> = msgs_cloned.iter().collect();
433+
let mut ctx = ConsumeOrderlyContext::new(mq);
434+
let result = listener.consume_message(&msg_refs, &mut ctx);
435+
(result, ctx)
436+
})
437+
.await;
438+
402439
let mut result = ConsumeMessageDirectlyResult::default();
403440
result.set_order(true);
404-
result.set_auto_commit(context.is_auto_commit());
405-
match status {
406-
Ok(status) => match status {
407-
ConsumeOrderlyStatus::Success => {
408-
result.set_consume_result(CMResult::CRSuccess);
409-
}
410-
ConsumeOrderlyStatus::Rollback => {
411-
result.set_consume_result(CMResult::CRRollback);
412-
}
413-
ConsumeOrderlyStatus::Commit => {
414-
result.set_consume_result(CMResult::CRCommit);
415-
}
416-
ConsumeOrderlyStatus::SuspendCurrentQueueAMoment => {
417-
result.set_consume_result(CMResult::CRLater);
441+
match blocking_result {
442+
Ok((status, context)) => {
443+
result.set_auto_commit(context.is_auto_commit());
444+
match status {
445+
Ok(status) => match status {
446+
ConsumeOrderlyStatus::Success => {
447+
result.set_consume_result(CMResult::CRSuccess);
448+
}
449+
ConsumeOrderlyStatus::Rollback => {
450+
result.set_consume_result(CMResult::CRRollback);
451+
}
452+
ConsumeOrderlyStatus::Commit => {
453+
result.set_consume_result(CMResult::CRCommit);
454+
}
455+
ConsumeOrderlyStatus::SuspendCurrentQueueAMoment => {
456+
result.set_consume_result(CMResult::CRLater);
457+
}
458+
},
459+
Err(e) => {
460+
result.set_consume_result(CMResult::CRThrowException);
461+
result.set_remark(CheetahString::from_string(e.to_string()))
462+
}
418463
}
419-
},
420-
Err(e) => {
464+
}
465+
Err(join_err) => {
466+
result.set_auto_commit(true);
421467
result.set_consume_result(CMResult::CRThrowException);
422-
result.set_remark(CheetahString::from_string(e.to_string()))
468+
result.set_remark(CheetahString::from_string(format!(
469+
"consume_message panicked: {join_err}"
470+
)));
423471
}
424472
}
425473
result.set_spent_time_mills(begin_timestamp.elapsed().as_millis() as u64);
@@ -521,11 +569,30 @@ impl ConsumeRequest {
521569
return;
522570
}
523571

524-
let mut context = ConsumeOrderlyContext::new(self.message_queue.clone());
525-
let msg_refs: Vec<&MessageExt> = msgs.iter().map(|msg| msg.as_ref()).collect();
526-
let status = consume_message_pop_orderly_service
527-
.message_listener
528-
.consume_message(&msg_refs, &mut context);
572+
let listener = consume_message_pop_orderly_service.message_listener.clone();
573+
let msgs_cloned: Vec<MessageExt> = msgs.iter().map(|m| m.as_ref().clone()).collect();
574+
let mq = self.message_queue.clone();
575+
let mq_for_fallback = self.message_queue.clone();
576+
let blocking_result = tokio::task::spawn_blocking(move || {
577+
let msg_refs: Vec<&MessageExt> = msgs_cloned.iter().collect();
578+
let mut ctx = ConsumeOrderlyContext::new(mq);
579+
let result = listener.consume_message(&msg_refs, &mut ctx);
580+
(result, ctx)
581+
})
582+
.await;
583+
let (status, context) = match blocking_result {
584+
Ok(pair) => pair,
585+
Err(e) => {
586+
error!(
587+
"consume_message task panicked: {:?}, Group: {}, MQ: {}",
588+
e, consume_message_pop_orderly_service.consumer_group, self.message_queue
589+
);
590+
(
591+
Ok(ConsumeOrderlyStatus::SuspendCurrentQueueAMoment),
592+
ConsumeOrderlyContext::new(mq_for_fallback),
593+
)
594+
}
595+
};
529596

530597
let continue_consume = consume_message_pop_orderly_service
531598
.process_consume_result(msgs, status, &context)

0 commit comments

Comments
 (0)