Skip to content

Commit 8711443

Browse files
authored
[ISSUE #3517]⚡️Memory Management Fix for AtomicPtr-based Address Fields in HA Client (#3518)
1 parent 4e10afa commit 8711443

File tree

3 files changed

+28
-18
lines changed

3 files changed

+28
-18
lines changed

rocketmq-store/src/ha/default_ha_client.rs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ const TRANSFER_HEADER_SIZE: usize = 12; // 8 bytes offset + 4 bytes body size
6363
/// Default HA Client implementation using bytes crate
6464
pub struct DefaultHAClient {
6565
/// Master HA address (atomic reference)
66-
master_ha_address: Arc<RwLock<Option<String>>>,
66+
master_ha_address: Arc<AtomicPtr<String>>,
6767

6868
/// Master address (atomic reference)
6969
master_address: Arc<AtomicPtr<String>>,
@@ -120,7 +120,7 @@ impl DefaultHAClient {
120120
let now = get_current_millis() as i64;
121121

122122
Ok(ArcMut::new(Self {
123-
master_ha_address: Arc::new(RwLock::new(None)),
123+
master_ha_address: Arc::new(AtomicPtr::default()),
124124
master_address: Arc::new(AtomicPtr::default()),
125125
socket_stream: Arc::new(RwLock::new(None)),
126126
last_read_timestamp: AtomicI64::new(now),
@@ -140,21 +140,15 @@ impl DefaultHAClient {
140140
}))
141141
}
142142

143-
/// Update HA master address
144-
pub async fn update_ha_master_address(&self, new_addr: Option<String>) {
145-
let mut current_addr = self.master_ha_address.write().await;
146-
let old_addr = current_addr.clone();
147-
*current_addr = new_addr.clone();
148-
149-
info!(
150-
"update master ha address, OLD: {:?} NEW: {:?}",
151-
old_addr, new_addr
152-
);
153-
}
154-
155143
/// Get HA master address
156144
pub async fn get_ha_master_address(&self) -> Option<String> {
157-
self.master_ha_address.read().await.clone()
145+
let addr_ptr = self.master_ha_address.load(Ordering::SeqCst);
146+
if !addr_ptr.is_null() {
147+
let address = unsafe { (*addr_ptr).clone() };
148+
Some(address)
149+
} else {
150+
None
151+
}
158152
}
159153

160154
/// Get master address
@@ -688,7 +682,17 @@ impl HAClient for DefaultHAClient {
688682
}
689683

690684
fn update_ha_master_address(&self, new_address: &str) {
691-
todo!()
685+
// Safely free the old pointer before storing the new one
686+
let old_address = self.master_address.load(Ordering::SeqCst);
687+
if !old_address.is_null() {
688+
unsafe {
689+
// Convert the old pointer back into a Box to free the memory
690+
let _ = Box::from_raw(old_address);
691+
}
692+
}
693+
// Store the new address
694+
let new_address_ptr = Box::into_raw(Box::new(new_address.to_string()));
695+
self.master_address.store(new_address_ptr, Ordering::SeqCst);
692696
}
693697

694698
fn get_master_address(&self) -> String {

rocketmq-store/src/ha/default_ha_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl HAService for DefaultHAService {
199199
}
200200

201201
fn update_ha_master_address(&self, new_addr: &str) {
202-
todo!()
202+
self.ha_client.update_ha_master_address(new_addr);
203203
}
204204

205205
fn in_sync_replicas_nums(&self, master_put_where: i64) -> i32 {

rocketmq-store/src/ha/general_ha_client.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,13 @@ impl HAClient for GeneralHAClient {
8181
}
8282

8383
fn update_ha_master_address(&self, new_address: &str) {
84-
todo!()
84+
if let Some(ref client) = self.default_ha_client {
85+
client.update_ha_master_address(new_address);
86+
} else if let Some(ref client) = self.auto_switch_ha_client {
87+
client.update_ha_master_address(new_address);
88+
} else {
89+
panic!("No HA service is set for GeneralHAClient");
90+
}
8591
}
8692

8793
fn get_master_address(&self) -> String {

0 commit comments

Comments
 (0)