[ISSUE #3519]⚡️Async socket handling migration using tokio-util for HA connection layer#3520
[ISSUE #3519]⚡️Async socket handling migration using tokio-util for HA connection layer#3520rocketmq-rust-bot merged 1 commit intomainfrom
Conversation
…A connection layer
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughThe changes migrate the HA connection layer from using raw Tokio socket read/write halves to a framed stream abstraction with Changes
Sequence Diagram(s)sequenceDiagram
participant TcpStream
participant Framed
participant SplitSink
participant SplitStream
participant WriteSocketService
participant ReadSocketService
TcpStream->>Framed: Wrap with BytesCodec (Framed::with_capacity)
Framed->>SplitSink: split() -> writer (SplitSink)
Framed->>SplitStream: split() -> reader (SplitStream)
WriteSocketService->>SplitSink: send(Bytes)
ReadSocketService->>SplitStream: next() (currently unimplemented)
Assessment against linked issues
Poem
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Pull Request Overview
This PR migrates the HA connection layer to use tokio-util’s framed streams for asynchronous socket handling.
- Introduces
FramedwithBytesCodecand replaces manual split/read/write halves. - Updates
ReadSocketServiceandWriteSocketServiceto useSplitStream/SplitSink. - Stubs out the read service implementation and adds
tokio-utilworkspace override inCargo.toml.
Reviewed Changes
Copilot reviewed 2 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| rocketmq-store/src/ha/default_ha_connection.rs | Refactor socket splitting to use tokio_util::codec::Framed, change reader/writer types, and stub out ReadSocketService. |
| rocketmq-store/Cargo.toml | Add tokio-util.workspace = true under workspace overrides. |
| // ha_service.remove_connection(&client_address).await; | ||
| info!("ReadSocketService ended for client: {}", client_address); | ||
| info!("ReadSocketService ended for client: {}", client_address);*/ | ||
| unimplemented!("ReadSocketService is not implemented yet"); |
There was a problem hiding this comment.
The run_service method for ReadSocketService is currently unimplemented and will panic at runtime; consider implementing or removing this stub to restore read functionality.
| message_store_config: Arc<MessageStoreConfig>, | ||
| ) { | ||
| info!("ReadSocketService started for client: {}", client_address); | ||
| /* info!("ReadSocketService started for client: {}", client_address); |
There was a problem hiding this comment.
[nitpick] Large commented-out sections reduce readability; consider removing or isolating dead code.
| let socket_stream = self.socket_stream.take().unwrap(); | ||
| let (reader, writer) = socket_stream.into_split(); | ||
| let tcp_stream = self.socket_stream.take().unwrap(); | ||
| let framed = Framed::with_capacity(tcp_stream, BytesCodec::new(), 1024 * 4); |
There was a problem hiding this comment.
[nitpick] The buffer capacity 4096 is a magic number; consider defining a named constant for clarity.
| let framed = Framed::with_capacity(tcp_stream, BytesCodec::new(), 1024 * 4); | |
| let framed = Framed::with_capacity(tcp_stream, BytesCodec::new(), BUFFER_CAPACITY); |
| let len = buffer_header.len(); | ||
| let result = match socket.send(buffer_header).await { |
There was a problem hiding this comment.
[nitpick] Sending header and data in separate calls increases syscall overhead; consider combining them into one buffer or using a batch send to improve throughput.
| let len = buffer_header.len(); | |
| let result = match socket.send(buffer_header).await { | |
| let mut combined_buffer = BytesMut::new(); | |
| combined_buffer.put(buffer_header); | |
| if let Some(data) = select_mapped_buffer { | |
| combined_buffer.put(data); | |
| } | |
| let len = combined_buffer.len(); | |
| match socket.send(combined_buffer.freeze()).await { |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
rocketmq-store/src/ha/default_ha_connection.rs (1)
116-118: Consider making the buffer size configurable.The framed approach with
BytesCodecis appropriate for this use case. However, the hardcoded buffer size of 4KB might benefit from being configurable viaMessageStoreConfigto allow tuning based on deployment requirements.- let framed = Framed::with_capacity(tcp_stream, BytesCodec::new(), 1024 * 4); + let buffer_size = self.message_store_config.ha_socket_buffer_size.unwrap_or(1024 * 4); + let framed = Framed::with_capacity(tcp_stream, BytesCodec::new(), buffer_size);
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (2)
rocketmq-store/Cargo.toml(1 hunks)rocketmq-store/src/ha/default_ha_connection.rs(15 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: auto-approve
- GitHub Check: test
- GitHub Check: build
🔇 Additional comments (5)
rocketmq-store/Cargo.toml (1)
34-34: LGTM! Necessary dependency for the HA connection refactoring.The addition of
tokio-utildependency supports the migration to framed stream handling in the HA connection layer.rocketmq-store/src/ha/default_ha_connection.rs (4)
27-30: LGTM! Necessary imports for framed socket handling.The added imports support the migration from raw socket operations to the framed codec approach using
tokio_utilandfutures_util.Also applies to: 36-37
234-234: LGTM! Type signatures correctly updated for framed streams.The type changes from
OwnedReadHalftoSplitStream<Framed<TcpStream, BytesCodec>>are consistent with the new framed codec approach.Also applies to: 247-247, 297-297
412-412: LGTM! WriteSocketService types correctly updated.The type changes to use
SplitSink<Framed<TcpStream, BytesCodec>, Bytes>and the addition ofmutkeyword are appropriate for the new sink-based approach.Also applies to: 425-425, 475-475
587-624: LGTM! Properly adapted for sink-based data transfer.The changes correctly adapt the data transfer logic:
- Method signature updated to use
Bytesinstead of byte slicessend()operations replacewrite_all()for sink interface- Flow monitoring correctly updated for new data types
The error handling and logic flow remain consistent with the original implementation.
| // ha_service.remove_connection(&client_address).await; | ||
| info!("ReadSocketService ended for client: {}", client_address); | ||
| info!("ReadSocketService ended for client: {}", client_address);*/ | ||
| unimplemented!("ReadSocketService is not implemented yet"); |
There was a problem hiding this comment.
Critical: ReadSocketService is completely disabled.
The read service implementation has been replaced with unimplemented!(), which means:
- Slave requests cannot be processed
- Slave acknowledgment offsets won't be updated
- HA replication functionality is broken
This change would cause runtime panics when the read service is called.
Either:
- Complete the implementation for the new framed approach, or
- Revert to the previous working implementation until the migration is complete
Would you like me to help implement the read service using the new framed stream approach based on the commented code?
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_connection.rs at line 399, the
ReadSocketService is currently disabled with unimplemented!(), causing runtime
panics and breaking HA replication. To fix this, either fully implement the read
service using the new framed stream approach as indicated by the commented code
or revert to the previous working implementation until the migration is complete
to ensure slave requests and acknowledgments function correctly.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3520 +/- ##
==========================================
+ Coverage 26.18% 26.20% +0.01%
==========================================
Files 555 555
Lines 78648 78594 -54
==========================================
Hits 20593 20593
+ Misses 58055 58001 -54 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Which Issue(s) This PR Fixes(Closes)
Fixes #3519
Brief Description
How Did You Test This Change?
Summary by CodeRabbit