Skip to content

Commit b856a63

Browse files
authored
feat: /v1/status merge status of HttpQueryManager. (#18778)
- merge the query count and last_query_finished_at from the HTTP query manager - update last_query_finished_at each time a query finishes (no longer requires count=0), the same as last_query_started_at
1 parent dc96b85 commit b856a63

File tree

12 files changed

+168
-135
lines changed

12 files changed

+168
-135
lines changed

scripts/ci/deploy/config/databend-query-node-1.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ http_handler_host = "0.0.0.0"
2828
http_handler_port = 8000
2929
# mainly for test/debug
3030
# http_session_timeout_secs = 90
31+
http_handler_result_timeout_secs = 60
3132

3233
# Databend Query FlightSQL Handler.
3334
flight_sql_handler_host = "0.0.0.0"

src/query/service/src/servers/admin/v1/instance_status.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use poem::IntoResponse;
1919
use serde::Deserialize;
2020
use serde::Serialize;
2121

22+
use crate::servers::HttpQueryManager;
2223
use crate::sessions::QueriesQueueManager;
2324
use crate::sessions::SessionManager;
2425

@@ -40,8 +41,6 @@ pub struct InstanceStatus {
4041
pub instance_started_at: u64,
4142
// the local timestamp, may be useful to avoid the clock drift issues
4243
pub instance_timestamp: u64,
43-
// the timestamp on last valid request to /v1/query
44-
pub last_query_request_at: Option<u64>,
4544
}
4645

4746
// lightweight way to get status
@@ -51,17 +50,27 @@ pub struct InstanceStatus {
5150
pub async fn instance_status_handler() -> poem::Result<impl IntoResponse> {
5251
let session_manager = SessionManager::instance();
5352
let queue_manager = QueriesQueueManager::instance();
53+
let (http_query_count, last_query_finished_at_http) = HttpQueryManager::instance().status();
5454
let status = session_manager.get_current_session_status();
55+
56+
let last_query_finished_at = [
57+
status.last_query_started_at.map(unix_timestamp_secs),
58+
last_query_finished_at_http,
59+
]
60+
.iter()
61+
.flatten()
62+
.max()
63+
.copied();
64+
5565
let status = InstanceStatus {
56-
running_queries_count: status.running_queries_count,
66+
running_queries_count: status.running_queries_count.max(http_query_count),
5767
active_sessions_count: status.active_sessions_count,
5868
queuing_queries_count: queue_manager.length() as u64,
5969
last_query_started_at: status.last_query_started_at.map(unix_timestamp_secs),
60-
last_query_finished_at: status.last_query_finished_at.map(unix_timestamp_secs),
70+
last_query_finished_at,
6171
max_running_query_executed_secs: status.max_running_query_executed_secs,
6272
instance_started_at: unix_timestamp_secs(status.instance_started_at),
6373
instance_timestamp: unix_timestamp_secs(SystemTime::now()),
64-
last_query_request_at: status.last_query_request_at,
6574
};
6675
Ok(Json(status))
6776
}

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ use super::query::ExecuteStateKind;
6060
use super::query::HttpQuery;
6161
use super::query::HttpQueryRequest;
6262
use super::query::HttpQueryResponseInternal;
63-
use super::query::RemoveReason;
63+
use super::query::StopReason;
6464
use crate::clusters::ClusterDiscovery;
6565
use crate::servers::http::error::HttpErrorCode;
6666
use crate::servers::http::error::QueryError;
@@ -93,7 +93,6 @@ use crate::servers::http::v1::HttpQueryManager;
9393
use crate::servers::http::v1::HttpSessionConf;
9494
use crate::servers::HttpHandlerKind;
9595
use crate::sessions::QueryAffect;
96-
use crate::sessions::SessionManager;
9796

9897
pub fn make_page_uri(query_id: &str, page_no: usize) -> String {
9998
format!("/v1/query/{}/page/{}", query_id, page_no)
@@ -167,10 +166,10 @@ pub struct QueryResponse {
167166
}
168167

169168
impl QueryResponse {
170-
pub(crate) fn removed(query_id: &str, remove_reason: RemoveReason) -> impl IntoResponse {
169+
pub(crate) fn removed(query_id: &str, remove_reason: StopReason) -> impl IntoResponse {
171170
let id = query_id.to_string();
172171
let state = match remove_reason {
173-
RemoveReason::Finished => ExecuteStateKind::Succeeded,
172+
StopReason::Finished => ExecuteStateKind::Succeeded,
174173
_ => ExecuteStateKind::Failed,
175174
};
176175
Json(QueryResponse {
@@ -306,10 +305,10 @@ async fn query_final_handler(
306305
);
307306
let http_query_manager = HttpQueryManager::instance();
308307
match http_query_manager
309-
.remove_query(
308+
.stop_query(
310309
&query_id,
311310
&ctx.client_session_id,
312-
RemoveReason::Finished,
311+
StopReason::Finished,
313312
ErrorCode::ClosedQuery("Query closed by client"),
314313
)
315314
.await?
@@ -347,10 +346,10 @@ async fn query_cancel_handler(
347346
);
348347
let http_query_manager = HttpQueryManager::instance();
349348
match http_query_manager
350-
.remove_query(
349+
.stop_query(
351350
&query_id,
352351
&ctx.client_session_id,
353-
RemoveReason::Canceled,
352+
StopReason::Canceled,
354353
ErrorCode::AbortedQuery("canceled by client"),
355354
)
356355
.await?
@@ -449,9 +448,8 @@ async fn query_page_handler(
449448
query.update_expire_time(false).await;
450449
let (resp, next_is_final) = QueryResponse::from_internal(query_id, resp, false);
451450
if next_is_final {
452-
query.wait_for_final()
451+
query.set_data_drained()
453452
}
454-
SessionManager::instance().new_query_request();
455453
Ok(resp)
456454
}
457455
}
@@ -501,7 +499,6 @@ pub(crate) async fn query_handler(
501499
client_session_id_info,
502500
mask_connection_info(&format!("{:?}", req))
503501
);
504-
SessionManager::instance().new_query_request();
505502
let sql = req.sql.clone();
506503

507504
match HttpQuery::try_create(ctx, req.clone()).await {
@@ -522,7 +519,6 @@ pub(crate) async fn query_handler(
522519
let http_query_manager = HttpQueryManager::instance();
523520
let query = http_query_manager.add_query(query).await;
524521

525-
query.update_expire_time(true).await;
526522
// tmp workaround to tolerant old clients
527523
let resp = query
528524
.get_response_page(0)
@@ -550,7 +546,7 @@ pub(crate) async fn query_handler(
550546
let (resp, next_is_final) =
551547
QueryResponse::from_internal(query.id.to_string(), resp, false);
552548
if next_is_final {
553-
query.wait_for_final()
549+
query.set_data_drained()
554550
}
555551
Ok(resp.into_response())
556552
}
@@ -842,7 +838,7 @@ pub fn query_route() -> Route {
842838
route
843839
}
844840

845-
fn query_id_removed(query_id: &str, remove_reason: RemoveReason) -> PoemError {
841+
fn query_id_removed(query_id: &str, remove_reason: StopReason) -> PoemError {
846842
PoemError::from_string(
847843
format!("[HTTP-QUERY] Query ID {query_id} {}", remove_reason),
848844
StatusCode::BAD_REQUEST,

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use serde::Serializer;
5252

5353
use super::execute_state::ExecutionError;
5454
use super::HttpQueryContext;
55-
use super::RemoveReason;
55+
use super::StopReason;
5656
use crate::servers::http::error::QueryError;
5757
use crate::servers::http::v1::http_query_handlers::QueryResponseField;
5858
use crate::servers::http::v1::query::blocks_serializer::BlocksSerializer;
@@ -405,15 +405,15 @@ impl HttpSessionConf {
405405
session.set_all_variables(state.get_variables()?)
406406
}
407407
if let Some(id) = state.last_query_ids.first() {
408-
if let Some(last_query) = http_query_manager.queries.get(id) {
408+
if let Some(last_query) = http_query_manager.get_query(id) {
409409
let state = *last_query.state.lock();
410410
if !matches!(
411411
state,
412-
HttpQueryState::Removed(RemoveReason::Finished | RemoveReason::Canceled)
412+
HttpQueryState::Stopped(StopReason::Finished | StopReason::Canceled)
413413
) {
414414
warn!(
415-
"[HTTP-QUERY] Last query id not finished yet, id = {}, state = {:?}",
416-
id, state
415+
"[HTTP-QUERY] Last query id not finished yet, id = {}, state = {:?}, is_data_drained={}",
416+
id, state, last_query.is_data_drained.load(Ordering::Relaxed)
417417
);
418418
}
419419
}
@@ -511,17 +511,15 @@ pub struct HttpQueryResponseInternal {
511511

512512
#[derive(Debug, Clone, Copy)]
513513
pub enum HttpQueryState {
514-
Working,
515-
WaitForFinal,
516514
ExpireAt(Instant),
517-
Removed(RemoveReason),
515+
Stopped(StopReason),
518516
}
519517

520518
#[derive(Debug)]
521519
pub enum ExpireResult {
522520
Expired,
523521
Sleep(Duration),
524-
Removed,
522+
Stopped,
525523
}
526524

527525
pub struct HttpQuery {
@@ -533,6 +531,7 @@ pub struct HttpQuery {
533531
executor: Arc<Mutex<Executor>>,
534532
page_manager: Arc<TokioMutex<PageManager>>,
535533
state: Arc<Mutex<HttpQueryState>>,
534+
is_data_drained: AtomicBool,
536535
/// The timeout for the query result polling. In the normal case, the client driver
537536
/// should fetch the paginated result in a timely manner, and the interval should not
538537
/// exceed this result_timeout_secs.
@@ -624,6 +623,8 @@ impl HttpQuery {
624623

625624
let settings = session.get_settings();
626625
let result_timeout_secs = settings.get_http_handler_result_timeout_secs()?;
626+
let deadline = Instant::now()
627+
+ Duration::from_secs(result_timeout_secs + req.pagination.wait_time_secs as u64);
627628

628629
Ok(HttpQuery {
629630
id: query_id,
@@ -635,11 +636,12 @@ impl HttpQuery {
635636
page_manager: Arc::new(TokioMutex::new(page_manager)),
636637
result_timeout_secs,
637638

638-
state: Arc::new(Mutex::new(HttpQueryState::Working)),
639+
state: Arc::new(Mutex::new(HttpQueryState::ExpireAt(deadline))),
639640
temp_tbl_mgr: session.temp_tbl_mgr().clone(),
640641
query_mem_stat: ctx.get_query_memory_tracking(),
641642
is_txn_mgr_saved: Default::default(),
642643
last_session_conf: Default::default(),
644+
is_data_drained: AtomicBool::new(false),
643645
})
644646
}
645647

@@ -847,35 +849,32 @@ impl HttpQuery {
847849

848850
#[async_backtrace::framed]
849851
pub async fn update_expire_time(&self, before_wait: bool) {
850-
let duration = Duration::from_secs(self.result_timeout_secs)
851-
+ if before_wait {
852-
Duration::from_secs(self.request.pagination.wait_time_secs as u64)
853-
} else {
854-
Duration::new(0, 0)
855-
};
856-
let deadline = Instant::now() + duration;
852+
let mut to = Duration::from_secs(self.result_timeout_secs);
853+
if before_wait {
854+
to += Duration::from_secs(self.request.pagination.wait_time_secs as u64)
855+
};
856+
let deadline = Instant::now() + to;
857857
let mut t = self.state.lock();
858858
*t = HttpQueryState::ExpireAt(deadline);
859859
}
860860

861-
pub fn mark_removed(&self, remove_reason: RemoveReason) -> bool {
861+
pub fn mark_stopped(&self, remove_reason: StopReason) -> bool {
862862
let mut t = self.state.lock();
863-
if !matches!(*t, HttpQueryState::Removed(_)) {
864-
*t = HttpQueryState::Removed(remove_reason);
863+
if !matches!(*t, HttpQueryState::Stopped(_)) {
864+
*t = HttpQueryState::Stopped(remove_reason);
865865
true
866866
} else {
867867
false
868868
}
869869
}
870870

871-
pub fn wait_for_final(&self) {
872-
let mut t = self.state.lock();
873-
*t = HttpQueryState::WaitForFinal;
871+
pub fn set_data_drained(&self) {
872+
self.is_data_drained.store(true, Ordering::Relaxed);
874873
}
875874

876-
pub fn check_removed(&self) -> Option<RemoveReason> {
875+
pub fn check_removed(&self) -> Option<StopReason> {
877876
let t = self.state.lock();
878-
if let HttpQueryState::Removed(r) = *t {
877+
if let HttpQueryState::Stopped(r) = *t {
879878
Some(r)
880879
} else {
881880
None
@@ -895,8 +894,7 @@ impl HttpQuery {
895894
ExpireResult::Sleep(expire_at - now)
896895
}
897896
}
898-
HttpQueryState::Removed(_) => ExpireResult::Removed,
899-
_ => ExpireResult::Sleep(Duration::from_secs(self.result_timeout_secs)),
897+
HttpQueryState::Stopped(_) => ExpireResult::Stopped,
900898
}
901899
}
902900

@@ -911,8 +909,7 @@ impl HttpQuery {
911909
*expire_state = HttpQueryState::ExpireAt(deadline);
912910
true
913911
}
914-
HttpQueryState::Removed(_) => false,
915-
HttpQueryState::Working | HttpQueryState::WaitForFinal => true,
912+
HttpQueryState::Stopped(_) => false,
916913
}
917914
}
918915

0 commit comments

Comments
 (0)