Skip to content

Commit dd05ed2

Browse files
authored
Merge pull request #10501 from dantengsky/fix-thread-leakings
fix: thread leaking
2 parents b30a4a1 + ec7acb7 commit dd05ed2

File tree

7 files changed

+69
-25
lines changed

7 files changed

+69
-25
lines changed

src/common/base/src/runtime/global_runtime.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@ use crate::runtime::Runtime;
2121

2222
pub struct GlobalIORuntime;
2323

24+
/// Newtype around a [`Runtime`] so it can be registered and looked up as a
/// distinct global instance type (see `init` / `instance` on this type).
pub struct GlobalQueryRuntime(pub Runtime);
25+
26+
impl GlobalQueryRuntime {
27+
#[inline(always)]
28+
pub fn runtime<'a>(self: &'a Arc<Self>) -> &'a Runtime {
29+
&self.0
30+
}
31+
}
32+
2433
impl GlobalIORuntime {
2534
pub fn init(num_cpus: usize) -> Result<()> {
2635
let thread_num = std::cmp::max(num_cpus, num_cpus::get() / 2);
@@ -37,3 +46,18 @@ impl GlobalIORuntime {
3746
GlobalInstance::get()
3847
}
3948
}
49+
50+
impl GlobalQueryRuntime {
51+
pub fn init(num_cpus: usize) -> Result<()> {
52+
let thread_num = std::cmp::max(num_cpus, num_cpus::get() / 2);
53+
let thread_num = std::cmp::max(2, thread_num);
54+
55+
let rt = Runtime::with_worker_threads(thread_num, Some("g-query-worker".to_owned()))?;
56+
GlobalInstance::set(Arc::new(GlobalQueryRuntime(rt)));
57+
Ok(())
58+
}
59+
60+
pub fn instance() -> Arc<GlobalQueryRuntime> {
61+
GlobalInstance::get()
62+
}
63+
}

src/common/base/src/runtime/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ mod thread_pool;
2323
pub use catch_unwind::catch_unwind;
2424
pub use catch_unwind::CatchUnwindFuture;
2525
pub use global_runtime::GlobalIORuntime;
26+
pub use global_runtime::GlobalQueryRuntime;
2627
pub use runtime::execute_futures_in_parallel;
2728
pub use runtime::match_join_handle;
2829
pub use runtime::Dropper;

src/query/service/src/api/rpc/exchange/statistics_sender.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::Arc;
1818

1919
use async_channel::Receiver;
2020
use async_channel::Sender;
21+
use common_base::runtime::GlobalQueryRuntime;
2122
use common_base::runtime::TrySpawn;
2223
use common_catalog::table_context::TableContext;
2324
use common_exception::ErrorCode;
@@ -63,8 +64,8 @@ impl StatisticsSender {
6364
let shutdown_flag = self.shutdown_flag.clone();
6465
let shutdown_flag_receiver = self.shutdown_flag_receiver.clone();
6566

66-
let spawner = ctx.clone();
67-
spawner.spawn(async move {
67+
let spawner = GlobalQueryRuntime::instance();
68+
spawner.runtime().spawn(async move {
6869
let mut recv = Box::pin(flight_exchange.recv());
6970
let mut notified = Box::pin(shutdown_flag_receiver.recv());
7071

src/query/service/src/global_services.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use common_base::base::GlobalInstance;
1616
use common_base::runtime::GlobalIORuntime;
17+
use common_base::runtime::GlobalQueryRuntime;
1718
use common_catalog::catalog::CatalogManager;
1819
use common_config::GlobalConfig;
1920
use common_config::InnerConfig;
@@ -47,6 +48,7 @@ impl GlobalServices {
4748

4849
QueryLogger::init(app_name_shuffle, &config.log)?;
4950
GlobalIORuntime::init(config.storage.num_cpus as usize)?;
51+
GlobalQueryRuntime::init(config.storage.num_cpus as usize)?;
5052

5153
// Cluster discovery.
5254
ClusterDiscovery::init(config.clone()).await?;

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::time::Instant;
2020
use common_base::base::tokio;
2121
use common_base::base::tokio::sync::Mutex as TokioMutex;
2222
use common_base::base::tokio::sync::RwLock;
23+
use common_base::runtime::GlobalQueryRuntime;
2324
use common_base::runtime::TrySpawn;
2425
use common_catalog::table_context::StageAttachment;
2526
use common_exception::ErrorCode;
@@ -278,28 +279,37 @@ impl HttpQuery {
278279
let query_id_clone = id.clone();
279280

280281
let schema = ExecuteState::get_schema(&sql, ctx.clone()).await?;
281-
ctx.try_spawn(async move {
282-
let state = state_clone.clone();
283-
if let Err(e) =
284-
ExecuteState::try_start_query(state, &sql, session, ctx_clone.clone(), block_sender)
285-
.await
286-
{
287-
InterpreterQueryLog::fail_to_start(ctx_clone.clone(), e.clone());
288-
let state = ExecuteStopped {
289-
stats: Progresses::default(),
290-
reason: Err(e.clone()),
291-
stop_time: Instant::now(),
292-
affect: ctx_clone.get_affect(),
293-
};
294-
tracing::info!(
295-
"http query {}, change state to Stopped, fail to start {:?}",
296-
&query_id,
297-
e
298-
);
299-
Executor::start_to_stop(&state_clone, ExecuteState::Stopped(Box::new(state))).await;
300-
block_sender_closer.close();
301-
}
302-
})?;
282+
let http_query_runtime_instance = GlobalQueryRuntime::instance();
283+
http_query_runtime_instance
284+
.runtime()
285+
.try_spawn(async move {
286+
let state = state_clone.clone();
287+
if let Err(e) = ExecuteState::try_start_query(
288+
state,
289+
&sql,
290+
session,
291+
ctx_clone.clone(),
292+
block_sender,
293+
)
294+
.await
295+
{
296+
InterpreterQueryLog::fail_to_start(ctx_clone.clone(), e.clone());
297+
let state = ExecuteStopped {
298+
stats: Progresses::default(),
299+
reason: Err(e.clone()),
300+
stop_time: Instant::now(),
301+
affect: ctx_clone.get_affect(),
302+
};
303+
tracing::info!(
304+
"http query {}, change state to Stopped, fail to start {:?}",
305+
&query_id,
306+
e
307+
);
308+
Executor::start_to_stop(&state_clone, ExecuteState::Stopped(Box::new(state)))
309+
.await;
310+
block_sender_closer.close();
311+
}
312+
})?;
303313

304314
let format_settings = ctx.get_format_settings()?;
305315
let data = Arc::new(TokioMutex::new(PageManager::new(
@@ -319,6 +329,7 @@ impl HttpQuery {
319329
config,
320330
expire_state: Arc::new(TokioMutex::new(ExpireState::Working)),
321331
};
332+
322333
Ok(Arc::new(query))
323334
}
324335

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,11 @@ impl QueryContextShared {
270270
}
271271
}
272272

273+
/// Returns a clone of the currently stored runtime handle, if any.
///
/// Only reads `self.runtime` under its read lock; it never creates a
/// runtime, so `None` means none has been stored.
pub fn get_runtime(&self) -> Option<Arc<Runtime>> {
    self.runtime.read().clone()
}
277+
273278
pub fn attach_query_str(&self, kind: String, query: String) {
274279
{
275280
let mut running_query = self.running_query.write();

src/query/service/src/sessions/session_info.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl Session {
3434

3535
let shared_query_context = &session_ctx.get_query_context_shared();
3636
if let Some(shared) = shared_query_context {
37-
if let Ok(runtime) = shared.try_get_runtime() {
37+
if let Some(runtime) = shared.get_runtime() {
3838
let mem_stat = runtime.get_tracker();
3939
memory_usage = mem_stat.get_memory_usage();
4040
}

0 commit comments

Comments
 (0)