Skip to content

Commit 94f20fd

Browse files
authored
[ISSUE #3510]🚀Implement Comprehensive High Availability (HA) Subsystem Architecture with Enhanced Service Infrastructure✨ (#3511)
1 parent 145a8fc commit 94f20fd

File tree

7 files changed

+91
-15
lines changed

7 files changed

+91
-15
lines changed

rocketmq-store/src/ha/default_ha_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,14 @@ impl DefaultHAClient {
110110
/// Create a new DefaultHAClient
111111
pub fn new(
112112
default_message_store: ArcMut<LocalFileMessageStore>,
113-
) -> Result<Arc<Self>, HAClientError> {
113+
) -> Result<ArcMut<Self>, HAClientError> {
114114
let flow_monitor = Arc::new(FlowMonitor::new(
115115
default_message_store.message_store_config(),
116116
));
117117

118118
let now = get_current_millis() as i64;
119119

120-
Ok(Arc::new(Self {
120+
Ok(ArcMut::new(Self {
121121
master_ha_address: Arc::new(RwLock::new(None)),
122122
master_address: Arc::new(RwLock::new(None)),
123123
socket_stream: Arc::new(RwLock::new(None)),

rocketmq-store/src/ha/default_ha_service.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,15 @@ use std::sync::atomic::AtomicI64;
3636
use std::sync::atomic::AtomicU64;
3737
use std::sync::Arc;
3838

39+
use rocketmq_common::common::broker::broker_role::BrokerRole;
3940
use rocketmq_remoting::protocol::body::ha_runtime_info::HARuntimeInfo;
4041
use rocketmq_rust::ArcMut;
4142
use tracing::error;
4243

44+
use crate::ha::default_ha_client::DefaultHAClient;
4345
use crate::ha::general_ha_client::GeneralHAClient;
4446
use crate::ha::general_ha_connection::GeneralHAConnection;
47+
use crate::ha::general_ha_service::GeneralHAService;
4548
use crate::ha::group_transfer_service::GroupTransferService;
4649
use crate::ha::ha_client::HAClient;
4750
use crate::ha::ha_connection::HAConnection;
@@ -51,6 +54,7 @@ use crate::ha::ha_service::HAService;
5154
use crate::ha::wait_notify_object::WaitNotifyObject;
5255
use crate::log_file::flush_manager_impl::group_commit_request::GroupCommitRequest;
5356
use crate::message_store::local_file_message_store::LocalFileMessageStore;
57+
use crate::store_error::HAError;
5458
use crate::store_error::HAResult;
5559

5660
pub struct DefaultHAService {
@@ -62,7 +66,7 @@ pub struct DefaultHAService {
6266
push2_slave_max_offset: Arc<AtomicU64>,
6367
group_transfer_service: Option<GroupTransferService>,
6468
ha_client: GeneralHAClient,
65-
ha_connection_state_notification_service: HAConnectionStateNotificationService,
69+
ha_connection_state_notification_service: Option<HAConnectionStateNotificationService>,
6670
}
6771

6872
impl DefaultHAService {
@@ -76,7 +80,7 @@ impl DefaultHAService {
7680
push2_slave_max_offset: Arc::new(AtomicU64::new(0)),
7781
group_transfer_service: None,
7882
ha_client: GeneralHAClient::new(),
79-
ha_connection_state_notification_service: HAConnectionStateNotificationService,
83+
ha_connection_state_notification_service: None,
8084
}
8185
}
8286

@@ -91,8 +95,22 @@ impl DefaultHAService {
9195
unimplemented!(" notify_transfer_some method is not implemented");
9296
}
9397

94-
pub(crate) fn init(&mut self) -> HAResult<()> {
98+
pub(crate) fn init(&mut self, this: ArcMut<Self>) -> HAResult<()> {
9599
// Initialize the DefaultHAService with the provided message store.
100+
let config = self.default_message_store.get_message_store_config();
101+
let service = GeneralHAService::new_with_default_ha_service(this.clone());
102+
let group_transfer_service = GroupTransferService::new(config.clone(), service.clone());
103+
self.group_transfer_service = Some(group_transfer_service);
104+
105+
if config.broker_role == BrokerRole::Slave {
106+
let default_message_store = self.default_message_store.clone();
107+
let client = DefaultHAClient::new(default_message_store)
108+
.map_err(|e| HAError::Service(format!("Failed to create DefaultHAClient: {e}")))?;
109+
self.ha_client.set_default_ha_service(client)
110+
}
111+
let state_notification_service =
112+
HAConnectionStateNotificationService::new(service, self.default_message_store.clone());
113+
self.ha_connection_state_notification_service = Some(state_notification_service);
96114
Ok(())
97115
}
98116
}

rocketmq-store/src/ha/general_ha_client.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use rocketmq_rust::ArcMut;
1718

1819
use crate::ha::auto_switch::auto_switch_ha_client::AutoSwitchHAClient;
1920
use crate::ha::default_ha_client::DefaultHAClient;
2021

2122
pub struct GeneralHAClient {
22-
default_ha_service: Option<DefaultHAClient>,
23+
default_ha_service: Option<ArcMut<DefaultHAClient>>,
2324
auto_switch_ha_service: Option<AutoSwitchHAClient>,
2425
}
2526

@@ -37,7 +38,7 @@ impl GeneralHAClient {
3738
}
3839
}
3940

40-
pub fn set_default_ha_service(&mut self, service: DefaultHAClient) {
41+
pub fn set_default_ha_service(&mut self, service: ArcMut<DefaultHAClient>) {
4142
self.default_ha_service = Some(service);
4243
}
4344

rocketmq-store/src/ha/general_ha_service.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use rocketmq_remoting::protocol::body::ha_runtime_info::HARuntimeInfo;
2323
use rocketmq_rust::ArcMut;
2424
use tracing::error;
2525

26-
use crate::base::message_store::MessageStore;
2726
use crate::ha::auto_switch::auto_switch_ha_service::AutoSwitchHAService;
2827
use crate::ha::default_ha_service::DefaultHAService;
2928
use crate::ha::ha_client::HAClient;
@@ -36,21 +35,37 @@ use crate::message_store::local_file_message_store::LocalFileMessageStore;
3635
use crate::store_error::HAError;
3736
use crate::store_error::HAResult;
3837

38+
#[derive(Clone)]
3939
pub struct GeneralHAService {
40-
default_ha_service: Option<DefaultHAService>,
41-
auto_switch_ha_service: Option<AutoSwitchHAService>,
40+
default_ha_service: Option<ArcMut<DefaultHAService>>,
41+
auto_switch_ha_service: Option<ArcMut<AutoSwitchHAService>>,
4242
}
4343

4444
impl GeneralHAService {
45+
pub fn new() -> Self {
46+
GeneralHAService {
47+
default_ha_service: None,
48+
auto_switch_ha_service: None,
49+
}
50+
}
51+
52+
pub fn new_with_default_ha_service(default_ha_service: ArcMut<DefaultHAService>) -> Self {
53+
GeneralHAService {
54+
default_ha_service: Some(default_ha_service),
55+
auto_switch_ha_service: None,
56+
}
57+
}
58+
4559
pub(crate) fn init(&mut self, message_store: ArcMut<LocalFileMessageStore>) -> HAResult<()> {
4660
if message_store
4761
.get_message_store_config()
4862
.enable_controller_mode
4963
{
50-
self.auto_switch_ha_service = Some(AutoSwitchHAService)
64+
self.auto_switch_ha_service = Some(ArcMut::new(AutoSwitchHAService))
5165
} else {
52-
let mut default_ha_service = DefaultHAService::new(message_store);
53-
default_ha_service.init()?;
66+
let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store));
67+
let default_ha_service_clone = default_ha_service.clone();
68+
DefaultHAService::init(&mut default_ha_service, default_ha_service_clone)?;
5469
self.default_ha_service = Some(default_ha_service);
5570
}
5671
Ok(())

rocketmq-store/src/ha/group_transfer_service.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,24 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use std::sync::Arc;
1718

18-
pub struct GroupTransferService;
19+
use crate::config::message_store_config::MessageStoreConfig;
20+
use crate::ha::general_ha_service::GeneralHAService;
21+
22+
pub struct GroupTransferService {
23+
message_store_config: Arc<MessageStoreConfig>,
24+
ha_service: GeneralHAService,
25+
}
26+
27+
impl GroupTransferService {
28+
pub fn new(
29+
message_store_config: Arc<MessageStoreConfig>,
30+
ha_service: GeneralHAService,
31+
) -> Self {
32+
GroupTransferService {
33+
message_store_config,
34+
ha_service,
35+
}
36+
}
37+
}

rocketmq-store/src/ha/ha_connection_state_notification_service.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,24 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use rocketmq_rust::ArcMut;
1718

18-
pub struct HAConnectionStateNotificationService;
19+
use crate::ha::general_ha_service::GeneralHAService;
20+
use crate::message_store::local_file_message_store::LocalFileMessageStore;
21+
22+
pub struct HAConnectionStateNotificationService {
23+
ha_service: GeneralHAService,
24+
default_message_store: ArcMut<LocalFileMessageStore>,
25+
}
26+
27+
impl HAConnectionStateNotificationService {
28+
pub fn new(
29+
ha_service: GeneralHAService,
30+
default_message_store: ArcMut<LocalFileMessageStore>,
31+
) -> Self {
32+
HAConnectionStateNotificationService {
33+
ha_service,
34+
default_message_store,
35+
}
36+
}
37+
}

rocketmq-store/src/message_store/local_file_message_store.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,10 @@ impl LocalFileMessageStore {
531531
fn do_recheck_reput_offset_from_cq(&self) {
532532
error!("do_recheck_reput_offset_from_cq called, not implemented yet");
533533
}
534+
535+
pub fn get_message_store_config(&self) -> Arc<MessageStoreConfig> {
536+
self.message_store_config.clone()
537+
}
534538
}
535539

536540
fn estimate_in_mem_by_commit_offset(

0 commit comments

Comments
 (0)