Skip to content

Commit 1cac084

Browse files
authored
chore(query): use the same memory stat for http_query_handler and http_page_handler (#17855)
* refactor(query): split http query create and start * chore(query): use same mem state for query_handler and query_page_handler
1 parent 33dc48c commit 1cac084

File tree

9 files changed

+169
-137
lines changed

9 files changed

+169
-137
lines changed

โ€Žsrc/query/service/src/servers/http/middleware/session.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ use databend_common_base::headers::HEADER_STICKY;
2525
use databend_common_base::headers::HEADER_TENANT;
2626
use databend_common_base::headers::HEADER_VERSION;
2727
use databend_common_base::headers::HEADER_WAREHOUSE;
28-
use databend_common_base::runtime::defer;
29-
use databend_common_base::runtime::MemStat;
3028
use databend_common_base::runtime::ThreadTracker;
3129
use databend_common_config::GlobalConfig;
3230
use databend_common_exception::ErrorCode;
@@ -636,16 +634,6 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
636634
.map(|id| id.to_str().unwrap().to_string())
637635
.unwrap_or_else(|| Uuid::new_v4().to_string());
638636

639-
let query_mem_stat = MemStat::create(format!("Query-{}", query_id));
640-
let mut tracking_payload = ThreadTracker::new_tracking_payload();
641-
tracking_payload.query_id = Some(query_id.clone());
642-
tracking_payload.mem_stat = Some(query_mem_stat.clone());
643-
644-
let _guard = ThreadTracker::tracking(tracking_payload);
645-
let _guard2 = defer(move || {
646-
query_mem_stat.set_limit(0);
647-
});
648-
649637
ThreadTracker::tracking_future(async move {
650638
match self.auth(&req, query_id).await {
651639
Ok(ctx) => {

โ€Žsrc/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 71 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use databend_common_base::headers::HEADER_QUERY_PAGE_ROWS;
2121
use databend_common_base::headers::HEADER_QUERY_STATE;
2222
use databend_common_base::runtime::drop_guard;
2323
use databend_common_base::runtime::execute_futures_in_parallel;
24+
use databend_common_base::runtime::ThreadTracker;
2425
use databend_common_config::GlobalConfig;
2526
use databend_common_exception::ErrorCode;
2627
use databend_common_expression::DataSchemaRef;
@@ -51,6 +52,7 @@ use serde::Deserialize;
5152
use serde::Serialize;
5253

5354
use super::query::ExecuteStateKind;
55+
use super::query::HttpQuery;
5456
use super::query::HttpQueryRequest;
5557
use super::query::HttpQueryResponseInternal;
5658
use super::query::RemoveReason;
@@ -354,39 +356,58 @@ async fn query_page_handler(
354356
Path((query_id, page_no)): Path<(String, usize)>,
355357
) -> PoemResult<impl IntoResponse> {
356358
ctx.check_node_id(&query_id)?;
357-
let root = get_http_tracing_span(func_path!(), ctx, &query_id);
358-
let _t = SlowRequestLogTracker::new(ctx);
359+
// tracing in middleware
359360

360-
async {
361-
let http_query_manager = HttpQueryManager::instance();
362-
match http_query_manager.get_query(&query_id) {
363-
Some(query) => {
364-
if query.user_name != ctx.user_name {
365-
return Err(poem::error::Error::from_string(
366-
format!(
367-
"wrong user, query {} expect {}, got {}",
368-
query_id, query.user_name, ctx.user_name
369-
),
370-
StatusCode::UNAUTHORIZED,
371-
));
372-
}
373-
query.check_client_session_id(&ctx.client_session_id)?;
374-
if let Some(reason) = query.check_removed() {
375-
Err(query_id_removed(&query_id, reason))
376-
} else {
377-
query.update_expire_time(true).await;
378-
let resp = query.get_response_page(page_no).await.map_err(|err| {
379-
poem::Error::from_string(err.message(), StatusCode::NOT_FOUND)
380-
})?;
381-
query.update_expire_time(false).await;
382-
Ok(QueryResponse::from_internal(query_id, resp, false))
383-
}
361+
let http_query_manager = HttpQueryManager::instance();
362+
363+
let Some(query) = http_query_manager.get_query(&query_id) else {
364+
return Err(query_id_not_found(&query_id, &ctx.node_id));
365+
};
366+
367+
let query_mem_stat = query.query_mem_stat.clone();
368+
369+
let query_page_handle = {
370+
let query_id = query_id.clone();
371+
async move {
372+
if query.user_name != ctx.user_name {
373+
return Err(poem::error::Error::from_string(
374+
format!(
375+
"wrong user, query {} expect {}, got {}",
376+
query_id, query.user_name, ctx.user_name
377+
),
378+
StatusCode::UNAUTHORIZED,
379+
));
380+
}
381+
382+
query.check_client_session_id(&ctx.client_session_id)?;
383+
if let Some(reason) = query.check_removed() {
384+
Err(query_id_removed(&query_id, reason))
385+
} else {
386+
query.update_expire_time(true).await;
387+
let resp = query.get_response_page(page_no).await.map_err(|err| {
388+
poem::Error::from_string(err.message(), StatusCode::NOT_FOUND)
389+
})?;
390+
query.update_expire_time(false).await;
391+
Ok(QueryResponse::from_internal(query_id, resp, false))
384392
}
385-
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
386393
}
387-
}
388-
.in_span(root)
389-
.await
394+
};
395+
396+
let query_page_handle = {
397+
let root = get_http_tracing_span(func_path!(), ctx, &query_id);
398+
let _t = SlowRequestLogTracker::new(ctx);
399+
query_page_handle.in_span(root)
400+
};
401+
402+
let query_page_handle = {
403+
let mut tracking_payload = ThreadTracker::new_tracking_payload();
404+
tracking_payload.mem_stat = query_mem_stat;
405+
tracking_payload.query_id = Some(query_id.clone());
406+
let _tracking_guard = ThreadTracker::tracking(tracking_payload);
407+
ThreadTracker::tracking_future(query_page_handle)
408+
};
409+
410+
query_page_handle.await
390411
}
391412

392413
#[poem::handler]
@@ -399,18 +420,29 @@ pub(crate) async fn query_handler(
399420
let _t = SlowRequestLogTracker::new(ctx);
400421

401422
async {
402-
let agent_info = ctx.user_agent.as_ref().map(|s|(format!("(from {s})"))).unwrap_or("".to_string());
403-
let client_session_id_info = ctx.client_session_id.as_ref().map(|s|(format!("(client_session_id={s})"))).unwrap_or("".to_string());
423+
let agent_info = ctx.user_agent.as_ref().map(|s| (format!("(from {s})"))).unwrap_or("".to_string());
424+
let client_session_id_info = ctx.client_session_id.as_ref().map(|s| (format!("(client_session_id={s})"))).unwrap_or("".to_string());
404425
info!("http query new request{}{}: {}", agent_info, client_session_id_info, mask_connection_info(&format!("{:?}", req)));
405-
let http_query_manager = HttpQueryManager::instance();
406426
let sql = req.sql.clone();
407427

408-
let query = http_query_manager
409-
.try_create_query(ctx, req.clone())
410-
.await
411-
.map_err(|err| err.display_with_sql(&sql));
412-
match query {
413-
Ok(query) => {
428+
match HttpQuery::try_create(ctx, req.clone()).await {
429+
Err(err) => {
430+
let err = err.display_with_sql(&sql);
431+
error!("http query fail to start sql, error: {:?}", err);
432+
ctx.set_fail();
433+
Ok(req.fail_to_start_sql(err).into_response())
434+
}
435+
Ok(mut query) => {
436+
if let Err(err) = query.start_query(sql.clone()).await {
437+
let err = err.display_with_sql(&sql);
438+
error!("http query fail to start sql, error: {:?}", err);
439+
ctx.set_fail();
440+
return Ok(req.fail_to_start_sql(err).into_response());
441+
}
442+
443+
let http_query_manager = HttpQueryManager::instance();
444+
let query = http_query_manager.add_query(query).await;
445+
414446
query.update_expire_time(true).await;
415447
// tmp workaround to tolerant old clients
416448
let resp = query
@@ -431,11 +463,6 @@ pub(crate) async fn query_handler(
431463
query.update_expire_time(false).await;
432464
Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response())
433465
}
434-
Err(e) => {
435-
error!("http query fail to start sql, error: {:?}", e);
436-
ctx.set_fail();
437-
Ok(req.fail_to_start_sql(e).into_response())
438-
}
439466
}
440467
}
441468
.in_span(root)

โ€Žsrc/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ impl ExecuteState {
115115

116116
pub struct ExecuteStarting {
117117
pub(crate) ctx: Arc<QueryContext>,
118+
pub(crate) sender: SizedChannelSender<DataBlock>,
118119
}
119120

120121
pub struct ExecuteRunning {
@@ -238,7 +239,7 @@ impl Executor {
238239

239240
pub fn get_query_duration_ms(&self) -> i64 {
240241
match &self.state {
241-
Starting(ExecuteStarting { ctx }) | Running(ExecuteRunning { ctx, .. }) => {
242+
Starting(ExecuteStarting { ctx, .. }) | Running(ExecuteRunning { ctx, .. }) => {
242243
ctx.get_query_duration_ms()
243244
}
244245
Stopped(f) => f.query_duration_ms,

0 commit comments

Comments
ย (0)