Skip to content

Commit 39397bd

Browse files
authored
[ISSUE #3519]⚡️Async socket handling migration using tokio-util for HA connection layer (#3520)
1 parent 8711443 commit 39397bd

File tree

3 files changed

+32
-25
lines changed

3 files changed

+32
-25
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rocketmq-store/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ bytes.workspace = true
3131

3232
#tokio
3333
tokio.workspace = true
34-
34+
tokio-util.workspace = true
3535

3636
#log
3737
tracing.workspace = true

rocketmq-store/src/ha/default_ha_connection.rs

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,17 @@ use std::time::Instant;
2424
use bytes::BufMut;
2525
use bytes::Bytes;
2626
use bytes::BytesMut;
27+
use futures_util::stream::SplitSink;
28+
use futures_util::stream::SplitStream;
29+
use futures_util::SinkExt;
30+
use futures_util::StreamExt;
2731
use rocketmq_rust::ArcMut;
28-
use tokio::io::AsyncReadExt;
29-
use tokio::io::AsyncWriteExt;
30-
use tokio::net::tcp::OwnedReadHalf;
31-
use tokio::net::tcp::OwnedWriteHalf;
3232
use tokio::net::TcpStream;
3333
use tokio::sync::mpsc;
3434
use tokio::sync::RwLock;
3535
use tokio::time::sleep;
36-
use tokio::time::timeout;
36+
use tokio_util::codec::BytesCodec;
37+
use tokio_util::codec::Framed;
3738
use tracing::error;
3839
use tracing::info;
3940
use tracing::warn;
@@ -112,8 +113,9 @@ impl DefaultHAConnection {
112113
// Start flow monitor
113114
self.flow_monitor.start().await;
114115

115-
let socket_stream = self.socket_stream.take().unwrap();
116-
let (reader, writer) = socket_stream.into_split();
116+
let tcp_stream = self.socket_stream.take().unwrap();
117+
let framed = Framed::with_capacity(tcp_stream, BytesCodec::new(), 1024 * 4);
118+
let (writer, reader) = framed.split();
117119

118120
// Create and start read service
119121
let read_service = ReadSocketService::new(
@@ -229,7 +231,7 @@ const REPORT_HEADER_SIZE: usize = 8;
229231
/// The main node processes requests from the slave nodes, reads the maximum request offset of the
230232
/// slave nodes.
231233
pub struct ReadSocketService {
232-
reader: Option<OwnedReadHalf>,
234+
reader: Option<SplitStream<Framed<TcpStream, BytesCodec>>>,
233235
client_address: String,
234236
ha_service: ArcMut<DefaultHAService>,
235237
current_state: Arc<RwLock<HAConnectionState>>,
@@ -242,7 +244,7 @@ pub struct ReadSocketService {
242244

243245
impl ReadSocketService {
244246
pub async fn new(
245-
reader: OwnedReadHalf,
247+
reader: SplitStream<Framed<TcpStream, BytesCodec>>,
246248
client_address: String,
247249
ha_service: ArcMut<DefaultHAService>,
248250
current_state: Arc<RwLock<HAConnectionState>>,
@@ -292,15 +294,15 @@ impl ReadSocketService {
292294
}
293295

294296
async fn run_service(
295-
mut socket_stream: Option<OwnedReadHalf>,
297+
socket_stream: Option<SplitStream<Framed<TcpStream, BytesCodec>>>,
296298
client_address: String,
297299
ha_service: ArcMut<DefaultHAService>,
298300
current_state: Arc<RwLock<HAConnectionState>>,
299301
slave_request_offset: Arc<AtomicI64>,
300302
slave_ack_offset: Arc<AtomicI64>,
301303
message_store_config: Arc<MessageStoreConfig>,
302304
) {
303-
info!("ReadSocketService started for client: {}", client_address);
305+
/* info!("ReadSocketService started for client: {}", client_address);
304306
305307
let mut buffer = vec![0u8; READ_MAX_BUFFER_SIZE];
306308
let mut process_position = 0usize;
@@ -323,7 +325,7 @@ impl ReadSocketService {
323325
}
324326
match timeout(
325327
Duration::from_secs(1),
326-
socket.read(&mut buffer[process_position..]),
328+
socket.next(),
327329
)
328330
.await
329331
{
@@ -378,6 +380,7 @@ impl ReadSocketService {
378380
break;
379381
}
380382
}
383+
_ => {}
381384
}
382385
}
383386
} else {
@@ -392,7 +395,8 @@ impl ReadSocketService {
392395
}
393396
394397
// ha_service.remove_connection(&client_address).await;
395-
info!("ReadSocketService ended for client: {}", client_address);
398+
info!("ReadSocketService ended for client: {}", client_address);*/
399+
unimplemented!("ReadSocketService is not implemented yet");
396400
}
397401

398402
pub async fn shutdown(&mut self) {
@@ -405,7 +409,7 @@ impl ReadSocketService {
405409

406410
/// Write Socket Service
407411
pub struct WriteSocketService {
408-
writer: Option<OwnedWriteHalf>,
412+
writer: Option<SplitSink<Framed<TcpStream, BytesCodec>, Bytes>>,
409413
client_address: String,
410414
ha_service: ArcMut<DefaultHAService>,
411415
current_state: Arc<RwLock<HAConnectionState>>,
@@ -418,7 +422,7 @@ pub struct WriteSocketService {
418422

419423
impl WriteSocketService {
420424
pub async fn new(
421-
writer: OwnedWriteHalf,
425+
writer: SplitSink<Framed<TcpStream, BytesCodec>, Bytes>,
422426
client_address: String,
423427
ha_service: ArcMut<DefaultHAService>,
424428
current_state: Arc<RwLock<HAConnectionState>>,
@@ -468,7 +472,7 @@ impl WriteSocketService {
468472
}
469473

470474
async fn run_service(
471-
mut socket_stream: Option<OwnedWriteHalf>,
475+
mut socket_stream: Option<SplitSink<Framed<TcpStream, BytesCodec>, Bytes>>,
472476
client_address: String,
473477
ha_service: ArcMut<DefaultHAService>,
474478
current_state: Arc<RwLock<HAConnectionState>>,
@@ -535,7 +539,7 @@ impl WriteSocketService {
535539
byte_buffer_header.put_i32(0);
536540
let header = byte_buffer_header.split().freeze();
537541
last_write_over =
538-
Self::transfer_data(&mut socket_stream, &header, None, &flow_monitor).await;
542+
Self::transfer_data(&mut socket_stream, header, None, &flow_monitor).await;
539543
if !last_write_over {
540544
continue;
541545
}
@@ -570,7 +574,7 @@ impl WriteSocketService {
570574
// If the data is larger than the size, slice it
571575
bytes.slice(..size as usize)
572576
});
573-
Self::transfer_data(&mut socket_stream, &header, data_buffer, &flow_monitor).await;
577+
Self::transfer_data(&mut socket_stream, header, data_buffer, &flow_monitor).await;
574578
} else {
575579
// No data available, wait
576580
sleep(Duration::from_millis(100)).await;
@@ -581,15 +585,16 @@ impl WriteSocketService {
581585
}
582586

583587
async fn transfer_data(
584-
socket_stream: &mut Option<OwnedWriteHalf>,
585-
buffer_header: &[u8],
588+
socket_stream: &mut Option<SplitSink<Framed<TcpStream, BytesCodec>, Bytes>>,
589+
buffer_header: Bytes,
586590
select_mapped_buffer: Option<Bytes>,
587591
flow_monitor: &Arc<FlowMonitor>,
588592
) -> bool {
589593
if let Some(ref mut socket) = socket_stream {
590-
let result = match socket.write_all(buffer_header).await {
594+
let len = buffer_header.len();
595+
let result = match socket.send(buffer_header).await {
591596
Ok(_) => {
592-
flow_monitor.add_byte_count_transferred(buffer_header.len() as i64);
597+
flow_monitor.add_byte_count_transferred(len as i64);
593598
true
594599
}
595600
Err(e) => {
@@ -599,9 +604,10 @@ impl WriteSocketService {
599604
};
600605

601606
if let Some(data) = select_mapped_buffer {
602-
match socket.write_all(data.as_ref()).await {
607+
let len = data.len();
608+
match socket.send(data).await {
603609
Ok(_) => {
604-
flow_monitor.add_byte_count_transferred(data.len() as i64);
610+
flow_monitor.add_byte_count_transferred(len as i64);
605611
true
606612
}
607613
Err(e) => {

0 commit comments

Comments
 (0)