Skip to content

Commit 03e9b58

Browse files
authored
[ISSUE #3532]🚀Implement missing methods in DefaultHAConnection for state management and data transfer (#3533)
1 parent 1fd1dd1 commit 03e9b58

File tree

1 file changed

+7
-27
lines changed

1 file changed

+7
-27
lines changed

rocketmq-store/src/ha/default_ha_connection.rs

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -114,30 +114,6 @@ impl DefaultHAConnection {
114114
let mut state_guard = self.current_state.write().await;
115115
*state_guard = new_state;
116116
}
117-
118-
/// Get current state
119-
pub async fn get_current_state(&self) -> HAConnectionState {
120-
*self.current_state.read().await
121-
}
122-
123-
/// Get slave ack offset
124-
pub fn get_slave_ack_offset(&self) -> i64 {
125-
self.slave_ack_offset.load(Ordering::SeqCst)
126-
}
127-
128-
/// Get transferred bytes per second
129-
pub fn get_transferred_byte_in_second(&self) -> u64 {
130-
self.flow_monitor.get_transferred_byte_in_second() as u64
131-
}
132-
133-
/// Get transfer from where
134-
pub fn get_transfer_from_where(&self) -> i64 {
135-
if let Some(ref write_service) = self.write_socket_service {
136-
write_service.get_next_transfer_from_where()
137-
} else {
138-
-1
139-
}
140-
}
141117
}
142118

143119
impl HAConnection for DefaultHAConnection {
@@ -232,15 +208,19 @@ impl HAConnection for DefaultHAConnection {
232208
}
233209

234210
fn get_transferred_byte_in_second(&self) -> i64 {
235-
todo!()
211+
self.flow_monitor.get_transferred_byte_in_second()
236212
}
237213

238214
fn get_transfer_from_where(&self) -> i64 {
239-
todo!()
215+
if let Some(ref write_service) = self.write_socket_service {
216+
write_service.get_next_transfer_from_where()
217+
} else {
218+
-1
219+
}
240220
}
241221

242222
fn get_slave_ack_offset(&self) -> i64 {
243-
todo!()
223+
self.slave_ack_offset.load(Ordering::SeqCst)
244224
}
245225
}
246226

0 commit comments

Comments
 (0)