Skip to content

Commit fc1ee91

Browse files
authored
[ISSUE #6526]♻️Refactor consume message handling for improved clarity and performance (#6527)
1 parent e2d20c0 commit fc1ee91

File tree

1 file changed

+87
-82
lines changed

1 file changed

+87
-82
lines changed

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

Lines changed: 87 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::time::Duration;
2121
use std::time::Instant;
2222

2323
use cheetah_string::CheetahString;
24-
use rocketmq_common::common::message::message_client_ext::MessageClientExt;
2524
use rocketmq_common::common::message::message_ext::MessageExt;
2625
use rocketmq_common::common::message::message_queue::MessageQueue;
2726
use rocketmq_common::common::message::MessageConst;
@@ -88,9 +87,7 @@ impl ConsumeMessagePopConcurrentlyService {
8887
}
8988

9089
impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
91-
fn start(&mut self, this: ArcMut<Self>) {
92-
// nothing to do need
93-
}
90+
fn start(&mut self, this: ArcMut<Self>) {}
9491

9592
async fn shutdown(&mut self, await_terminate_millis: u64) {
9693
info!(
@@ -126,7 +123,6 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
126123
let old_size = self.max_concurrency.load(Ordering::Acquire);
127124
self.max_concurrency.store(core_pool_size, Ordering::Release);
128125

129-
// Adjust semaphore permits
130126
if core_pool_size > old_size {
131127
let diff = core_pool_size - old_size;
132128
self.concurrency_limiter.add_permits(diff);
@@ -135,8 +131,6 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
135131
self.consumer_group, old_size, core_pool_size
136132
);
137133
} else if core_pool_size < old_size {
138-
// Note: Tokio Semaphore doesn't support reducing permits directly
139-
// Permits will naturally decrease as tasks complete
140134
info!(
141135
"{} ConsumeMessagePopConcurrentlyService decrease core pool size from {} to {} (will take effect \
142136
gradually)",
@@ -182,24 +176,35 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
182176

183177
let begin_timestamp = Instant::now();
184178

185-
let msgs_refs: Vec<&MessageExt> = msgs.iter().map(|msg| msg.as_ref()).collect();
186-
let status = self.message_listener.consume_message(&msgs_refs, &context);
179+
let listener = self.message_listener.clone();
180+
let msgs_cloned: Vec<MessageExt> = msgs.iter().map(|m| m.as_ref().clone()).collect();
181+
let status_result = tokio::task::spawn_blocking(move || {
182+
let msgs_refs: Vec<&MessageExt> = msgs_cloned.iter().collect();
183+
listener.consume_message(&msgs_refs, &context)
184+
})
185+
.await;
187186
let mut result = ConsumeMessageDirectlyResult::default();
188187
result.set_order(false);
189188
result.set_auto_commit(true);
190-
match status {
191-
Ok(status) => match status {
189+
match status_result {
190+
Ok(Ok(status)) => match status {
192191
ConsumeConcurrentlyStatus::ConsumeSuccess => {
193192
result.set_consume_result(CMResult::CRSuccess);
194193
}
195194
ConsumeConcurrentlyStatus::ReconsumeLater => {
196195
result.set_consume_result(CMResult::CRLater);
197196
}
198197
},
199-
Err(e) => {
198+
Ok(Err(e)) => {
200199
result.set_consume_result(CMResult::CRThrowException);
201200
result.set_remark(CheetahString::from_string(e.to_string()))
202201
}
202+
Err(join_err) => {
203+
result.set_consume_result(CMResult::CRThrowException);
204+
result.set_remark(CheetahString::from_string(format!(
205+
"consume_message panicked: {join_err}"
206+
)))
207+
}
203208
}
204209
result.set_spent_time_mills(begin_timestamp.elapsed().as_millis() as u64);
205210
info!("consumeMessageDirectly Result: {}", result);
@@ -225,15 +230,15 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
225230
message_queue: &MessageQueue,
226231
) {
227232
let consume_batch_size = self.consumer_config.consume_message_batch_max_size;
228-
let msgs = msgs
229-
.into_iter()
230-
.map(|msg| ArcMut::new(MessageClientExt::new(msg)))
231-
.collect::<Vec<ArcMut<MessageClientExt>>>();
232233
if msgs.len() <= consume_batch_size as usize {
233-
let mut request = ConsumeRequest::new(msgs, Arc::new(process_queue.clone()), message_queue.clone());
234-
request.consumer_group = self.consumer_group.clone();
235-
request.message_listener = self.message_listener.clone();
236-
request.default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.clone();
234+
let request = ConsumeRequest::new(
235+
msgs,
236+
Arc::new(process_queue.clone()),
237+
message_queue.clone(),
238+
self.consumer_group.clone(),
239+
self.message_listener.clone(),
240+
self.default_mqpush_consumer_impl.clone(),
241+
);
237242
let limiter = self.concurrency_limiter.clone();
238243
let stopped = self.stopped.clone();
239244
tokio::spawn(async move {
@@ -260,13 +265,16 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
260265
let stopped = self.stopped.clone();
261266
msgs.chunks(consume_batch_size as usize)
262267
.map(|t| t.to_vec())
263-
.for_each(|msgs| {
264-
let mut consume_request =
265-
ConsumeRequest::new(msgs, Arc::new(process_queue.clone()), message_queue.clone());
266-
consume_request.consumer_group = consumer_group.clone();
267-
consume_request.message_listener = message_listener.clone();
268-
consume_request.default_mqpush_consumer_impl = default_impl.clone();
269-
let pop_consume_message_concurrently_service = this.clone();
268+
.for_each(|chunk| {
269+
let consume_request = ConsumeRequest::new(
270+
chunk,
271+
Arc::new(process_queue.clone()),
272+
message_queue.clone(),
273+
consumer_group.clone(),
274+
message_listener.clone(),
275+
default_impl.clone(),
276+
);
277+
let pop_service = this.clone();
270278
let limiter_clone = limiter.clone();
271279
let stopped_clone = stopped.clone();
272280
tokio::spawn(async move {
@@ -282,7 +290,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
282290
}
283291
};
284292

285-
consume_request.run(pop_consume_message_concurrently_service).await;
293+
consume_request.run(pop_service).await;
286294
drop(permit);
287295
});
288296
});
@@ -336,7 +344,6 @@ impl ConsumeMessagePopConcurrentlyService {
336344
}
337345
}
338346

339-
//ack if consume success
340347
if ack_index >= 0 {
341348
for i in 0..=ack_index {
342349
let msg = &consume_request.msgs[i as usize];
@@ -349,20 +356,17 @@ impl ConsumeMessagePopConcurrentlyService {
349356
}
350357
}
351358

352-
//consume later if consume fail
353359
for i in (ack_index + 1) as usize..consume_request.msgs.len() {
354360
let msg = &consume_request.msgs[i];
355361
consume_request.process_queue.ack();
356-
357-
// More than maxReconsumeTimes
358362
if msg.reconsume_times >= self.consumer_config.max_reconsume_times {
359363
self.check_need_ack_or_delay(msg).await;
360364
continue;
361365
}
362366

363367
let delay_level = context.delay_level_when_next_consume;
364-
let consumer_group = &self.consumer_group.clone();
365-
self.change_pop_invisible_time(msg, consumer_group, delay_level).await;
368+
let consumer_group = self.consumer_group.clone();
369+
self.change_pop_invisible_time(msg, &consumer_group, delay_level).await;
366370
}
367371
}
368372

@@ -428,9 +432,7 @@ impl ConsumeMessagePopConcurrentlyService {
428432
struct DefaultAckCallback;
429433

430434
impl AckCallback for DefaultAckCallback {
431-
fn on_success(&self, ack_result: AckResult) {
432-
//nothing to do
433-
}
435+
fn on_success(&self, ack_result: AckResult) {}
434436

435437
fn on_exception(&self, e: Box<dyn Error>) {
436438
error!("changePopInvisibleTime exception: {}", e);
@@ -461,7 +463,6 @@ impl ConsumeMessagePopConcurrentlyService {
461463

462464
struct ConsumeRequest {
463465
msgs: Vec<ArcMut<MessageExt>>,
464-
//msgs: Vec<MessageExt>,
465466
process_queue: Arc<PopProcessQueue>,
466467
message_queue: MessageQueue,
467468
pop_time: u64,
@@ -473,15 +474,18 @@ struct ConsumeRequest {
473474

474475
impl ConsumeRequest {
475476
pub fn new(
476-
msgs: Vec<ArcMut<MessageClientExt>>,
477+
msgs: Vec<MessageExt>,
477478
process_queue: Arc<PopProcessQueue>,
478479
message_queue: MessageQueue,
480+
consumer_group: CheetahString,
481+
message_listener: ArcMessageListenerConcurrently,
482+
default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
479483
) -> Self {
480484
let mut pop_time = 0u64;
481485
let mut invisible_time = 0u64;
482486

483-
if !msgs.is_empty() {
484-
if let Some(extra_info) = msgs[0].property(&CheetahString::from_static_str(MessageConst::PROPERTY_POP_CK)) {
487+
if let Some(first) = msgs.first() {
488+
if let Some(extra_info) = first.property(&CheetahString::from_static_str(MessageConst::PROPERTY_POP_CK)) {
485489
let extra_info_strs = ExtraInfoUtil::split(&extra_info);
486490
if let Ok(pt) = ExtraInfoUtil::get_pop_time(&extra_info_strs) {
487491
pop_time = pt as u64;
@@ -492,35 +496,17 @@ impl ConsumeRequest {
492496
}
493497
}
494498

495-
// Convert MessageClientExt to MessageExt by extracting the inner field
496-
let msgs_ext: Vec<ArcMut<MessageExt>> = msgs
497-
.into_iter()
498-
.map(|m| ArcMut::new(m.message_ext_inner.clone()))
499-
.collect();
500-
501-
// Create a dummy listener - will be replaced when spawning the task
502-
struct DummyListener;
503-
impl crate::consumer::listener::message_listener_concurrently::MessageListenerConcurrently for DummyListener {
504-
fn consume_message(
505-
&self,
506-
_msgs: &[&MessageExt],
507-
_context: &crate::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext,
508-
) -> rocketmq_error::RocketMQResult<
509-
crate::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus,
510-
> {
511-
Ok(crate::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus::ConsumeSuccess)
512-
}
513-
}
499+
let msgs_arc: Vec<ArcMut<MessageExt>> = msgs.into_iter().map(ArcMut::new).collect();
514500

515501
Self {
516-
msgs: msgs_ext,
502+
msgs: msgs_arc,
517503
process_queue,
518504
message_queue,
519505
pop_time,
520506
invisible_time,
521-
consumer_group: CheetahString::new(),
522-
message_listener: Arc::new(DummyListener),
523-
default_mqpush_consumer_impl: None,
507+
consumer_group,
508+
message_listener,
509+
default_mqpush_consumer_impl,
524510
}
525511
}
526512

@@ -532,10 +518,7 @@ impl ConsumeRequest {
532518
get_current_millis().saturating_sub(self.pop_time) >= self.invisible_time
533519
}
534520

535-
pub async fn run(
536-
&mut self,
537-
mut consume_message_concurrently_service: ArcMut<ConsumeMessagePopConcurrentlyService>,
538-
) {
521+
pub async fn run(mut self, mut consume_message_concurrently_service: ArcMut<ConsumeMessagePopConcurrentlyService>) {
539522
if consume_message_concurrently_service.stopped.load(Ordering::Acquire) {
540523
warn!(
541524
"run, service stopped, discard consume request for {}",
@@ -578,12 +561,6 @@ impl ConsumeRequest {
578561
self.process_queue.inc_found_msg(self.msgs.len());
579562
return;
580563
}
581-
let context = ConsumeConcurrentlyContext {
582-
message_queue: self.message_queue.clone(),
583-
delay_level_when_next_consume: 0,
584-
ack_index: i32::MAX,
585-
};
586-
587564
let mut default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.as_ref().unwrap().clone();
588565
default_mqpush_consumer_impl.reset_retry_and_namespace(&mut self.msgs, self.consumer_group.as_str());
589566
let mut consume_message_context = None;
@@ -619,12 +596,26 @@ impl ConsumeRequest {
619596
});
620597
default_mqpush_consumer_impl.execute_hook_before(&mut consume_message_context);
621598
}
622-
let msgs_refs: Vec<&MessageExt> = self.msgs.iter().map(|msg| msg.as_ref()).collect();
623-
match self.message_listener.consume_message(&msgs_refs, &context) {
624-
Ok(value) => {
599+
600+
let listener = self.message_listener.clone();
601+
let msgs_cloned: Vec<MessageExt> = self.msgs.iter().map(|m| m.as_ref().clone()).collect();
602+
let context = ConsumeConcurrentlyContext {
603+
message_queue: self.message_queue.clone(),
604+
delay_level_when_next_consume: 0,
605+
ack_index: i32::MAX,
606+
};
607+
let blocking_result = tokio::task::spawn_blocking(move || {
608+
let msgs_refs: Vec<&MessageExt> = msgs_cloned.iter().collect();
609+
let result = listener.consume_message(&msgs_refs, &context);
610+
(result, context)
611+
})
612+
.await;
613+
let context = match blocking_result {
614+
Ok((Ok(value), ctx)) => {
625615
status = Some(value);
616+
ctx
626617
}
627-
Err(e) => {
618+
Ok((Err(e), ctx)) => {
628619
has_exception = true;
629620
error!(
630621
"consumeMessage exception: {:?}, Group: {}, Msgs: {}, MQ: {}",
@@ -633,8 +624,24 @@ impl ConsumeRequest {
633624
self.msgs.len(),
634625
self.message_queue
635626
);
627+
ctx
636628
}
637-
}
629+
Err(join_err) => {
630+
has_exception = true;
631+
error!(
632+
"consumeMessage task panicked: {:?}, Group: {}, Msgs: {}, MQ: {}",
633+
join_err,
634+
self.consumer_group,
635+
self.msgs.len(),
636+
self.message_queue
637+
);
638+
ConsumeConcurrentlyContext {
639+
message_queue: self.message_queue.clone(),
640+
delay_level_when_next_consume: 0,
641+
ack_index: i32::MAX,
642+
}
643+
}
644+
};
638645
let consume_rt = begin_timestamp.elapsed().as_millis() as u64;
639646
let return_type = match status {
640647
None => {
@@ -650,7 +657,6 @@ impl ConsumeRequest {
650657
} else if s == ConsumeConcurrentlyStatus::ReconsumeLater {
651658
ConsumeReturnType::Failed
652659
} else {
653-
// Must be ConsumeSuccess
654660
ConsumeReturnType::Success
655661
}
656662
}
@@ -681,7 +687,6 @@ impl ConsumeRequest {
681687
self.consumer_group, self.message_queue,
682688
);
683689
} else if self.is_pop_timeout() {
684-
// Java: processQueue.decFoundMsg(-msgs.size()) = subtract negative = add
685690
self.process_queue.inc_found_msg(self.msgs.len());
686691
warn!(
687692
"processQueue invalid or popTimeout. isDropped={}, isPopTimeout={}, messageQueue={}, msgs={}",
@@ -694,7 +699,7 @@ impl ConsumeRequest {
694699
let this = consume_message_concurrently_service.clone();
695700

696701
consume_message_concurrently_service
697-
.process_consume_result(this, status.unwrap(), &context, self)
702+
.process_consume_result(this, status.unwrap(), &context, &mut self)
698703
.await;
699704
}
700705
}

0 commit comments

Comments
 (0)