Skip to content

Commit c341a3d

Browse files
committed
[ISSUE #3534]🚀Implement connection handling in HAConnection methods with error logging
1 parent 03e9b58 commit c341a3d

File tree

1 file changed

+65
-8
lines changed

1 file changed

+65
-8
lines changed

rocketmq-store/src/ha/general_ha_connection.rs

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,34 +76,91 @@ impl HAConnection for GeneralHAConnection {
7676
}
7777

7878
async fn shutdown(&mut self) {
79-
todo!()
79+
match (
80+
&mut self.default_ha_connection,
81+
&mut self.auto_switch_ha_connection,
82+
) {
83+
(Some(connection), _) => connection.shutdown().await,
84+
(_, Some(connection)) => connection.shutdown().await,
85+
(None, None) => {
86+
tracing::warn!("No HA connection to shutdown");
87+
}
88+
}
8089
}
8190

8291
fn close(&self) {
83-
todo!()
92+
match (&self.default_ha_connection, &self.auto_switch_ha_connection) {
93+
(Some(connection), _) => connection.close(),
94+
(_, Some(connection)) => connection.close(),
95+
(None, None) => {
96+
tracing::warn!("No HA connection to close");
97+
}
98+
}
8499
}
85100

86101
fn get_socket(&self) -> &TcpStream {
87-
todo!()
102+
match (&self.default_ha_connection, &self.auto_switch_ha_connection) {
103+
(Some(connection), _) => connection.get_socket(),
104+
(_, Some(connection)) => connection.get_socket(),
105+
(None, None) => {
106+
tracing::warn!("No HA connection to get socket from");
107+
panic!("No HA connection available");
108+
}
109+
}
88110
}
89111

90112
async fn get_current_state(&self) -> HAConnectionState {
91-
todo!()
113+
match (&self.default_ha_connection, &self.auto_switch_ha_connection) {
114+
(Some(connection), _) => connection.get_current_state().await,
115+
(_, Some(connection)) => connection.get_current_state().await,
116+
(None, None) => {
117+
tracing::warn!("No HA connection to get current state from");
118+
panic!("No HA connection available");
119+
}
120+
}
92121
}
93122

94123
fn get_client_address(&self) -> &str {
95-
todo!()
124+
match (&self.default_ha_connection, &self.auto_switch_ha_connection) {
125+
(Some(connection), _) => connection.get_client_address(),
126+
(_, Some(connection)) => connection.get_client_address(),
127+
(None, None) => {
128+
tracing::warn!("No HA connection to get client address from");
129+
panic!("No HA connection available");
130+
}
131+
}
96132
}
97133

98134
fn get_transferred_byte_in_second(&self) -> i64 {
99-
todo!()
135+
match (&self.default_ha_connection, &self.auto_switch_ha_connection) {
136+
(Some(connection), _) => connection.get_transferred_byte_in_second(),
137+
(_, Some(connection)) => connection.get_transferred_byte_in_second(),
138+
(None, None) => {
139+
tracing::warn!("No HA connection to get transferred bytes from");
140+
panic!("No HA connection available");
141+
}
142+
}
100143
}
101144

102145
fn get_transfer_from_where(&self) -> i64 {
103-
todo!()
146+
match (&self.default_ha_connection, &self.auto_switch_ha_connection) {
147+
(Some(connection), _) => connection.get_transfer_from_where(),
148+
(_, Some(connection)) => connection.get_transfer_from_where(),
149+
(None, None) => {
150+
tracing::warn!("No HA connection to get transfer offset from");
151+
panic!("No HA connection available");
152+
}
153+
}
104154
}
105155

106156
fn get_slave_ack_offset(&self) -> i64 {
107-
todo!()
157+
match (&self.default_ha_connection, &self.auto_switch_ha_connection) {
158+
(Some(connection), _) => connection.get_slave_ack_offset(),
159+
(_, Some(connection)) => connection.get_slave_ack_offset(),
160+
(None, None) => {
161+
tracing::warn!("No HA connection to get slave ack offset from");
162+
panic!("No HA connection available");
163+
}
164+
}
108165
}
109166
}

0 commit comments

Comments
 (0)