Skip to content

Commit a1ccfa1

Browse files
committed
[ISSUE #3512]🚀Async High Availability (HA) Service Architecture Enhancement
1 parent 94f20fd commit a1ccfa1

14 files changed

+325
-38
lines changed

rocketmq-broker/src/broker_runtime.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -779,10 +779,11 @@ impl BrokerRuntime {
779779

780780
fn initial_request_pipeline(&mut self) {}
781781

782-
fn start_basic_service(&mut self) {
782+
async fn start_basic_service(&mut self) {
783783
if let Some(ref mut message_store) = self.inner.message_store {
784784
message_store
785785
.start()
786+
.await
786787
.unwrap_or_else(|e| panic!("Failed to start message store: {e}"));
787788
} else {
788789
panic!("Message store is not initialized");
@@ -890,7 +891,7 @@ impl BrokerRuntime {
890891
}
891892

892893
self.inner.broker_outer_api.start().await;
893-
self.start_basic_service();
894+
self.start_basic_service().await;
894895

895896
if !self.inner.is_isolated.load(Ordering::Acquire)
896897
&& !self.inner.message_store_config.enable_dledger_commit_log

rocketmq-store/src/base/message_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub trait MessageStoreInner: Sync + 'static {
6565
async fn load(&mut self) -> bool;
6666

6767
/// Launch this message store.
68-
fn start(&mut self) -> Result<(), StoreError>;
68+
async fn start(&mut self) -> Result<(), StoreError>;
6969

7070
/// Shutdown this message store.
7171
fn shutdown(&mut self);

rocketmq-store/src/config/message_store_config.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,9 @@ mod defaults {
277277
pub fn ha_send_heartbeat_interval() -> u64 {
278278
1000 * 5
279279
}
280+
pub fn ha_listen_port() -> usize {
281+
10912
282+
}
280283
}
281284

282285
#[derive(Clone, Debug, Deserialize, PartialEq)]
@@ -510,7 +513,7 @@ pub struct MessageStoreConfig {
510513
#[serde(default)]
511514
pub message_index_safe: bool,
512515

513-
#[serde(default)]
516+
#[serde(default = "defaults::ha_listen_port")]
514517
pub ha_listen_port: usize,
515518

516519
#[serde(default = "defaults::ha_send_heartbeat_interval")]
@@ -899,7 +902,7 @@ impl Default for MessageStoreConfig {
899902
max_index_num: 5000000 * 4,
900903
max_msgs_num_batch: 64,
901904
message_index_safe: false,
902-
ha_listen_port: 0,
905+
ha_listen_port: 10912,
903906
ha_send_heartbeat_interval: 1000 * 5,
904907
ha_housekeeping_interval: 1000 * 20,
905908
ha_transfer_batch_size: 0,

rocketmq-store/src/ha/auto_switch/auto_switch_ha_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::store_error::HAResult;
3333
pub struct AutoSwitchHAService;
3434

3535
impl HAService for AutoSwitchHAService {
36-
fn start(&mut self) -> HAResult<()> {
36+
async fn start(&mut self) -> HAResult<()> {
3737
error!("DefaultHAService start not implemented");
3838
Ok(())
3939
}

rocketmq-store/src/ha/default_ha_client.rs

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use tracing::warn;
3939

4040
use crate::base::message_store::MessageStore;
4141
use crate::ha::flow_monitor::FlowMonitor;
42+
use crate::ha::ha_client::HAClient;
4243
use crate::ha::ha_connection_state::HAConnectionState;
4344
use crate::message_store::local_file_message_store::LocalFileMessageStore;
4445

@@ -609,9 +610,9 @@ impl DefaultHAClient {
609610
sleep(Duration::from_secs(5)).await;
610611
}
611612

612-
/// Start the HA client service
613-
pub async fn start(self: Arc<Self>) -> Result<(), HAClientError> {
614-
let self_clone = Arc::clone(&self);
613+
// Start the HA client service
614+
/* pub async fn start(self: ArcMut<Self>) -> Result<(), HAClientError> {
615+
let self_clone = ArcMut::clone(&self);
615616
let handle = tokio::spawn(async move {
616617
self_clone.run_service().await;
617618
});
@@ -620,7 +621,7 @@ impl DefaultHAClient {
620621
*service_handle = Some(handle);
621622
622623
Ok(())
623-
}
624+
}*/
624625

625626
/// Shutdown the HA client
626627
pub async fn shutdown(self: Arc<Self>) {
@@ -663,6 +664,59 @@ impl DefaultHAClient {
663664
}
664665
}
665666

667+
impl HAClient for DefaultHAClient {
668+
async fn start(&self) {
669+
error!("GeneralHAService does not implement start directly, use specific service");
670+
}
671+
672+
async fn shutdown(&self) {
673+
todo!()
674+
}
675+
676+
async fn wakeup(&self) {
677+
todo!()
678+
}
679+
680+
async fn update_master_address(&self, new_address: &str) {
681+
todo!()
682+
}
683+
684+
async fn update_ha_master_address(&self, new_address: &str) {
685+
todo!()
686+
}
687+
688+
fn get_master_address(&self) -> String {
689+
todo!()
690+
}
691+
692+
fn get_ha_master_address(&self) -> String {
693+
todo!()
694+
}
695+
696+
fn get_last_read_timestamp(&self) -> i64 {
697+
todo!()
698+
}
699+
700+
fn get_last_write_timestamp(&self) -> i64 {
701+
todo!()
702+
}
703+
704+
fn get_current_state(&self) -> HAConnectionState {
705+
todo!()
706+
}
707+
708+
fn change_current_state(&self, ha_connection_state: HAConnectionState) {
709+
todo!()
710+
}
711+
712+
async fn close_master(&self) {
713+
todo!()
714+
}
715+
716+
fn get_transferred_byte_in_second(&self) -> i64 {
717+
todo!()
718+
}
719+
}
666720
/// Error types
667721
#[derive(Debug, thiserror::Error)]
668722
pub enum HAClientError {

rocketmq-store/src/ha/default_ha_connection.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::time::Instant;
2424
use bytes::BufMut;
2525
use bytes::Bytes;
2626
use bytes::BytesMut;
27+
use rocketmq_rust::ArcMut;
2728
use tokio::io::AsyncReadExt;
2829
use tokio::io::AsyncWriteExt;
2930
use tokio::net::tcp::OwnedReadHalf;
@@ -48,7 +49,7 @@ use crate::ha::ha_connection_state::HAConnectionState;
4849
pub const TRANSFER_HEADER_SIZE: usize = 8 + 4;
4950

5051
pub struct DefaultHAConnection {
51-
ha_service: Arc<DefaultHAService>,
52+
ha_service: ArcMut<DefaultHAService>,
5253
socket_stream: Option<TcpStream>,
5354
client_address: String,
5455
write_socket_service: Option<WriteSocketService>,
@@ -64,7 +65,7 @@ pub struct DefaultHAConnection {
6465
impl DefaultHAConnection {
6566
/// Create a new DefaultHAConnection
6667
pub async fn new(
67-
ha_service: Arc<DefaultHAService>,
68+
ha_service: ArcMut<DefaultHAService>,
6869
socket_stream: TcpStream,
6970
message_store_config: Arc<MessageStoreConfig>,
7071
) -> Result<Self, HAConnectionError> {
@@ -83,7 +84,9 @@ impl DefaultHAConnection {
8384
let flow_monitor = Arc::new(FlowMonitor::new(message_store_config.clone()));
8485

8586
// Increment connection count
86-
// ha_service.increment_connection_count();
87+
ha_service
88+
.get_connection_count()
89+
.fetch_add(1, Ordering::SeqCst);
8790

8891
let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);
8992

@@ -116,7 +119,7 @@ impl DefaultHAConnection {
116119
let read_service = ReadSocketService::new(
117120
reader,
118121
self.client_address.clone(),
119-
Arc::clone(&self.ha_service),
122+
ArcMut::clone(&self.ha_service),
120123
Arc::clone(&self.current_state),
121124
self.slave_request_offset.clone(),
122125
self.slave_ack_offset.clone(),
@@ -128,7 +131,7 @@ impl DefaultHAConnection {
128131
let write_service = WriteSocketService::new(
129132
writer,
130133
self.client_address.clone(),
131-
Arc::clone(&self.ha_service),
134+
ArcMut::clone(&self.ha_service),
132135
Arc::clone(&self.current_state),
133136
self.slave_request_offset.clone(),
134137
Arc::clone(&self.flow_monitor),
@@ -228,7 +231,7 @@ const REPORT_HEADER_SIZE: usize = 8;
228231
pub struct ReadSocketService {
229232
reader: Option<OwnedReadHalf>,
230233
client_address: String,
231-
ha_service: Arc<DefaultHAService>,
234+
ha_service: ArcMut<DefaultHAService>,
232235
current_state: Arc<RwLock<HAConnectionState>>,
233236
slave_request_offset: Arc<AtomicI64>,
234237
slave_ack_offset: Arc<AtomicI64>,
@@ -241,7 +244,7 @@ impl ReadSocketService {
241244
pub async fn new(
242245
reader: OwnedReadHalf,
243246
client_address: String,
244-
ha_service: Arc<DefaultHAService>,
247+
ha_service: ArcMut<DefaultHAService>,
245248
current_state: Arc<RwLock<HAConnectionState>>,
246249
slave_request_offset: Arc<AtomicI64>,
247250
slave_ack_offset: Arc<AtomicI64>,
@@ -265,7 +268,7 @@ impl ReadSocketService {
265268
pub async fn start(&mut self) -> Result<(), HAConnectionError> {
266269
let socket_stream = self.reader.take();
267270
let client_address = self.client_address.clone();
268-
let ha_service = Arc::clone(&self.ha_service);
271+
let ha_service = ArcMut::clone(&self.ha_service);
269272
let current_state = Arc::clone(&self.current_state);
270273
let slave_request_offset = self.slave_request_offset.clone();
271274
let slave_ack_offset = self.slave_ack_offset.clone();
@@ -291,7 +294,7 @@ impl ReadSocketService {
291294
async fn run_service(
292295
mut socket_stream: Option<OwnedReadHalf>,
293296
client_address: String,
294-
ha_service: Arc<DefaultHAService>,
297+
ha_service: ArcMut<DefaultHAService>,
295298
current_state: Arc<RwLock<HAConnectionState>>,
296299
slave_request_offset: Arc<AtomicI64>,
297300
slave_ack_offset: Arc<AtomicI64>,
@@ -404,7 +407,7 @@ impl ReadSocketService {
404407
pub struct WriteSocketService {
405408
writer: Option<OwnedWriteHalf>,
406409
client_address: String,
407-
ha_service: Arc<DefaultHAService>,
410+
ha_service: ArcMut<DefaultHAService>,
408411
current_state: Arc<RwLock<HAConnectionState>>,
409412
slave_request_offset: Arc<AtomicI64>,
410413
flow_monitor: Arc<FlowMonitor>,
@@ -417,7 +420,7 @@ impl WriteSocketService {
417420
pub async fn new(
418421
writer: OwnedWriteHalf,
419422
client_address: String,
420-
ha_service: Arc<DefaultHAService>,
423+
ha_service: ArcMut<DefaultHAService>,
421424
current_state: Arc<RwLock<HAConnectionState>>,
422425
slave_request_offset: Arc<AtomicI64>,
423426
flow_monitor: Arc<FlowMonitor>,
@@ -439,7 +442,7 @@ impl WriteSocketService {
439442
pub async fn start(&mut self) -> Result<(), HAConnectionError> {
440443
let socket_stream = self.writer.take();
441444
let client_address = self.client_address.clone();
442-
let ha_service = Arc::clone(&self.ha_service);
445+
let ha_service = ArcMut::clone(&self.ha_service);
443446
let current_state = Arc::clone(&self.current_state);
444447
let slave_request_offset = self.slave_request_offset.clone();
445448
let flow_monitor = Arc::clone(&self.flow_monitor);
@@ -467,7 +470,7 @@ impl WriteSocketService {
467470
async fn run_service(
468471
mut socket_stream: Option<OwnedWriteHalf>,
469472
client_address: String,
470-
ha_service: Arc<DefaultHAService>,
473+
ha_service: ArcMut<DefaultHAService>,
471474
current_state: Arc<RwLock<HAConnectionState>>,
472475
slave_request_offset: Arc<AtomicI64>,
473476
flow_monitor: Arc<FlowMonitor>,

0 commit comments

Comments
 (0)