Skip to content

Commit 0bada47

Browse files
authored
[ISSUE #4001]♻️Nameserver bootstrap: replace internal RocketMQRuntime scheduling with ScheduledTaskManager + initialization lifecycle split (#4002)
1 parent bc14da1 commit 0bada47

File tree

1 file changed

+60
-23
lines changed

1 file changed

+60
-23
lines changed

rocketmq-namesrv/src/bootstrap.rs

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ use rocketmq_remoting::remoting::RemotingService;
3030
use rocketmq_remoting::remoting_server::server::RocketMQServer;
3131
use rocketmq_remoting::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
3232
use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
33-
use rocketmq_runtime::RocketMQRuntime;
33+
use rocketmq_rust::schedule::simple_scheduler::ScheduledTaskManager;
3434
use rocketmq_rust::wait_for_signal;
3535
use rocketmq_rust::ArcMut;
3636
use tokio::sync::broadcast;
3737
use tracing::info;
38+
use tracing::warn;
3839

3940
use crate::processor::ClientRequestProcessor;
4041
use crate::processor::NameServerRequestProcessor;
@@ -53,8 +54,9 @@ pub struct Builder {
5354
}
5455

5556
struct NameServerRuntime {
56-
name_server_runtime: Option<RocketMQRuntime>,
57+
//name_server_runtime: Option<RocketMQRuntime>,
5758
inner: ArcMut<NameServerRuntimeInner>,
59+
scheduled_task_manager: ScheduledTaskManager,
5860
// receiver for shutdown signal
5961
shutdown_rx: Option<tokio::sync::broadcast::Receiver<()>>,
6062
}
@@ -82,6 +84,53 @@ async fn wait_for_signal_inner(shutdown_tx: broadcast::Sender<()>) {
8284
}
8385

8486
impl NameServerRuntime {
87+
pub async fn initialize(&mut self) {
88+
self.load_config().await;
89+
self.initiate_network_components();
90+
self.register_processor();
91+
self.start_schedule_service();
92+
self.initiate_ssl_context();
93+
self.initiate_rpc_hooks();
94+
}
95+
96+
async fn load_config(&mut self) {
97+
if let Some(kv_config_manager) = self.inner.kvconfig_manager.as_mut() {
98+
kv_config_manager.load();
99+
}
100+
}
101+
fn initiate_network_components(&mut self) {
102+
//nothing to do
103+
}
104+
105+
fn register_processor(&mut self) {
106+
//nothing to do
107+
}
108+
fn start_schedule_service(&self) {
109+
let scan_not_active_broker_interval = self
110+
.inner
111+
.name_server_config
112+
.scan_not_active_broker_interval;
113+
let mut name_server_runtime_inner = self.inner.clone();
114+
self.scheduled_task_manager.add_fixed_rate_task_async(
115+
Duration::from_secs(5),
116+
Duration::from_millis(scan_not_active_broker_interval),
117+
async move |_ctx| {
118+
if let Some(route_info_manager) =
119+
name_server_runtime_inner.route_info_manager.as_mut()
120+
{
121+
route_info_manager.scan_not_active_broker();
122+
}
123+
Ok(())
124+
},
125+
);
126+
}
127+
fn initiate_ssl_context(&mut self) {
128+
warn!("SSL is not supported yet");
129+
}
130+
fn initiate_rpc_hooks(&mut self) {
131+
warn!("RPC hooks are not supported yet");
132+
}
133+
85134
pub async fn start(&mut self) {
86135
let (notify_conn_disconnect, _) = broadcast::channel::<SocketAddr>(100);
87136
let receiver = notify_conn_disconnect.subscribe();
@@ -118,13 +167,14 @@ impl NameServerRuntime {
118167

119168
#[inline]
120169
fn shutdown(&mut self) {
121-
if let Some(runtime) = self.name_server_runtime.take() {
122-
runtime.shutdown();
123-
}
170+
self.scheduled_task_manager.cancel_all();
124171
self.inner
125172
.route_info_manager_mut()
126173
.un_register_service
127174
.shutdown();
175+
/*if let Some(runtime) = self.name_server_runtime.take() {
176+
runtime.shutdown();
177+
}*/
128178
info!("Rocketmq NameServer(Rust) gracefully shutdown completed");
129179
}
130180

@@ -140,20 +190,6 @@ impl NameServerRuntime {
140190
crate::processor::default_request_processor::DefaultRequestProcessor::new(
141191
self.inner.clone(),
142192
);
143-
144-
let mut inner = self.inner.clone();
145-
self.name_server_runtime
146-
.as_ref()
147-
.unwrap()
148-
.schedule_at_fixed_rate_mut(
149-
move || {
150-
if let Some(route_info_manager) = inner.route_info_manager.as_mut() {
151-
route_info_manager.scan_not_active_broker();
152-
}
153-
},
154-
Some(Duration::from_secs(5)),
155-
Duration::from_secs(5),
156-
);
157193
let mut name_server_request_processor = NameServerRequestProcessor::new();
158194
name_server_request_processor.register_processor(
159195
RequestCode::GetRouteinfoByTopic,
@@ -173,9 +209,9 @@ impl NameServerRuntime {
173209
impl Drop for NameServerRuntime {
174210
#[inline]
175211
fn drop(&mut self) {
176-
if let Some(runtime) = self.name_server_runtime.take() {
212+
/*if let Some(runtime) = self.name_server_runtime.take() {
177213
runtime.shutdown();
178-
}
214+
}*/
179215
}
180216
}
181217

@@ -210,7 +246,7 @@ impl Builder {
210246
#[inline]
211247
pub fn build(self) -> NameServerBootstrap {
212248
let name_server_config = self.name_server_config.unwrap_or_default();
213-
let runtime = RocketMQRuntime::new_multi(10, "namesrv-thread");
249+
//let runtime = RocketMQRuntime::new_multi(10, "namesrv-thread");
214250
let tokio_client_config = TokioClientConfig::default();
215251
let remoting_client = ArcMut::new(RocketmqDefaultClient::new(
216252
Arc::new(tokio_client_config.clone()),
@@ -237,8 +273,9 @@ impl Builder {
237273

238274
NameServerBootstrap {
239275
name_server_runtime: NameServerRuntime {
240-
name_server_runtime: Some(runtime),
276+
//name_server_runtime: Some(runtime),
241277
inner,
278+
scheduled_task_manager: ScheduledTaskManager::new(),
242279
shutdown_rx: None,
243280
},
244281
}

0 commit comments

Comments
 (0)