[ISSUE #6506] ♻️ Refactor consume message concurrency handling to improve task management and shutdown process #6507
Conversation
🔊 @mxsm 🚀 Thanks for your contribution 🎉! 💡 CodeRabbit (AI) will review your code first 🔥!

Note: 🚨 The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review 💥.
Walkthrough

The changes refactor concurrent message consumption handling by replacing a runtime-backed spawning mechanism with semaphore-based concurrency control for improved task management and graceful shutdown. Additionally, crate-level recursion limits are increased across multiple files to support deeper macro expansion during compilation.

Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed, ❌ 2 failed (1 warning, 1 inconclusive)
🧹 Nitpick comments (3)
rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs (3)
187-199: `submit_consume_request_later` doesn't check the cancellation token, but the callee does — acceptable.

The delayed retry task will be harmlessly rejected by `submit_consume_request` at line 319 when the shutdown token is already cancelled. No issue here, but be aware these ghost tasks can fire after `shutdown()` returns (since shutdown doesn't await them). If strict post-shutdown silence is needed, you could select on the token inside the delay:

Optional: cancel delayed retries on shutdown

```diff
 fn submit_consume_request_later(
     &self,
     msgs: Vec<ArcMut<MessageExt>>,
     this: ArcMut<Self>,
     process_queue: Arc<ProcessQueue>,
     message_queue: MessageQueue,
 ) {
+    let token = self.shutdown_token.clone();
     tokio::spawn(async move {
-        tokio::time::sleep(Duration::from_secs(5)).await;
-        this.submit_consume_request(this.clone(), msgs, process_queue, message_queue, true)
-            .await;
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_secs(5)) => {
+                this.submit_consume_request(this.clone(), msgs, process_queue, message_queue, true)
+                    .await;
+            }
+            _ = token.cancelled() => {}
+        }
     });
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs` around lines 187 - 199, The delayed-retry helper submit_consume_request_later currently unconditionally sleeps then calls submit_consume_request, which can run after shutdown; to cancel those ghost tasks earlier, change submit_consume_request_later to await either the delay or the shutdown cancellation and only call submit_consume_request when the delay completed (use tokio::select! between tokio::time::sleep(Duration::from_secs(5)) and the shutdown token's cancelled() future), keeping the call to submit_consume_request(this.clone(), msgs, process_queue, message_queue, true). Ensure you reference the same shutdown cancellation token used by submit_consume_request so the delayed task observes shutdown and returns without calling submit_consume_request if cancelled.
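The race-the-delay-against-shutdown pattern suggested above can be illustrated with a std-only sketch. The name `delay_or_cancel` is hypothetical, and a channel's `recv_timeout` stands in for `tokio::select!` racing `tokio::time::sleep` against `CancellationToken::cancelled()`:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Wait out a retry delay, but bail early if a shutdown signal arrives first.
/// Returns true if the retry should proceed, false if it was cancelled.
fn delay_or_cancel(cancel_rx: &mpsc::Receiver<()>, delay: Duration) -> bool {
    match cancel_rx.recv_timeout(delay) {
        // A message (or a dropped sender) signals shutdown: skip the retry.
        Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => false,
        // The full delay elapsed with no cancellation: proceed with the retry.
        Err(mpsc::RecvTimeoutError::Timeout) => true,
    }
}

fn main() {
    // Shutdown arrives before the delay expires: the retry is skipped.
    let (cancel_tx, cancel_rx) = mpsc::channel();
    let handle = thread::spawn(move || delay_or_cancel(&cancel_rx, Duration::from_secs(5)));
    cancel_tx.send(()).unwrap();
    assert!(!handle.join().unwrap());

    // No cancellation: the delay completes and the retry runs.
    let (_keep_alive_tx, cancel_rx) = mpsc::channel::<()>();
    assert!(delay_or_cancel(&cancel_rx, Duration::from_millis(10)));
    println!("ok");
}
```

The key property, as in the suggested diff, is that the shutdown signal short-circuits the sleeping task instead of letting it fire after `shutdown()` returns.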
357-394: Permit lifecycle is well-managed; consider the retry accumulation scenario.

The `_permit` binding correctly ties the semaphore permit to the task's lifetime, ensuring release on both normal completion and panics.

One edge-case to consider: under sustained backpressure, each saturation event spawns a new delayed retry via `submit_consume_request_later`, and if those retries also hit saturation, the cycle compounds — potentially accumulating many sleeping `tokio::spawn` tasks. This matches the Java SDK's behavior, but you might want to monitor the retry queue depth in production.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs` around lines 357 - 394, spawn_consume_task currently calls submit_consume_request_later on semaphore saturation which can spawn many sleeping retry tasks under sustained backpressure; change the retry strategy so submit_consume_request_later enqueues the (msgs, this, process_queue, message_queue) into a single bounded retry queue (e.g., an mpsc::channel) and have one background retry worker task drain that queue and retry after the 5s delay (or apply backoff), rather than spawning a new tokio::spawn per saturation event; update submit_consume_request_later and add a retry worker initialised by ConsumeMessageConcurrentlyService to avoid unbounded sleeping tasks.
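The single-worker bounded retry queue proposed in this prompt can be sketched with std primitives. `RetryRequest`, `start_retry_worker`, and the millisecond delay are hypothetical stand-ins for the real (msgs, process_queue, message_queue) payload and the 5s retry delay; a `sync_channel` plays the role of a bounded `mpsc::channel`:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// A retry request; the id stands in for (msgs, process_queue, message_queue).
struct RetryRequest {
    id: u32,
}

/// Bounded retry queue: saturation events enqueue into a sync_channel instead
/// of each spawning its own sleeping task. A single worker drains the queue,
/// applying the delay once per request and bounding memory use.
fn start_retry_worker(
    capacity: usize,
    delay: Duration,
) -> (mpsc::SyncSender<RetryRequest>, thread::JoinHandle<Vec<u32>>) {
    let (tx, rx) = mpsc::sync_channel::<RetryRequest>(capacity);
    let worker = thread::spawn(move || {
        let mut retried = Vec::new();
        for req in rx {
            thread::sleep(delay); // stand-in for the 5s retry delay
            retried.push(req.id); // stand-in for submit_consume_request(...)
        }
        retried // the loop ends when every sender is dropped
    });
    (tx, worker)
}

fn main() {
    let (tx, worker) = start_retry_worker(2, Duration::from_millis(1));
    // try_send never blocks; when the queue is full it returns Err,
    // which is where a caller could drop or escalate instead of piling up tasks.
    tx.try_send(RetryRequest { id: 1 }).unwrap();
    tx.try_send(RetryRequest { id: 2 }).unwrap();
    drop(tx); // close the queue so the worker exits
    assert_eq!(worker.join().unwrap(), vec![1, 2]);
    println!("ok");
}
```

The design point matches the prompt: one sleeping worker replaces N sleeping spawned tasks, and the channel capacity makes the backlog explicit and bounded.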
369-383: Note: `spawn_blocking` in `ConsumeRequest::run` uses Tokio's blocking pool, not the semaphore-bounded pool.

The semaphore correctly limits the number of concurrent logical consume tasks, but each task's actual blocking work (`listener.consume_message` at line 464) runs on Tokio's default blocking thread pool (up to 512 threads). If `consume_thread_max` is much smaller than 512, the semaphore is the effective bottleneck — which is the intent. Just be aware that under extreme load, the blocking pool configuration (`max_blocking_threads`) may also need tuning independently of `consume_thread_max`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs` around lines 369 - 383, The current code acquires a semaphore permit via try_acquire_owned and then tokio::spawn's an async task, but ConsumeRequest::run itself uses tokio::task::spawn_blocking for listener.consume_message, so the semaphore does not bound the runtime's blocking pool; either move the blocking execution under the semaphore permit or avoid an inner spawn_blocking: refactor ConsumeRequest::run so the blocking listener.consume_message is executed while the permit is held (e.g., perform the blocking call directly or have the outer task use tokio::task::spawn_blocking instead of tokio::spawn), or document and tune the runtime's max_blocking_threads to match consume_thread_max; key symbols: try_acquire_owned, tokio::spawn, ConsumeRequest::run, listener.consume_message, spawn_blocking, consume_thread_max, max_blocking_threads.
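The try-acquire/permit-on-drop lifecycle discussed here can be modeled without Tokio. This hypothetical `Limiter` mirrors the semantics of `Semaphore::try_acquire_owned` — non-blocking acquisition, release on drop even across panics — and is a conceptual sketch, not the actual service code:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// A try-acquire counting limiter: at most `max` permits are out at once,
/// and acquisition never blocks (saturation is reported, not waited out).
struct Limiter {
    max: usize,
    active: AtomicUsize,
}

/// Holding a Permit keeps one slot occupied; dropping it releases the slot.
/// Release-on-drop covers both normal completion and panics, matching the
/// `_permit` binding discussed above.
struct Permit<'a> {
    limiter: &'a Limiter,
}

impl Limiter {
    fn new(max: usize) -> Self {
        Limiter { max, active: AtomicUsize::new(0) }
    }

    /// Returns a permit if one is free, else None — the saturation branch
    /// that would trigger the delayed retry in the service.
    fn try_acquire(&self) -> Option<Permit<'_>> {
        let mut cur = self.active.load(Ordering::Acquire);
        loop {
            if cur >= self.max {
                return None;
            }
            match self.active.compare_exchange(cur, cur + 1, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => return Some(Permit { limiter: self }),
                Err(actual) => cur = actual,
            }
        }
    }
}

impl Drop for Permit<'_> {
    fn drop(&mut self) {
        self.limiter.active.fetch_sub(1, Ordering::AcqRel);
    }
}

fn main() {
    let limiter = Limiter::new(2);
    let p1 = limiter.try_acquire().expect("first permit");
    let _p2 = limiter.try_acquire().expect("second permit");
    assert!(limiter.try_acquire().is_none()); // saturated: would schedule a retry
    drop(p1); // releasing a permit frees a slot
    assert!(limiter.try_acquire().is_some());
    println!("ok");
}
```

The caveat from the comment still applies: a limiter like this only bounds work executed *while the permit is held*; handing the work off to a separate pool (as `spawn_blocking` does) moves the real bottleneck to that pool's own limit.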
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- rocketmq-client/benches/concurrent_optimization_benchmark.rs
- rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs
- rocketmq-client/tests/integration_tests.rs
- rocketmq-tools/rocketmq-admin/rocketmq-admin-core/examples/admin_builder_pattern.rs
- rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/lib.rs
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #6507      +/-   ##
==========================================
- Coverage   42.17%   42.16%   -0.02%
==========================================
  Files         946      946
  Lines      132075   132130      +55
==========================================
  Hits        55708    55708
- Misses      76367    76422      +55
```

☔ View full report in Codecov by Sentry.
rocketmq-rust-bot
left a comment
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes (Closes)

Closes #6506
Brief Description
How Did You Test This Change?
Summary by CodeRabbit

Release Notes

Refactor
- Replaced the runtime-backed task spawning mechanism with semaphore-based concurrency control for concurrent message consumption, improving task management and graceful shutdown.

Chores
- Increased crate-level recursion limits across multiple files to support deeper macro expansion during compilation.