Skip to content

Commit 145a8fc

Browse files
authored
[ISSUE #3505]🚀Implement comprehensive HA service infrastructure with GroupTransfer and ConnectionState services✨ (#3506)
1 parent 644d4ea commit 145a8fc

File tree

6 files changed

+98
-5
lines changed

6 files changed

+98
-5
lines changed

rocketmq-store/src/ha.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ pub(crate) mod flow_monitor;
2323
pub(crate) mod general_ha_client;
2424
pub(crate) mod general_ha_connection;
2525
pub(crate) mod general_ha_service;
26+
mod group_transfer_service;
2627
pub(crate) mod ha_client;
2728
pub(crate) mod ha_connection;
2829
pub(crate) mod ha_connection_state;
2930
pub(crate) mod ha_connection_state_notification_request;
31+
mod ha_connection_state_notification_service;
3032
pub(crate) mod ha_service;
3133
pub(crate) mod wait_notify_object;

rocketmq-store/src/ha/default_ha_service.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,53 @@
3333

3434
use std::sync::atomic::AtomicI32;
3535
use std::sync::atomic::AtomicI64;
36+
use std::sync::atomic::AtomicU64;
3637
use std::sync::Arc;
3738

3839
use rocketmq_remoting::protocol::body::ha_runtime_info::HARuntimeInfo;
3940
use rocketmq_rust::ArcMut;
4041
use tracing::error;
4142

43+
use crate::ha::general_ha_client::GeneralHAClient;
44+
use crate::ha::general_ha_connection::GeneralHAConnection;
45+
use crate::ha::group_transfer_service::GroupTransferService;
4246
use crate::ha::ha_client::HAClient;
4347
use crate::ha::ha_connection::HAConnection;
4448
use crate::ha::ha_connection_state_notification_request::HAConnectionStateNotificationRequest;
49+
use crate::ha::ha_connection_state_notification_service::HAConnectionStateNotificationService;
4550
use crate::ha::ha_service::HAService;
4651
use crate::ha::wait_notify_object::WaitNotifyObject;
4752
use crate::log_file::flush_manager_impl::group_commit_request::GroupCommitRequest;
4853
use crate::message_store::local_file_message_store::LocalFileMessageStore;
4954
use crate::store_error::HAResult;
5055

51-
#[derive(Default)]
52-
pub struct DefaultHAService {}
56+
pub struct DefaultHAService {
57+
connection_count: Arc<AtomicU64>,
58+
connection_list: Vec<GeneralHAConnection>,
59+
accept_socket_service: AcceptSocketService,
60+
default_message_store: ArcMut<LocalFileMessageStore>,
61+
wait_notify_object: Arc<WaitNotifyObject>,
62+
push2_slave_max_offset: Arc<AtomicU64>,
63+
group_transfer_service: Option<GroupTransferService>,
64+
ha_client: GeneralHAClient,
65+
ha_connection_state_notification_service: HAConnectionStateNotificationService,
66+
}
5367

5468
impl DefaultHAService {
69+
pub fn new(message_store: ArcMut<LocalFileMessageStore>) -> Self {
70+
DefaultHAService {
71+
connection_count: Arc::new(AtomicU64::new(0)),
72+
connection_list: Vec::new(),
73+
accept_socket_service: AcceptSocketService,
74+
default_message_store: message_store,
75+
wait_notify_object: Arc::new(WaitNotifyObject),
76+
push2_slave_max_offset: Arc::new(AtomicU64::new(0)),
77+
group_transfer_service: None,
78+
ha_client: GeneralHAClient::new(),
79+
ha_connection_state_notification_service: HAConnectionStateNotificationService,
80+
}
81+
}
82+
5583
// Add any necessary fields here
5684
pub fn get_default_message_store(&self) -> &LocalFileMessageStore {
5785
unimplemented!(" get_default_message_store method is not implemented");
@@ -63,7 +91,7 @@ impl DefaultHAService {
6391
unimplemented!(" notify_transfer_some method is not implemented");
6492
}
6593

66-
pub(crate) fn init(&mut self, message_store: ArcMut<LocalFileMessageStore>) -> HAResult<()> {
94+
pub(crate) fn init(&mut self) -> HAResult<()> {
6795
// Initialize the DefaultHAService with the provided message store.
6896
Ok(())
6997
}
@@ -152,3 +180,5 @@ impl HAService for DefaultHAService {
152180
todo!()
153181
}
154182
}
183+
184+
struct AcceptSocketService;

rocketmq-store/src/ha/general_ha_client.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,28 @@ pub struct GeneralHAClient {
2222
default_ha_service: Option<DefaultHAClient>,
2323
auto_switch_ha_service: Option<AutoSwitchHAClient>,
2424
}
25+
26+
impl Default for GeneralHAClient {
27+
fn default() -> Self {
28+
GeneralHAClient::new()
29+
}
30+
}
31+
32+
impl GeneralHAClient {
33+
pub fn new() -> Self {
34+
GeneralHAClient {
35+
default_ha_service: None,
36+
auto_switch_ha_service: None,
37+
}
38+
}
39+
40+
pub fn set_default_ha_service(&mut self, service: DefaultHAClient) {
41+
self.default_ha_service = Some(service);
42+
}
43+
44+
pub fn set_auto_switch_ha_service(&mut self, service: AutoSwitchHAClient) {
45+
self.auto_switch_ha_service = Some(service);
46+
}
47+
48+
// Additional methods to interact with the HA services can be added here
49+
}

rocketmq-store/src/ha/general_ha_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ impl GeneralHAService {
4949
{
5050
self.auto_switch_ha_service = Some(AutoSwitchHAService)
5151
} else {
52-
let mut default_ha_service = DefaultHAService::default();
53-
default_ha_service.init(message_store)?;
52+
let mut default_ha_service = DefaultHAService::new(message_store);
53+
default_ha_service.init()?;
5454
self.default_ha_service = Some(default_ha_service);
5555
}
5656
Ok(())
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
pub struct GroupTransferService;
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
pub struct HAConnectionStateNotificationService;

0 commit comments

Comments
 (0)