[ISSUE #6350]šImplement high-performance oneway send methods and benchmarks for optimization#6351
Conversation
ā¦hmarks for optimization
|
š@mxsm šThanks for your contributionšļ¼ š”CodeRabbit(AI) will review your code firstš„ļ¼ Note šØThe code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code reviewš„. |
WalkthroughThis pull request introduces a high-performance fire-and-forget oneway send pathway across the RocketMQ client stack. The remoting layer adds unbounded oneway invocation, the MQ client API adds unbounded send methods, the producer implements batch oneway sending with background task spawning, and comprehensive benchmarks measure latency, throughput, and concurrent performance. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Application
participant Producer as DefaultMQProducer
participant MQApi as MQClientAPIImpl
participant Remoting as RemotingClient
participant Network as Background Task<br/>(Network I/O)
Client->>Producer: send_oneway_batch([Message])
Producer->>Producer: validate & resolve topic info<br/>select queue
loop For each message
Producer->>Producer: spawn_oneway_send(msg)
Producer->>Producer: build_oneway_request_internal(msg)
Producer->>MQApi: send_oneway_unbounded(addr, request)
MQApi->>Remoting: invoke_oneway_unbounded(addr, request)
Remoting->>Network: spawn background task
Remoting-->>MQApi: return immediately (fire-and-forget)
MQApi-->>Producer: return immediately
end
Producer-->>Client: return count of spawned sends
Network->>Network: attempt client pool lookup & send<br/>errors logged silently
Estimated code review effortšÆ 3 (Moderate) | ā±ļø ~25 minutes Poem
š„ Pre-merge checks | ā 6ā Passed checks (6 passed)
āļø Tip: You can configure your own custom pre-merge checks in the settings. ⨠Finishing touches
š§Ŗ Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Reportā Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #6351 +/- ##
==========================================
+ Coverage 42.39% 42.46% +0.06%
==========================================
Files 919 919
Lines 129135 129262 +127
==========================================
+ Hits 54749 54891 +142
+ Misses 74386 74371 -15 ā View full report in Codecov by Sentry. š New features to boost your workflow:
|
There was a problem hiding this comment.
Actionable comments posted: 3
š¤ Fix all issues with AI agents
In `@rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs`:
- Around line 2712-2767: build_oneway_request_internal currently hardcodes
sys_flag, unit_mode, batch and ignores namespace, pre-compressed bodies and
transactional/delay flags, causing mis-encoded messages; update it to reuse the
same request-building logic as send_kernel_impl (extract shared helper that
computes sys_flag, handles compression, transaction/delay flags, unit/batch
flags, sets topic with namespace, and uses msg.get_body_or_compressed()
semantics) so oneway requests are encoded identically, or alternatively make
build_oneway_request_internal perform strict validation (from send_oneway_batch)
and return an error for unsupported features (transactional, compressed
pre-encoded bodies, non-empty namespace) while referencing
build_oneway_request_internal, send_kernel_impl and send_oneway_batch to locate
where to extract/replicate the logic.
In `@rocketmq-remoting/src/clients.rs`:
- Around line 84-96: The docs for invoke_oneway_unbounded claim send errors are
ālogged onlyā but the current implementation drops send errors silently; update
the implementation of invoke_oneway_unbounded (the spawned background task in
clients.rs where the RemotingCommand is sent) to capture the send result and
call the project logger (e.g., log::error or tracing::error) on Err to record
failures, or if you prefer to keep silent drops, change the doc comment to
explicitly state that errors may be discarded; refer to the
invoke_oneway_unbounded function and its spawned task/closure to apply the fix.
In `@rocketmq-remoting/src/clients/rocketmq_tokio_client.rs`:
- Around line 1018-1047: invoke_oneway_unbounded currently drops messages
whenever connection_tables.get(&addr) returns None; change the spawned task to
attempt the same get-or-create client path used elsewhere instead of only
reading the cache: use the cloned
namesrv_addr_list/tokio_client_config/latency_tracker and call the existing
client creation logic (the same function or sequence used by the "get/create"
path you have elsewhere) inside the tokio::spawn so a connection is established
on cold start, then send the oneway request; keep marking the request with
request.mark_oneway_rpc_ref() and preserve the fireāandāforget semantics by
dropping or logging errors after send.
š§¹ Nitpick comments (1)
rocketmq-client/benches/oneway_benchmark.rs (1)
36-55: Consider reusing a started producer outside the bench loop.
Right now each iteration builds a producer and never starts it, so the benchmark mostly measures setup/errorāpath overhead rather than oneway send cost. Hoisting producer init (and optionally gating on a running nameserver via env flag) will make latency numbers more meaningful.
| /// Helper function to build oneway request (simplified version for performance). | ||
| /// | ||
| /// This is used internally by batch oneway to avoid code duplication. | ||
| fn build_oneway_request_internal<T>( | ||
| msg: &mut T, | ||
| mq: &MessageQueue, | ||
| broker_name: &CheetahString, | ||
| producer_config: &ProducerConfig, | ||
| namespace: Option<&str>, | ||
| ) -> rocketmq_error::RocketMQResult<RemotingCommand> | ||
| where | ||
| T: MessageTrait, | ||
| { | ||
| use rocketmq_remoting::code::request_code::RequestCode; | ||
| use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader; | ||
| use rocketmq_remoting::protocol::remoting_command::RemotingCommand; | ||
|
|
||
| // Set message ID | ||
| MessageClientIDSetter::set_uniq_id(msg); | ||
|
|
||
| // Build request header (simplified for oneway) | ||
| let request_header = SendMessageRequestHeader { | ||
| producer_group: CheetahString::from_string(producer_config.producer_group().to_string()), | ||
| topic: CheetahString::from_string(msg.get_topic().to_string()), | ||
| default_topic: CheetahString::from_string(producer_config.create_topic_key().to_string()), | ||
| default_topic_queue_nums: producer_config.default_topic_queue_nums() as i32, | ||
| queue_id: mq.get_queue_id(), | ||
| sys_flag: 0, | ||
| born_timestamp: get_current_millis() as i64, | ||
| flag: msg.get_flag(), | ||
| properties: Some(MessageDecoder::message_properties_to_string(msg.get_properties())), | ||
| reconsume_times: Some(0), | ||
| unit_mode: Some(false), | ||
| batch: Some(false), | ||
| topic_request_header: Some(TopicRequestHeader { | ||
| rpc_request_header: Some(RpcRequestHeader { | ||
| broker_name: Some(broker_name.clone()), | ||
| ..Default::default() | ||
| }), | ||
| ..Default::default() | ||
| }), | ||
| ..Default::default() | ||
| }; | ||
|
|
||
| // Build command | ||
| let mut request = RemotingCommand::create_request_command(RequestCode::SendMessage, request_header); | ||
|
|
||
| // Set body (zero-copy: Bytes is reference-counted) | ||
| if let Some(body) = msg.get_body() { | ||
| request.set_body_mut_ref(body.clone()); | ||
| } else { | ||
| return Err(mq_client_err!(-1, "Message body is None")); | ||
| } | ||
|
|
||
| Ok(request) | ||
| } |
There was a problem hiding this comment.
Oneway batch request builder omits critical message flags and namespace handling.
build_oneway_request_internal hardācodes sys_flag = 0, unit_mode = false, batch = false, and only uses msg.get_body(). It ignores the namespace parameter and any preācompressed body or transaction/delay flags. This diverges from send_kernel_impl and can misāencode messages (e.g., transactional, compressed, or namespaced). Please extract the shared requestābuilding logic (including sys_flag/compression/namespace handling) or explicitly reject unsupported message types in send_oneway_batch to avoid silent misāsend.
š¤ Prompt for AI Agents
In `@rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs`
around lines 2712 - 2767, build_oneway_request_internal currently hardcodes
sys_flag, unit_mode, batch and ignores namespace, pre-compressed bodies and
transactional/delay flags, causing mis-encoded messages; update it to reuse the
same request-building logic as send_kernel_impl (extract shared helper that
computes sys_flag, handles compression, transaction/delay flags, unit/batch
flags, sets topic with namespace, and uses msg.get_body_or_compressed()
semantics) so oneway requests are encoded identically, or alternatively make
build_oneway_request_internal perform strict validation (from send_oneway_batch)
and return an error for unsupported features (transactional, compressed
pre-encoded bodies, non-empty namespace) while referencing
build_oneway_request_internal, send_kernel_impl and send_oneway_batch to locate
where to extract/replicate the logic.
| /// Invokes a command on a specified address without waiting for a response or confirmation. | ||
| /// This is a true fire-and-forget method that returns immediately after spawning the send task. | ||
| /// | ||
| /// # Arguments | ||
| /// * `addr` - The address to invoke the command on. | ||
| /// * `request` - The `RemotingCommand` to be sent. | ||
| /// | ||
| /// # Semantics | ||
| /// - Returns immediately after spawning background task | ||
| /// - Does NOT wait for the message to be sent | ||
| /// - Does NOT wait for network I/O | ||
| /// - Errors are silently dropped (logged only) | ||
| fn invoke_oneway_unbounded(&self, addr: CheetahString, request: RemotingCommand); |
There was a problem hiding this comment.
Clarify the errorāhandling semantics in this doc block.
The trait doc says errors are ālogged only,ā but the current unbounded implementation drops send errors silently; either add logging on failure or update the docs to state errors may be discarded.
š¤ Prompt for AI Agents
In `@rocketmq-remoting/src/clients.rs` around lines 84 - 96, The docs for
invoke_oneway_unbounded claim send errors are ālogged onlyā but the current
implementation drops send errors silently; update the implementation of
invoke_oneway_unbounded (the spawned background task in clients.rs where the
RemotingCommand is sent) to capture the send result and call the project logger
(e.g., log::error or tracing::error) on Err to record failures, or if you prefer
to keep silent drops, change the doc comment to explicitly state that errors may
be discarded; refer to the invoke_oneway_unbounded function and its spawned
task/closure to apply the fix.
| fn invoke_oneway_unbounded(&self, addr: CheetahString, request: RemotingCommand) { | ||
| // True fire-and-forget: spawn immediately and return | ||
| // No waiting, no timeout enforcement, errors are dropped silently | ||
| // Clone necessary data for the spawned task | ||
| let connection_tables = self.connection_tables.clone(); | ||
| let namesrv_addr_list = self.namesrv_addr_list.clone(); | ||
| let tokio_client_config = self.tokio_client_config.clone(); | ||
| let latency_tracker = self.latency_tracker.clone(); | ||
|
|
||
| tokio::spawn(async move { | ||
| // Get client in background task using connection tables directly | ||
| let client_result = connection_tables.get(&addr); | ||
|
|
||
| if let Some(client_ref) = client_result { | ||
| let mut client = client_ref.value().clone(); | ||
| // Mark as oneway and send | ||
| let mut request = request; | ||
| request.mark_oneway_rpc_ref(); | ||
|
|
||
| let _ = client.send(request).await; | ||
| } else { | ||
| tracing::debug!( | ||
| "No cached client for oneway send to {}, will be skipped in fire-and-forget mode", | ||
| addr | ||
| ); | ||
| } | ||
| }); | ||
|
|
||
| // Return immediately - task is running in background | ||
| } |
There was a problem hiding this comment.
Fireāandāforget path drops messages when no cached client.
invoke_oneway_unbounded only checks connection_tables and skips if the client isnāt cached, so cold starts can silently drop every send even when a connection could be established. Consider using the existing get/create path inside the spawned task (or explicitly document a preāwarmed connection requirement).
š¤ Prompt for AI Agents
In `@rocketmq-remoting/src/clients/rocketmq_tokio_client.rs` around lines 1018 -
1047, invoke_oneway_unbounded currently drops messages whenever
connection_tables.get(&addr) returns None; change the spawned task to attempt
the same get-or-create client path used elsewhere instead of only reading the
cache: use the cloned namesrv_addr_list/tokio_client_config/latency_tracker and
call the existing client creation logic (the same function or sequence used by
the "get/create" path you have elsewhere) inside the tokio::spawn so a
connection is established on cold start, then send the oneway request; keep
marking the request with request.mark_oneway_rpc_ref() and preserve the
fireāandāforget semantics by dropping or logging errors after send.
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ā
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Tests
Chores