Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions rocketmq-store/src/ha/default_ha_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
/// Default HA Client implementation using bytes crate
pub struct DefaultHAClient {
/// Master HA address (atomic reference)
master_ha_address: Arc<RwLock<Option<String>>>,
master_ha_address: Arc<AtomicPtr<String>>,

/// Master address (atomic reference)
master_address: Arc<AtomicPtr<String>>,
Expand Down Expand Up @@ -120,7 +120,7 @@
let now = get_current_millis() as i64;

Ok(ArcMut::new(Self {
master_ha_address: Arc::new(RwLock::new(None)),
master_ha_address: Arc::new(AtomicPtr::default()),

Check warning on line 123 in rocketmq-store/src/ha/default_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_client.rs#L123

Added line #L123 was not covered by tests
master_address: Arc::new(AtomicPtr::default()),
socket_stream: Arc::new(RwLock::new(None)),
last_read_timestamp: AtomicI64::new(now),
Expand All @@ -140,21 +140,15 @@
}))
}

/// Update HA master address
pub async fn update_ha_master_address(&self, new_addr: Option<String>) {
let mut current_addr = self.master_ha_address.write().await;
let old_addr = current_addr.clone();
*current_addr = new_addr.clone();

info!(
"update master ha address, OLD: {:?} NEW: {:?}",
old_addr, new_addr
);
}

/// Get HA master address
pub async fn get_ha_master_address(&self) -> Option<String> {
self.master_ha_address.read().await.clone()
let addr_ptr = self.master_ha_address.load(Ordering::SeqCst);
if !addr_ptr.is_null() {
let address = unsafe { (*addr_ptr).clone() };
Some(address)

Check warning on line 148 in rocketmq-store/src/ha/default_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_client.rs#L145-L148

Added lines #L145 - L148 were not covered by tests
} else {
None

Check warning on line 150 in rocketmq-store/src/ha/default_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_client.rs#L150

Added line #L150 was not covered by tests
}
}

/// Get master address
Expand Down Expand Up @@ -688,7 +682,17 @@
}

fn update_ha_master_address(&self, new_address: &str) {
todo!()
// Safely free the old pointer before storing the new one
let old_address = self.master_address.load(Ordering::SeqCst);
Copy link

Copilot AI Jun 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loading the old pointer and freeing it before storing the new one can introduce race conditions under concurrent access. Use swap to atomically exchange the pointer (let old_ptr = self.master_address.swap(new_ptr, Ordering::SeqCst)) and then free old_ptr.

Copilot uses AI. Check for mistakes.
if !old_address.is_null() {
unsafe {
// Convert the old pointer back into a Box to free the memory
let _ = Box::from_raw(old_address);
}
}

Check warning on line 692 in rocketmq-store/src/ha/default_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_client.rs#L686-L692

Added lines #L686 - L692 were not covered by tests
// Store the new address
let new_address_ptr = Box::into_raw(Box::new(new_address.to_string()));
self.master_address.store(new_address_ptr, Ordering::SeqCst);

Check warning on line 695 in rocketmq-store/src/ha/default_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_client.rs#L694-L695

Added lines #L694 - L695 were not covered by tests
}
Comment on lines 684 to 696
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical bug: Method is updating the wrong field!

The update_ha_master_address method is incorrectly updating self.master_address instead of self.master_ha_address. This appears to be a copy-paste error from the update_master_address method.

Apply this fix:

 fn update_ha_master_address(&self, new_address: &str) {
     // Safely free the old pointer before storing the new one
-    let old_address = self.master_address.load(Ordering::SeqCst);
+    let old_address = self.master_ha_address.load(Ordering::SeqCst);
     if !old_address.is_null() {
         unsafe {
             // Convert the old pointer back into a Box to free the memory
             let _ = Box::from_raw(old_address);
         }
     }
     // Store the new address
     let new_address_ptr = Box::into_raw(Box::new(new_address.to_string()));
-    self.master_address.store(new_address_ptr, Ordering::SeqCst);
+    self.master_ha_address.store(new_address_ptr, Ordering::SeqCst);
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn update_ha_master_address(&self, new_address: &str) {
todo!()
// Safely free the old pointer before storing the new one
let old_address = self.master_address.load(Ordering::SeqCst);
if !old_address.is_null() {
unsafe {
// Convert the old pointer back into a Box to free the memory
let _ = Box::from_raw(old_address);
}
}
// Store the new address
let new_address_ptr = Box::into_raw(Box::new(new_address.to_string()));
self.master_address.store(new_address_ptr, Ordering::SeqCst);
}
fn update_ha_master_address(&self, new_address: &str) {
// Safely free the old pointer before storing the new one
- let old_address = self.master_address.load(Ordering::SeqCst);
+ let old_address = self.master_ha_address.load(Ordering::SeqCst);
if !old_address.is_null() {
unsafe {
// Convert the old pointer back into a Box to free the memory
let _ = Box::from_raw(old_address);
}
}
// Store the new address
let new_address_ptr = Box::into_raw(Box::new(new_address.to_string()));
- self.master_address.store(new_address_ptr, Ordering::SeqCst);
+ self.master_ha_address.store(new_address_ptr, Ordering::SeqCst);
}
🤖 Prompt for AI Agents
In rocketmq-store/src/ha/default_ha_client.rs around lines 684 to 696, the
method update_ha_master_address is incorrectly updating the field
self.master_address instead of self.master_ha_address. To fix this, replace all
occurrences of self.master_address with self.master_ha_address in this method,
ensuring the old pointer is freed and the new address is stored in
self.master_ha_address.


fn get_master_address(&self) -> String {
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-store/src/ha/default_ha_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@
}

fn update_ha_master_address(&self, new_addr: &str) {
todo!()
self.ha_client.update_ha_master_address(new_addr);

Check warning on line 202 in rocketmq-store/src/ha/default_ha_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/default_ha_service.rs#L202

Added line #L202 was not covered by tests
}

fn in_sync_replicas_nums(&self, master_put_where: i64) -> i32 {
Expand Down
8 changes: 7 additions & 1 deletion rocketmq-store/src/ha/general_ha_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,13 @@
}

fn update_ha_master_address(&self, new_address: &str) {
todo!()
if let Some(ref client) = self.default_ha_client {
client.update_ha_master_address(new_address);
} else if let Some(ref client) = self.auto_switch_ha_client {
client.update_ha_master_address(new_address);
} else {
panic!("No HA service is set for GeneralHAClient");

Check warning on line 89 in rocketmq-store/src/ha/general_ha_client.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-store/src/ha/general_ha_client.rs#L84-L89

Added lines #L84 - L89 were not covered by tests
}
}

fn get_master_address(&self) -> String {
Expand Down
Loading