Skip to content

Commit def325d

Browse files
authored
[ISSUE #4060]⚡️Enhance broker runtime with locking mechanism and refactor sync methods (#4061)
1 parent b4d8a02 commit def325d

File tree

1 file changed

+91
-26
lines changed

1 file changed

+91
-26
lines changed

rocketmq-broker/src/broker_runtime.rs

Lines changed: 91 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ impl BrokerRuntime {
243243
broker_pre_online_service: None,
244244
min_broker_id_in_group: AtomicU64::new(0),
245245
min_broker_addr_in_group: Default::default(),
246+
lock: Default::default(),
246247
});
247248
let mut stats_manager = BrokerStatsManager::new(inner.broker_config.clone());
248249
stats_manager.set_producer_state_getter(Arc::new(ProducerStateGetter {
@@ -1201,7 +1202,7 @@ impl BrokerRuntime {
12011202
Duration::from_millis(1000),
12021203
Duration::from_millis(sync_broker_member_group_period),
12031204
async move |_ctx| {
1204-
inner_.sync_broker_member_group().await;
1205+
BrokerRuntimeInner::sync_broker_member_group(&inner_).await;
12051206
Ok(())
12061207
},
12071208
);
@@ -1616,6 +1617,7 @@ pub(crate) struct BrokerRuntimeInner<MS: MessageStore> {
16161617
broker_pre_online_service: Option<BrokerPreOnlineService<MS>>,
16171618
min_broker_id_in_group: AtomicU64,
16181619
min_broker_addr_in_group: Mutex<Option<CheetahString>>,
1620+
lock: Mutex<()>,
16191621
}
16201622

16211623
impl<MS: MessageStore> BrokerRuntimeInner<MS> {
@@ -2082,8 +2084,8 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
20822084
}
20832085

20842086
#[inline]
2085-
pub fn slave_synchronize(&self) -> &Option<SlaveSynchronize<MS>> {
2086-
&self.slave_synchronize
2087+
pub fn slave_synchronize(&self) -> Option<&SlaveSynchronize<MS>> {
2088+
self.slave_synchronize.as_ref()
20872089
}
20882090

20892091
#[inline]
@@ -2482,11 +2484,11 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
24822484
CheetahString::from_string(addr)
24832485
}
24842486

2485-
async fn sync_broker_member_group(&self) {
2486-
let broker_cluster_name = &self.broker_config.broker_identity.broker_cluster_name;
2487-
let broker_name = &self.broker_config.broker_identity.broker_name;
2488-
let compatible_with_old_name_srv = self.broker_config.compatible_with_old_name_srv;
2489-
let broker_member_group = self
2487+
async fn sync_broker_member_group(this: &ArcMut<Self>) {
2488+
let broker_cluster_name = &this.broker_config.broker_identity.broker_cluster_name;
2489+
let broker_name = &this.broker_config.broker_identity.broker_name;
2490+
let compatible_with_old_name_srv = this.broker_config.compatible_with_old_name_srv;
2491+
let broker_member_group = this
24902492
.broker_outer_api
24912493
.sync_broker_member_group(
24922494
broker_cluster_name,
@@ -2527,33 +2529,48 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
25272529
}
25282530
}
25292531
let broker_member_group = broker_member_group.unwrap();
2530-
self.message_store_unchecked()
2532+
this.message_store_unchecked()
25312533
.set_alive_replica_num_in_group(calc_alive_broker_num_in_group(
25322534
&broker_member_group.broker_addrs,
2533-
self.broker_config.broker_identity.broker_id,
2535+
this.broker_config.broker_identity.broker_id,
25342536
) as i32);
2535-
if !self.is_isolated.load(Ordering::Acquire) {
2537+
if !this.is_isolated.load(Ordering::Acquire) {
25362538
let min_broker_id = broker_member_group.minimum_broker_id();
25372539
let min_broker_addr = broker_member_group
25382540
.broker_addrs
25392541
.get(&min_broker_id)
25402542
.cloned()
25412543
.unwrap_or_default();
2542-
self.update_min_broker(min_broker_id, min_broker_addr).await;
2544+
BrokerRuntimeInner::update_min_broker(this, min_broker_id, min_broker_addr).await;
25432545
}
25442546
}
25452547

2546-
pub async fn update_min_broker(&self, min_broker_id: u64, min_broker_addr: CheetahString) {
2547-
if self.broker_config.enable_slave_acting_master
2548-
&& self.broker_config.broker_identity.broker_id != MASTER_ID
2548+
pub async fn update_min_broker(
2549+
this: &ArcMut<Self>,
2550+
min_broker_id: u64,
2551+
min_broker_addr: CheetahString,
2552+
) {
2553+
if this.broker_config.enable_slave_acting_master
2554+
&& this.broker_config.broker_identity.broker_id != MASTER_ID
25492555
{
2550-
let min_broker_id_in_group = self.min_broker_id_in_group.load(Ordering::SeqCst);
2551-
if min_broker_id != min_broker_id_in_group {
2552-
let mut offline_broker_addr = None;
2553-
if min_broker_id > min_broker_id_in_group {
2554-
offline_broker_addr = self.min_broker_addr_in_group.lock().await.clone();
2556+
let mut this_clone = this.clone();
2557+
if let Ok(lock) = this.lock.try_lock() {
2558+
let min_broker_id_in_group = this.min_broker_id_in_group.load(Ordering::SeqCst);
2559+
if min_broker_id != min_broker_id_in_group {
2560+
let mut offline_broker_addr = None;
2561+
if min_broker_id > min_broker_id_in_group {
2562+
offline_broker_addr = this.min_broker_addr_in_group.lock().await.clone();
2563+
}
2564+
this_clone
2565+
.on_min_broker_change(
2566+
min_broker_id,
2567+
min_broker_addr,
2568+
offline_broker_addr,
2569+
None,
2570+
)
2571+
.await;
25552572
}
2556-
self.on_min_broker_change(min_broker_id, min_broker_addr, offline_broker_addr, None)
2573+
drop(lock);
25572574
}
25582575
}
25592576
}
@@ -2655,14 +2672,62 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
26552672
unimplemented!("BrokerRuntimeInner#start_service");
26562673
}
26572674

2658-
fn on_min_broker_change(
2659-
&self,
2660-
_min_broker_id: u64,
2675+
async fn on_min_broker_change(
2676+
&mut self,
2677+
min_broker_id: u64,
2678+
min_broker_addr: CheetahString,
2679+
offline_broker_addr: Option<CheetahString>,
2680+
master_ha_addr: Option<CheetahString>,
2681+
) {
2682+
let min_broker_id_in_group_old = self.min_broker_id_in_group.load(Ordering::SeqCst);
2683+
let mut min_broker_addr_in_group_old = self.min_broker_addr_in_group.lock().await;
2684+
info!(
2685+
"Min broker changed, old: {}-{:?}, new {}-{}",
2686+
min_broker_id_in_group_old,
2687+
min_broker_addr_in_group_old,
2688+
min_broker_id,
2689+
min_broker_addr
2690+
);
2691+
self.min_broker_id_in_group
2692+
.store(min_broker_id, Ordering::SeqCst);
2693+
*min_broker_addr_in_group_old = Some(min_broker_addr.clone());
2694+
drop(min_broker_addr_in_group_old);
2695+
self.change_special_service_status(
2696+
self.broker_config.broker_identity.broker_id
2697+
== self.min_broker_id_in_group.load(Ordering::SeqCst),
2698+
)
2699+
.await;
2700+
if offline_broker_addr.is_some()
2701+
&& offline_broker_addr.as_ref() == self.slave_synchronize().unwrap().master_addr()
2702+
{
2703+
//master offline
2704+
self.on_master_offline().await;
2705+
}
2706+
2707+
if min_broker_id == MASTER_ID && !min_broker_addr.is_empty() {
2708+
//master online
2709+
self.on_master_on_line(min_broker_addr, master_ha_addr)
2710+
.await;
2711+
}
2712+
2713+
// notify PullRequest on hold to pull from master.
2714+
if self.min_broker_id_in_group.load(Ordering::SeqCst) == MASTER_ID {
2715+
self.pull_request_hold_service_unchecked()
2716+
.notify_master_online()
2717+
.await;
2718+
}
2719+
}
2720+
2721+
async fn on_master_on_line(
2722+
&mut self,
26612723
_min_broker_addr: CheetahString,
2662-
_offline_broker_addr: Option<CheetahString>,
26632724
_master_ha_addr: Option<CheetahString>,
26642725
) {
2665-
unimplemented!("BrokerRuntimeInner#on_min_broker_change");
2726+
error!("unimplemented")
2727+
}
2728+
2729+
async fn on_master_offline(&mut self) {
2730+
error!("unimplemented")
26662731
}
26672732

26682733
async fn send_heartbeat(&self) {

0 commit comments

Comments
 (0)