[ISSUE #4113]🚀Add RemotingGeneral struct and message processing methods#4114
[ISSUE #4113]🚀Add RemotingGeneral struct and message processing methods#4114rocketmq-rust-bot merged 2 commits intomainfrom
Conversation
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughRemoved a public method from the InvokeCallback trait and added a private Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Net as Network/Channel
participant Ctx as ConnectionHandlerContext
participant RG as RemotingGeneral
participant RP as RequestProcessor
participant Hooks as RPCHook(s)
participant RespTbl as response_table
Net->>Ctx: RemotingCommand received
Ctx->>RG: process_message_received(cmd)
alt cmd.type == REQUEST
RG->>Hooks: do_before_rpc_hooks(channel, request)
RG->>RP: process_request_command(request)
RP-->>RG: response / ResponseFuture
RG->>Hooks: do_after_rpc_hooks(channel, request, response)
RG->>RespTbl: insert/track ResponseFuture
else cmd.type == RESPONSE
RG->>RespTbl: lookup by opaque -> ResponseFuture
RG->>RG: process_response_command(response)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (3 warnings)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Pull Request Overview
This PR introduces a new RemotingGeneral struct and associated message processing methods to handle remoting command processing. The changes implement the foundation for processing incoming remoting commands by type (REQUEST or RESPONSE) and provide hooks for RPC lifecycle management.
- Adds
RemotingGeneralstruct with fields for request processor, shutdown handling, RPC hooks, and response tracking - Implements message processing dispatch logic to route commands based on type
- Provides placeholder methods for request/response processing and RPC hooks
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
rocketmq-remoting/src/remoting.rs
Outdated
| fn process_request_command(&mut self, channel: Channel, | ||
| ctx: ConnectionHandlerContext, | ||
| cmd: &mut RemotingCommand,) { | ||
|
|
||
| } | ||
|
|
||
| fn process_response_command(&mut self, channel: Channel, | ||
| ctx: ConnectionHandlerContext, | ||
| cmd: &mut RemotingCommand,) { | ||
|
|
||
| } |
There was a problem hiding this comment.
The process_request_command and process_response_command methods are empty implementations. Consider adding TODO comments or basic error handling to indicate their intended functionality, or implement basic logging to track method invocation.
| fn operation_fail(&self, throwable: Box<dyn std::error::Error>); | ||
| } | ||
|
|
||
| #[allow(unused_variables)] |
There was a problem hiding this comment.
The #[allow(unused_variables)] attribute is applied broadly to the entire module. This should be removed once the empty method implementations are completed, as it may hide legitimate unused variable warnings in future development.
| #[allow(unused_variables)] |
| unimplemented!("do_after_rpc_hooks unimplemented") | ||
| } | ||
|
|
||
| pub fn do_before_rpc_hooks( | ||
| &self, | ||
| channel: &Channel, | ||
| request: Option<&mut RemotingCommand>, | ||
| ) -> rocketmq_error::RocketMQResult<()> { | ||
| unimplemented!("do_before_rpc_hooks unimplemented") |
There was a problem hiding this comment.
Both RPC hook methods use unimplemented!() which will panic when called. Consider using todo!() instead to indicate these are placeholder implementations, or implement basic no-op functionality that returns Ok(()) to prevent runtime panics.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4114 +/- ##
==========================================
- Coverage 26.59% 26.57% -0.02%
==========================================
Files 575 576 +1
Lines 81362 81403 +41
==========================================
Hits 21635 21635
- Misses 59727 59768 +41 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (4)
rocketmq-remoting/src/remoting.rs (4)
95-98: BorrowChannelandConnectionHandlerContextto avoid unnecessary movesPassing by value risks ownership moves and copies; borrowing better matches typical handler lifetimes and avoids cloning
Channel.- pub fn process_message_received(&mut self, channel: Channel, - ctx: ConnectionHandlerContext, - cmd: &mut RemotingCommand,) { + pub fn process_message_received(&mut self, channel: &Channel, + ctx: &mut ConnectionHandlerContext, + cmd: &mut RemotingCommand) { match cmd.get_type() { RemotingCommandType::REQUEST => { - self.process_request_command(channel, ctx, cmd); + self.process_request_command(channel, ctx, cmd); } RemotingCommandType::RESPONSE => { - self.process_response_command(channel, ctx, cmd); + self.process_response_command(channel, ctx, cmd); } } } - fn process_request_command(&mut self, channel: Channel, - ctx: ConnectionHandlerContext, - cmd: &mut RemotingCommand,) { + fn process_request_command(&mut self, channel: &Channel, + ctx: &mut ConnectionHandlerContext, + cmd: &mut RemotingCommand) { } - fn process_response_command(&mut self, channel: Channel, - ctx: ConnectionHandlerContext, - cmd: &mut RemotingCommand,) { + fn process_response_command(&mut self, channel: &Channel, + ctx: &mut ConnectionHandlerContext, + cmd: &mut RemotingCommand) { }Also applies to: 109-117
86-92: Tighten hook storage and plan response-table lifecycle
- Prefer
Vec<Arc<dyn RPCHook>>overArc<Vec<Box<dyn RPCHook>>>to avoid double indirection and to align with server-side usage patterns. This also simplifies sharing and iteration.- Response-table needs lifecycle management (timeout sweeps) and, if accessed across tasks, a concurrent map (e.g.,
DashMap) or locking.Minimal type refactor:
- rpc_hooks: Arc<Vec<Box<dyn RPCHook>>>, + rpc_hooks: Vec<Arc<dyn RPCHook>>,If you adopt this, ensure the corresponding registration paths accept
Arc<dyn RPCHook>and adjust call sites.
109-119: Fill in request/response handling (route to processor and complete futures)These stubs should:
- REQUEST: run
do_before_rpc_hooks, invokerequest_processorwithctx/cmd, build/send response, thendo_after_rpc_hooks.- RESPONSE: look up
opaqueinresponse_table, complete theResponseFuture, and remove the entry. Add timeout cleanup elsewhere.I can wire these up with a concrete implementation consistent with your
RequestProcessortrait andResponseFutureAPI. Want me to push a patch?
73-74: Silence dead_code in the interim (optional)Until these types are integrated, avoid noise from dead code warnings.
-#[allow(unused_variables)] +#[allow(unused_variables, dead_code)]
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rocketmq-remoting/src/remoting.rs(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-remoting/src/remoting.rs (2)
rocketmq-remoting/src/runtime/connection_handler_context.rs (1)
channel(77-79)rocketmq-remoting/src/remoting_server/server.rs (2)
do_after_rpc_hooks(99-111)do_before_rpc_hooks(86-97)
🪛 GitHub Actions: CI
rocketmq-remoting/src/remoting.rs
[error] 72-105: cargo fmt -- --check failed. Formatting changes were detected in rocketmq-remoting/src/remoting.rs. Run 'cargo fmt' to format the code.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: auto-approve
- GitHub Check: build
- GitHub Check: test
🔇 Additional comments (1)
rocketmq-remoting/src/remoting.rs (1)
73-138: Fix rustfmt failure in CICI reports cargo fmt failures in rocketmq-remoting/src/remoting.rs (lines 73–138). I couldn't run cargo fmt here due to the sandbox error (no /proc/self/exe). Run locally:
cargo fmt --allthencargo fmt --all -- --check, commit the formatted changes and push so CI can pass.
rocketmq-remoting/src/remoting.rs
Outdated
| fn do_after_rpc_hooks( | ||
| &self, | ||
| channel: &Channel, | ||
| request: &RemotingCommand, | ||
| response: Option<&mut RemotingCommand>, | ||
| ) -> rocketmq_error::RocketMQResult<()> { | ||
| unimplemented!("do_after_rpc_hooks unimplemented") | ||
| } | ||
|
|
||
| pub fn do_before_rpc_hooks( | ||
| &self, | ||
| channel: &Channel, | ||
| request: Option<&mut RemotingCommand>, | ||
| ) -> rocketmq_error::RocketMQResult<()> { | ||
| unimplemented!("do_before_rpc_hooks unimplemented") | ||
| } |
There was a problem hiding this comment.
Avoid runtime panics: implement RPC hook methods instead of unimplemented!
These will panic if called. Mirror the existing server-side logic to make them no-ops when no hook is present and to uniformly execute hooks when provided.
Apply this diff:
- fn do_after_rpc_hooks(
+ fn do_after_rpc_hooks(
&self,
channel: &Channel,
request: &RemotingCommand,
response: Option<&mut RemotingCommand>,
) -> rocketmq_error::RocketMQResult<()> {
- unimplemented!("do_after_rpc_hooks unimplemented")
+ if let Some(response) = response {
+ for hook in self.rpc_hooks.iter() {
+ hook.do_after_response(channel.remote_address(), request, response)?;
+ }
+ }
+ Ok(())
}
- pub fn do_before_rpc_hooks(
+ pub fn do_before_rpc_hooks(
&self,
channel: &Channel,
request: Option<&mut RemotingCommand>,
) -> rocketmq_error::RocketMQResult<()> {
- unimplemented!("do_before_rpc_hooks unimplemented")
+ if let Some(request) = request {
+ for hook in self.rpc_hooks.iter() {
+ hook.do_before_request(channel.remote_address(), request)?;
+ }
+ }
+ Ok(())
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fn do_after_rpc_hooks( | |
| &self, | |
| channel: &Channel, | |
| request: &RemotingCommand, | |
| response: Option<&mut RemotingCommand>, | |
| ) -> rocketmq_error::RocketMQResult<()> { | |
| unimplemented!("do_after_rpc_hooks unimplemented") | |
| } | |
| pub fn do_before_rpc_hooks( | |
| &self, | |
| channel: &Channel, | |
| request: Option<&mut RemotingCommand>, | |
| ) -> rocketmq_error::RocketMQResult<()> { | |
| unimplemented!("do_before_rpc_hooks unimplemented") | |
| } | |
| fn do_after_rpc_hooks( | |
| &self, | |
| channel: &Channel, | |
| request: &RemotingCommand, | |
| response: Option<&mut RemotingCommand>, | |
| ) -> rocketmq_error::RocketMQResult<()> { | |
| if let Some(response) = response { | |
| for hook in self.rpc_hooks.iter() { | |
| hook.do_after_response(channel.remote_address(), request, response)?; | |
| } | |
| } | |
| Ok(()) | |
| } | |
| pub fn do_before_rpc_hooks( | |
| &self, | |
| channel: &Channel, | |
| request: Option<&mut RemotingCommand>, | |
| ) -> rocketmq_error::RocketMQResult<()> { | |
| if let Some(request) = request { | |
| for hook in self.rpc_hooks.iter() { | |
| hook.do_before_request(channel.remote_address(), request)?; | |
| } | |
| } | |
| Ok(()) | |
| } |
🤖 Prompt for AI Agents
In rocketmq-remoting/src/remoting.rs around lines 121 to 136, replace the
unimplemented! stubs for do_after_rpc_hooks and do_before_rpc_hooks with real
implementations that mirror the server-side behavior: check whether a configured
RPC hook exists and, if not, return Ok(()); if a hook is present, invoke the
appropriate hook method (passing the channel and request/response objects as the
server-side implementation does), handle any errors by converting/propagating
them into rocketmq_error::RocketMQResult, and ensure both methods return Ok(())
on success so they no longer panic at runtime.
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (2)
rocketmq-remoting/src/remoting.rs (2)
87-93: Unify RPCHook storage type with the public API; avoid Arc<Box<..>> vs Box<..> mismatchRemotingService::register_rpc_hook takes Arc<Box>, but here you store Box. This inconsistency will force awkward conversions and prevents sharing hooks safely. Prefer a consistent type across the codebase, e.g. Vec<Arc<Box>> (or better, Arc<dyn RPCHook + Send + Sync>).
Would you like me to draft a follow-up patch to align types across remoting_server and here?
98-112: Pass Channel/Context by reference to avoid moves and align with hook signaturesCurrent signatures take Channel/Context by value; hooks use &Channel. Passing by reference avoids unnecessary clones/moves and keeps interfaces consistent.
- pub fn process_message_received( - &mut self, - channel: Channel, - ctx: ConnectionHandlerContext, - cmd: &mut RemotingCommand, - ) { + pub fn process_message_received( + &mut self, + channel: &Channel, + ctx: &ConnectionHandlerContext, + cmd: &mut RemotingCommand, + ) { match cmd.get_type() { RemotingCommandType::REQUEST => { - self.process_request_command(channel, ctx, cmd); + self.process_request_command(channel, ctx, cmd); } RemotingCommandType::RESPONSE => { - self.process_response_command(channel, ctx, cmd); + self.process_response_command(channel, ctx, cmd); } } }And update the callees accordingly (see below).
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rocketmq-remoting/src/remoting.rs(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rocketmq-remoting/src/remoting.rs (2)
rocketmq-remoting/src/runtime/connection_handler_context.rs (1)
channel(77-79)rocketmq-remoting/src/remoting_server/server.rs (2)
do_after_rpc_hooks(99-111)do_before_rpc_hooks(86-97)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: build
- GitHub Check: test
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: auto-approve
🔇 Additional comments (6)
rocketmq-remoting/src/remoting.rs (6)
73-74: Scope down the #[allow(unused_variables)] — don’t suppress at module levelKeep warnings useful. Remove this once stubs are implemented or limit it to specific items.
94-97: Add Send bound to RP if used off-threadLikely required in remoting paths. If RP can be used by worker threads, add Send to the bound.
Apply if appropriate:
- RP: RequestProcessor + Sync + 'static + Clone, + RP: RequestProcessor + Send + Sync + 'static + Clone,
130-137: Replace unimplemented! with safe hook execution (prevents runtime panics)Mirror server-side behavior and no-op when no response/hook is present.
fn do_after_rpc_hooks( &self, channel: &Channel, request: &RemotingCommand, response: Option<&mut RemotingCommand>, ) -> rocketmq_error::RocketMQResult<()> { - unimplemented!("do_after_rpc_hooks unimplemented") + if let Some(response) = response { + for hook in self.rpc_hooks.iter() { + hook.do_after_response(channel.remote_address(), request, response)?; + } + } + Ok(()) }
139-145: Implement before-RPC hook; avoid panic and keep parity with server pathNo-op when no request/hook is present; otherwise execute hooks.
pub fn do_before_rpc_hooks( &self, channel: &Channel, request: Option<&mut RemotingCommand>, ) -> rocketmq_error::RocketMQResult<()> { - unimplemented!("do_before_rpc_hooks unimplemented") + if let Some(request) = request { + for hook in self.rpc_hooks.iter() { + hook.do_before_request(channel.remote_address(), request)?; + } + } + Ok(()) }
122-129: Empty response handling will leak/strand futures in response_tableThis should locate the opaque, complete the corresponding ResponseFuture, and run after-RPC hooks. Add at least logging/TODO to avoid silent drops.
fn process_response_command( &mut self, - channel: Channel, - ctx: ConnectionHandlerContext, + channel: &Channel, + ctx: &ConnectionHandlerContext, cmd: &mut RemotingCommand, ) { + // TODO(#4113): Match response to `response_table` by opaque, complete the future, + // and invoke `do_after_rpc_hooks`. + // Intentionally left as no-op scaffolding. }
114-121: Empty request handling will silently drop inbound requestsAt minimum, add logging/TODO; ideally, wire this to RequestProcessor and RPC hooks (before/after) to keep parity with the server path.
fn process_request_command( &mut self, - channel: Channel, - ctx: ConnectionHandlerContext, + channel: &Channel, + ctx: &ConnectionHandlerContext, cmd: &mut RemotingCommand, ) { + // TODO(#4113): Implement request processing via `request_processor` and + // invoke `do_before_rpc_hooks`/`do_after_rpc_hooks`. + // Intentionally left as no-op scaffolding. }
Which Issue(s) This PR Fixes(Closes)
Fixes #4113
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Refactor
Chores
Notes