From 4ddeeea33e502dc2e4ba714ebccf687c3db971e1 Mon Sep 17 00:00:00 2001 From: mxsm Date: Wed, 25 Jun 2025 23:56:17 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#3527]=E2=9A=A1=EF=B8=8FDelegation=20p?= =?UTF-8?q?attern=20implementation=20with=20error=20handling=20for=20HA=20?= =?UTF-8?q?connections?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-store/src/ha/default_ha_service.rs | 10 ++++++++-- rocketmq-store/src/ha/general_ha_connection.rs | 10 +++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/rocketmq-store/src/ha/default_ha_service.rs b/rocketmq-store/src/ha/default_ha_service.rs index 0f0fb8009..307515d0f 100644 --- a/rocketmq-store/src/ha/default_ha_service.rs +++ b/rocketmq-store/src/ha/default_ha_service.rs @@ -294,8 +294,14 @@ impl AcceptSocketService { unimplemented!("Auto-switching is not implemented yet"); }else{ let default_conn = DefaultHAConnection::new(default_ha_service.clone(), stream,message_store_config.clone()).await.expect("Error creating HAConnection"); - let general_conn = GeneralHAConnection::new_with_default_ha_connection(default_conn); - default_ha_service.add_connection(general_conn).await; + let mut general_conn = GeneralHAConnection::new_with_default_ha_connection(default_conn); + if let Err(e) = general_conn.start().await { + error!("Error starting HAService: {}", e); + }else { + info!("HAService accept new connection, {}", addr); + default_ha_service.add_connection(general_conn).await; + } + }; } Err(e) => { diff --git a/rocketmq-store/src/ha/general_ha_connection.rs b/rocketmq-store/src/ha/general_ha_connection.rs index 196d0acab..6b3fb7e58 100644 --- a/rocketmq-store/src/ha/general_ha_connection.rs +++ b/rocketmq-store/src/ha/general_ha_connection.rs @@ -63,7 +63,15 @@ impl GeneralHAConnection { impl HAConnection for GeneralHAConnection { async fn start(&mut self) -> Result<(), HAConnectionError> { - todo!() + if let Some(ref mut connection) = self.default_ha_connection { + connection.start().await + } else if let Some(ref mut connection) = self.auto_switch_ha_connection { + connection.start().await + } else { + Err(HAConnectionError::Connection( + "No HA connection set".to_string(), + )) + } } async fn shutdown(&mut self) {