From b8a61a19cd3daf930d48b54eac0c8e741765f40b Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 22 Jun 2025 17:17:02 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#3510]=F0=9F=9A=80Implement=20Comprehe?= =?UTF-8?q?nsive=20High=20Availability=20(HA)=20Subsystem=20Architecture?= =?UTF-8?q?=20with=20Enhanced=20Service=20Infrastructure=E2=9C=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-store/src/ha/default_ha_client.rs | 4 +-- rocketmq-store/src/ha/default_ha_service.rs | 24 ++++++++++++++--- rocketmq-store/src/ha/general_ha_client.rs | 5 ++-- rocketmq-store/src/ha/general_ha_service.rs | 27 ++++++++++++++----- .../src/ha/group_transfer_service.rs | 21 ++++++++++++++- ...a_connection_state_notification_service.rs | 21 ++++++++++++++- .../message_store/local_file_message_store.rs | 4 +++ 7 files changed, 91 insertions(+), 15 deletions(-) diff --git a/rocketmq-store/src/ha/default_ha_client.rs b/rocketmq-store/src/ha/default_ha_client.rs index 65b727caf..e037320ec 100644 --- a/rocketmq-store/src/ha/default_ha_client.rs +++ b/rocketmq-store/src/ha/default_ha_client.rs @@ -110,14 +110,14 @@ impl DefaultHAClient { /// Create a new DefaultHAClient pub fn new( default_message_store: ArcMut, - ) -> Result, HAClientError> { + ) -> Result, HAClientError> { let flow_monitor = Arc::new(FlowMonitor::new( default_message_store.message_store_config(), )); let now = get_current_millis() as i64; - Ok(Arc::new(Self { + Ok(ArcMut::new(Self { master_ha_address: Arc::new(RwLock::new(None)), master_address: Arc::new(RwLock::new(None)), socket_stream: Arc::new(RwLock::new(None)), diff --git a/rocketmq-store/src/ha/default_ha_service.rs b/rocketmq-store/src/ha/default_ha_service.rs index 22af48302..9504bcf9a 100644 --- a/rocketmq-store/src/ha/default_ha_service.rs +++ b/rocketmq-store/src/ha/default_ha_service.rs @@ -36,12 +36,15 @@ use std::sync::atomic::AtomicI64; use std::sync::atomic::AtomicU64; use std::sync::Arc; +use rocketmq_common::common::broker::broker_role::BrokerRole; use rocketmq_remoting::protocol::body::ha_runtime_info::HARuntimeInfo; use rocketmq_rust::ArcMut; use tracing::error; +use crate::ha::default_ha_client::DefaultHAClient; use crate::ha::general_ha_client::GeneralHAClient; use crate::ha::general_ha_connection::GeneralHAConnection; +use crate::ha::general_ha_service::GeneralHAService; use crate::ha::group_transfer_service::GroupTransferService; use crate::ha::ha_client::HAClient; use crate::ha::ha_connection::HAConnection; @@ -51,6 +54,7 @@ use crate::ha::ha_service::HAService; use crate::ha::wait_notify_object::WaitNotifyObject; use crate::log_file::flush_manager_impl::group_commit_request::GroupCommitRequest; use crate::message_store::local_file_message_store::LocalFileMessageStore; +use crate::store_error::HAError; use crate::store_error::HAResult; pub struct DefaultHAService { @@ -62,7 +66,7 @@ pub struct DefaultHAService { push2_slave_max_offset: Arc, group_transfer_service: Option, ha_client: GeneralHAClient, - ha_connection_state_notification_service: HAConnectionStateNotificationService, + ha_connection_state_notification_service: Option, } impl DefaultHAService { @@ -76,7 +80,7 @@ impl DefaultHAService { push2_slave_max_offset: Arc::new(AtomicU64::new(0)), group_transfer_service: None, ha_client: GeneralHAClient::new(), - ha_connection_state_notification_service: HAConnectionStateNotificationService, + ha_connection_state_notification_service: None, } } @@ -91,8 +95,22 @@ impl DefaultHAService { unimplemented!(" notify_transfer_some method is not implemented"); } - pub(crate) fn init(&mut self) -> HAResult<()> { + pub(crate) fn init(&mut self, this: ArcMut) -> HAResult<()> { // Initialize the DefaultHAService with the provided message store. + let config = self.default_message_store.get_message_store_config(); + let service = GeneralHAService::new_with_default_ha_service(this.clone()); + let group_transfer_service = GroupTransferService::new(config.clone(), service.clone()); + self.group_transfer_service = Some(group_transfer_service); + + if config.broker_role == BrokerRole::Slave { + let default_message_store = self.default_message_store.clone(); + let client = DefaultHAClient::new(default_message_store) + .map_err(|e| HAError::Service(format!("Failed to create DefaultHAClient: {e}")))?; + self.ha_client.set_default_ha_service(client) + } + let state_notification_service = + HAConnectionStateNotificationService::new(service, self.default_message_store.clone()); + self.ha_connection_state_notification_service = Some(state_notification_service); Ok(()) } } diff --git a/rocketmq-store/src/ha/general_ha_client.rs b/rocketmq-store/src/ha/general_ha_client.rs index d83c04417..84e85ba74 100644 --- a/rocketmq-store/src/ha/general_ha_client.rs +++ b/rocketmq-store/src/ha/general_ha_client.rs @@ -14,12 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use rocketmq_rust::ArcMut; use crate::ha::auto_switch::auto_switch_ha_client::AutoSwitchHAClient; use crate::ha::default_ha_client::DefaultHAClient; pub struct GeneralHAClient { - default_ha_service: Option, + default_ha_service: Option>, auto_switch_ha_service: Option, } @@ -37,7 +38,7 @@ impl GeneralHAClient { } } - pub fn set_default_ha_service(&mut self, service: DefaultHAClient) { + pub fn set_default_ha_service(&mut self, service: ArcMut) { self.default_ha_service = Some(service); } diff --git a/rocketmq-store/src/ha/general_ha_service.rs b/rocketmq-store/src/ha/general_ha_service.rs index dba71e0d6..0cc6cf86a 100644 --- a/rocketmq-store/src/ha/general_ha_service.rs +++ b/rocketmq-store/src/ha/general_ha_service.rs @@ -23,7 +23,6 @@ use rocketmq_remoting::protocol::body::ha_runtime_info::HARuntimeInfo; use rocketmq_rust::ArcMut; use tracing::error; -use crate::base::message_store::MessageStore; use crate::ha::auto_switch::auto_switch_ha_service::AutoSwitchHAService; use crate::ha::default_ha_service::DefaultHAService; use crate::ha::ha_client::HAClient; @@ -36,21 +35,37 @@ use crate::message_store::local_file_message_store::LocalFileMessageStore; use crate::store_error::HAError; use crate::store_error::HAResult; +#[derive(Clone)] pub struct GeneralHAService { - default_ha_service: Option, - auto_switch_ha_service: Option, + default_ha_service: Option>, + auto_switch_ha_service: Option>, } impl GeneralHAService { + pub fn new() -> Self { + GeneralHAService { + default_ha_service: None, + auto_switch_ha_service: None, + } + } + + pub fn new_with_default_ha_service(default_ha_service: ArcMut) -> Self { + GeneralHAService { + default_ha_service: Some(default_ha_service), + auto_switch_ha_service: None, + } + } + pub(crate) fn init(&mut self, message_store: ArcMut) -> HAResult<()> { if message_store .get_message_store_config() .enable_controller_mode { - self.auto_switch_ha_service = Some(AutoSwitchHAService) + self.auto_switch_ha_service = Some(ArcMut::new(AutoSwitchHAService)) } else { - let mut default_ha_service = DefaultHAService::new(message_store); - default_ha_service.init()?; + let mut default_ha_service = ArcMut::new(DefaultHAService::new(message_store)); + let default_ha_service_clone = default_ha_service.clone(); + DefaultHAService::init(&mut default_ha_service, default_ha_service_clone)?; self.default_ha_service = Some(default_ha_service); } Ok(()) diff --git a/rocketmq-store/src/ha/group_transfer_service.rs b/rocketmq-store/src/ha/group_transfer_service.rs index 4899f2918..010a84e3e 100644 --- a/rocketmq-store/src/ha/group_transfer_service.rs +++ b/rocketmq-store/src/ha/group_transfer_service.rs @@ -14,5 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::sync::Arc; -pub struct GroupTransferService; +use crate::config::message_store_config::MessageStoreConfig; +use crate::ha::general_ha_service::GeneralHAService; + +pub struct GroupTransferService { + message_store_config: Arc, + ha_service: GeneralHAService, +} + +impl GroupTransferService { + pub fn new( + message_store_config: Arc, + ha_service: GeneralHAService, + ) -> Self { + GroupTransferService { + message_store_config, + ha_service, + } + } +} diff --git a/rocketmq-store/src/ha/ha_connection_state_notification_service.rs b/rocketmq-store/src/ha/ha_connection_state_notification_service.rs index 9b323e8ba..5d10377fa 100644 --- a/rocketmq-store/src/ha/ha_connection_state_notification_service.rs +++ b/rocketmq-store/src/ha/ha_connection_state_notification_service.rs @@ -14,5 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use rocketmq_rust::ArcMut; -pub struct HAConnectionStateNotificationService; +use crate::ha::general_ha_service::GeneralHAService; +use crate::message_store::local_file_message_store::LocalFileMessageStore; + +pub struct HAConnectionStateNotificationService { + ha_service: GeneralHAService, + default_message_store: ArcMut, +} + +impl HAConnectionStateNotificationService { + pub fn new( + ha_service: GeneralHAService, + default_message_store: ArcMut, + ) -> Self { + HAConnectionStateNotificationService { + ha_service, + default_message_store, + } + } +} diff --git a/rocketmq-store/src/message_store/local_file_message_store.rs b/rocketmq-store/src/message_store/local_file_message_store.rs index d6d58ef81..8a1323c8f 100644 --- a/rocketmq-store/src/message_store/local_file_message_store.rs +++ b/rocketmq-store/src/message_store/local_file_message_store.rs @@ -531,6 +531,10 @@ impl LocalFileMessageStore { fn do_recheck_reput_offset_from_cq(&self) { error!("do_recheck_reput_offset_from_cq called, not implemented yet"); } + + pub fn get_message_store_config(&self) -> Arc { + self.message_store_config.clone() + } } fn estimate_in_mem_by_commit_offset(