From c89c29876a86dfd0acb95b600f919629387c2869 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 26 Sep 2025 01:05:43 +0800 Subject: [PATCH] refactor: clarify the HTTP query lifecycle. --- .../servers/http/v1/http_query_handlers.rs | 88 +++---- .../servers/http/v1/query/execute_state.rs | 4 +- .../src/servers/http/v1/query/http_query.rs | 225 +++++++++++------- .../http/v1/query/http_query_manager.rs | 138 +++++------ .../service/src/servers/http/v1/query/mod.rs | 2 +- .../src/servers/http/v1/query/page_manager.rs | 12 +- .../it/servers/http/http_query_handlers.rs | 18 +- .../09_http_handler/test_09_0010_heartbeat.py | 101 ++++---- 8 files changed, 299 insertions(+), 289 deletions(-) diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs index f827a364aad87..23af83e8dcc3c 100644 --- a/src/query/service/src/servers/http/v1/http_query_handlers.rs +++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs @@ -27,7 +27,6 @@ use databend_common_base::runtime::ParentMemStat; use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::GLOBAL_MEM_STAT; use databend_common_config::GlobalConfig; -use databend_common_exception::ErrorCode; use databend_common_expression::DataSchemaRef; use databend_common_management::WorkloadGroupResourceManager; use databend_common_metrics::http::metrics_incr_http_response_errors_count; @@ -56,11 +55,11 @@ use serde::Deserialize; use serde::Serialize; use uuid::Uuid; +use super::query::CloseReason; use super::query::ExecuteStateKind; use super::query::HttpQuery; use super::query::HttpQueryRequest; use super::query::HttpQueryResponseInternal; -use super::query::StopReason; use crate::clusters::ClusterDiscovery; use crate::servers::http::error::HttpErrorCode; use crate::servers::http::error::QueryError; @@ -166,10 +165,10 @@ pub struct QueryResponse { } impl QueryResponse { - pub(crate) fn removed(query_id: &str, remove_reason: StopReason) -> impl IntoResponse { + pub(crate) fn closed(query_id: &str, close_reason: CloseReason) -> impl IntoResponse { let id = query_id.to_string(); - let state = match remove_reason { - StopReason::Finished => ExecuteStateKind::Succeeded, + let state = match close_reason { + CloseReason::Finalized => ExecuteStateKind::Succeeded, _ => ExecuteStateKind::Failed, }; Json(QueryResponse { @@ -198,7 +197,7 @@ impl QueryResponse { id: String, r: HttpQueryResponseInternal, is_final: bool, - ) -> (impl IntoResponse, bool) { + ) -> impl IntoResponse { let state = r.state.clone(); let (data, next_uri) = if is_final { (Arc::new(BlocksSerializer::empty()), None) @@ -248,12 +247,7 @@ impl QueryResponse { }; let rows = data.num_rows(); - let next_is_final = next_uri - .as_ref() - .map(|u| u.ends_with("final")) - .unwrap_or(false); - - let resp = Json(QueryResponse { + Json(QueryResponse { data, state: state.state, schema: state.schema.clone(), @@ -274,8 +268,7 @@ impl QueryResponse { }) .with_header(HEADER_QUERY_ID, id.clone()) .with_header(HEADER_QUERY_STATE, state.state.to_string()) - .with_header(HEADER_QUERY_PAGE_ROWS, rows); - (resp, next_is_final) + .with_header(HEADER_QUERY_PAGE_ROWS, rows) } } @@ -305,11 +298,11 @@ async fn query_final_handler( ); let http_query_manager = HttpQueryManager::instance(); match http_query_manager - .stop_query( + .close_query( &query_id, + CloseReason::Finalized, &ctx.client_session_id, - StopReason::Finished, - ErrorCode::ClosedQuery("Query closed by client"), + true, ) .await? { @@ -320,7 +313,7 @@ async fn query_final_handler( // it is safe to set these 2 fields to None, because client now check for null/None first. response.session = None; response.state.affect = None; - Ok(QueryResponse::from_internal(query_id, response, true).0) + Ok(QueryResponse::from_internal(query_id, response, true)) } None => Err(query_id_not_found(&query_id, &ctx.node_id)), } @@ -346,11 +339,11 @@ async fn query_cancel_handler( ); let http_query_manager = HttpQueryManager::instance(); match http_query_manager - .stop_query( + .close_query( &query_id, + CloseReason::Canceled, &ctx.client_session_id, - StopReason::Canceled, - ErrorCode::AbortedQuery("canceled by client"), + true, ) .await? { @@ -374,15 +367,13 @@ async fn query_state_handler( let http_query_manager = HttpQueryManager::instance(); match http_query_manager.get_query(&query_id) { Some(query) => { - if let Some(reason) = query.check_removed() { - Ok(QueryResponse::removed(&query_id, reason).into_response()) + if let Some(reason) = query.check_closed() { + Ok(QueryResponse::closed(&query_id, reason.reason).into_response()) } else { let response = query .get_response_state_only() .map_err(HttpErrorCode::server_error)?; - Ok(QueryResponse::from_internal(query_id, response, false) - .0 - .into_response()) + Ok(QueryResponse::from_internal(query_id, response, false).into_response()) } } None => Err(query_id_not_found(&query_id, &ctx.node_id)), @@ -423,18 +414,16 @@ async fn query_page_handler( } query.check_client_session_id(&ctx.client_session_id)?; - if let Some(reason) = query.check_removed() { - log::info!( - "[HTTP-QUERY] /query/{}/page/{} - query is removed (reason: {})", - query_id, - page_no, - reason + if let Some(st) = query.check_closed() { + info!( + "[HTTP-QUERY] /query/{}/page/{} - query is close (reason: {:?})", + query_id, page_no, st ); - Err(query_id_removed(&query_id, reason)) + Err(query_id_closed(&query_id, st.reason)) } else { - query.update_expire_time(true).await; + query.update_expire_time(true, false).await; let resp = query.get_response_page(page_no).await.map_err(|err| { - log::info!( + info!( "[HTTP-QUERY] /query/{}/page/{} - get response page error (reason: {})", query_id, page_no, @@ -445,12 +434,10 @@ async fn query_page_handler( StatusCode::NOT_FOUND, ) })?; - query.update_expire_time(false).await; - let (resp, next_is_final) = QueryResponse::from_internal(query_id, resp, false); - if next_is_final { - query.set_data_drained() - } - Ok(resp) + query + .update_expire_time(false, resp.is_data_drained()) + .await; + Ok(QueryResponse::from_internal(query_id, resp, false)) } } }; @@ -519,7 +506,6 @@ pub(crate) async fn query_handler( let http_query_manager = HttpQueryManager::instance(); let query = http_query_manager.add_query(query).await; - // tmp workaround to tolerant old clients let resp = query .get_response_page(0) .await @@ -542,13 +528,10 @@ pub(crate) async fn query_handler( info!("[HTTP-QUERY] Initial response for query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'", &query.id, &resp.state, rows, next_page, mask_connection_info(&sql) ); - query.update_expire_time(false).await; - let (resp, next_is_final) = - QueryResponse::from_internal(query.id.to_string(), resp, false); - if next_is_final { - query.set_data_drained() - } - Ok(resp.into_response()) + query + .update_expire_time(false, resp.is_data_drained()) + .await; + Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response()) } } }; @@ -838,9 +821,12 @@ pub fn query_route() -> Route { route } -fn query_id_removed(query_id: &str, remove_reason: StopReason) -> PoemError { +fn query_id_closed(query_id: &str, closed_reason: CloseReason) -> PoemError { PoemError::from_string( - format!("[HTTP-QUERY] Query ID {query_id} {}", remove_reason), + format!( + "[HTTP-QUERY] Query {query_id} is closed for {}", + closed_reason + ), StatusCode::BAD_REQUEST, ) } diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index 0d38e40e71fa6..ad3f39063b9c0 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -395,7 +395,7 @@ impl ExecuteState { let ctx_clone = ctx.clone(); let block_sender_closer = block_sender.closer(); - let res = Self::execute( + let res = Self::pull_and_send( interpreter, plan.schema(), ctx_clone, @@ -418,7 +418,7 @@ impl ExecuteState { } #[fastrace::trace(name = "ExecuteState::execute")] - async fn execute( + async fn pull_and_send( interpreter: Arc, schema: DataSchemaRef, ctx: Arc, diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 09ac90ef25c2d..6395a5cd2ec5b 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -29,6 +29,7 @@ use databend_common_base::runtime::MemStat; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::session_type::SessionType; use databend_common_catalog::table_context::StageAttachment; +use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ResultExt; @@ -51,8 +52,8 @@ use serde::Serialize; use serde::Serializer; use super::execute_state::ExecutionError; +use super::CloseReason; use super::HttpQueryContext; -use super::StopReason; use crate::servers::http::error::QueryError; use crate::servers::http::v1::http_query_handlers::QueryResponseField; use crate::servers::http::v1::query::blocks_serializer::BlocksSerializer; @@ -72,7 +73,6 @@ use crate::servers::http::v1::QueryResponse; use crate::servers::http::v1::QueryStats; use crate::sessions::QueryAffect; use crate::sessions::Session; -use crate::sessions::TableContext; fn default_as_true() -> bool { true @@ -406,14 +406,11 @@ impl HttpSessionConf { } if let Some(id) = state.last_query_ids.first() { if let Some(last_query) = http_query_manager.get_query(id) { - let state = *last_query.state.lock(); - if !matches!( - state, - HttpQueryState::Stopped(StopReason::Finished | StopReason::Canceled) - ) { + let state = *last_query.client_state.lock(); + if !matches!(state, ClientState::Closed(_)) { warn!( - "[HTTP-QUERY] Last query id not finished yet, id = {}, state = {:?}, is_data_drained={}", - id, state, last_query.is_data_drained.load(Ordering::Relaxed) + "[HTTP-QUERY] Last query id not finished yet, id = {}, state = {:?}", + id, state ); } } @@ -509,39 +506,76 @@ pub struct HttpQueryResponseInternal { pub result_timeout_secs: u64, } +impl HttpQueryResponseInternal { + pub fn is_data_drained(&self) -> bool { + self.data + .as_ref() + .map(|d| d.next_page_no.is_none()) + .unwrap_or(true) + } +} + +#[derive(Debug, Clone, Copy)] +pub enum ClientState { + LastAccess { + ts: Instant, + is_polling: bool, + // if is_drained (exec should be stopped): + // 1. no longer update ts. + // 2. heartbeat_handler should notify client that no need for heartbeat anymore. + is_drained: bool, + }, + // closed by client or server(timeout) + Closed(ClientStateClosed), +} + #[derive(Debug, Clone, Copy)] -pub enum HttpQueryState { - ExpireAt(Instant), - Stopped(StopReason), +pub struct ClientStateClosed { + ts: Instant, + pub reason: CloseReason, + is_drained: bool, +} + +impl ClientStateClosed { + pub(crate) fn error_code(&self, timeout: u64) -> ErrorCode { + match self.reason { + CloseReason::Finalized => ErrorCode::ClosedQuery("closed by client"), + CloseReason::Canceled => ErrorCode::AbortedQuery("canceled by client"), + CloseReason::TimedOut => ErrorCode::AbortedQuery(format!( + "timed out after {timeout} secs, drained = {}", + self.is_drained + )), + } + } } #[derive(Debug)] -pub enum ExpireResult { - Expired, +pub enum TimeoutResult { Sleep(Duration), - Stopped, + TimedOut, + Remove, } pub struct HttpQuery { pub(crate) id: String, pub(crate) user_name: String, pub(crate) client_session_id: Option, - pub(crate) node_id: String, - request: HttpQueryRequest, - executor: Arc>, - page_manager: Arc>, - state: Arc>, - is_data_drained: AtomicBool, - /// The timeout for the query result polling. In the normal case, the client driver - /// should fetch the paginated result in a timely manner, and the interval should not - /// exceed this result_timeout_secs. - pub(crate) result_timeout_secs: u64, - - pub(crate) is_txn_mgr_saved: AtomicBool, + pagination: PaginationConf, + // refs pub(crate) temp_tbl_mgr: TempTblMgrRef, pub(crate) query_mem_stat: Option>, + + // cache only + pub(crate) result_timeout_secs: u64, + + execute_state: Arc>, + + // client states + client_state: Arc>, + page_manager: Arc>, last_session_conf: Arc>>, + pub(crate) is_txn_mgr_saved: AtomicBool, } fn try_set_txn( @@ -592,8 +626,8 @@ impl HttpQuery { .await?; let query_id = http_ctx.query_id.clone(); let client_session_id = http_ctx.client_session_id.as_deref().unwrap_or("None"); - let node_id = ctx.get_cluster().local_id.clone(); let sql = &req.sql; + let node_id = &GlobalConfig::instance().query.node_id; info!(query_id = query_id, session_id = client_session_id, node_id = node_id, sql = sql; "[HTTP-QUERY] Creating new query"); // Stage attachment is used to carry the data payload to the INSERT/REPLACE statements. @@ -623,25 +657,25 @@ impl HttpQuery { let settings = session.get_settings(); let result_timeout_secs = settings.get_http_handler_result_timeout_secs()?; - let deadline = Instant::now() - + Duration::from_secs(result_timeout_secs + req.pagination.wait_time_secs as u64); Ok(HttpQuery { id: query_id, user_name: http_ctx.user_name.clone(), client_session_id: http_ctx.client_session_id.clone(), - node_id, - request: req, - executor, + pagination: req.pagination, + execute_state: executor, page_manager: Arc::new(TokioMutex::new(page_manager)), result_timeout_secs, - state: Arc::new(Mutex::new(HttpQueryState::ExpireAt(deadline))), + client_state: Arc::new(Mutex::new(ClientState::LastAccess { + ts: Instant::now(), + is_drained: false, + is_polling: true, + })), temp_tbl_mgr: session.temp_tbl_mgr().clone(), query_mem_stat: ctx.get_query_memory_tracking(), is_txn_mgr_saved: Default::default(), last_session_conf: Default::default(), - is_data_drained: AtomicBool::new(false), }) } @@ -665,7 +699,7 @@ impl HttpQuery { data, state, session, - node_id: self.node_id.clone(), + node_id: HttpQueryManager::instance().server_info.id.clone(), session_id: self.client_session_id.clone().unwrap_or_default(), result_timeout_secs: self.result_timeout_secs, }) @@ -677,7 +711,7 @@ impl HttpQuery { Ok(HttpQueryResponseInternal { data: None, session_id: self.client_session_id.clone().unwrap_or_default(), - node_id: self.node_id.clone(), + node_id: HttpQueryManager::instance().server_info.id.clone(), state, session: None, result_timeout_secs: self.result_timeout_secs, @@ -685,7 +719,7 @@ impl HttpQuery { } fn get_state(&self) -> ResponseState { - let state = self.executor.lock(); + let state = self.execute_state.lock(); state.get_response_state() } @@ -699,7 +733,7 @@ impl HttpQuery { // - query cache: last_query_id and result_scan let (session_state, is_stopped) = { - let executor = self.executor.lock(); + let executor = self.execute_state.lock(); let session_state = executor.get_session_state(); let is_stopped = matches!(executor.state, ExecuteState::Stopped(_)); @@ -731,7 +765,6 @@ impl HttpQuery { if is_stopped && txn_state == TxnState::Active - && !self.is_txn_mgr_saved.load(Ordering::Relaxed) && self .is_txn_mgr_saved .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) @@ -767,7 +800,7 @@ impl HttpQuery { async fn get_page(&self, page_no: usize) -> Result { let mut page_manager = self.page_manager.lock().await; let page = page_manager - .get_a_page(page_no, &self.request.pagination.get_wait_type()) + .get_a_page(page_no, &self.pagination.get_wait_type()) .await?; let response = ResponseData { page, @@ -778,7 +811,7 @@ impl HttpQuery { pub async fn start_query(&mut self, sql: String) -> Result<()> { let (block_sender, query_context) = { - let state = &mut self.executor.lock().state; + let state = &mut self.execute_state.lock().state; let ExecuteState::Starting(state) = state else { return Err(ErrorCode::Internal( "[HTTP-QUERY] Invalid query state: expected Starting state", @@ -790,7 +823,7 @@ impl HttpQuery { let query_session = query_context.get_current_session(); - let query_state = self.executor.clone(); + let query_state = self.execute_state.clone(); GlobalQueryRuntime::instance().runtime().try_spawn( async move { @@ -841,41 +874,45 @@ impl HttpQuery { #[async_backtrace::framed] pub async fn kill(&self, reason: ErrorCode) { - // the query will be removed from the query manager before the session is dropped. - self.page_manager.lock().await.detach().await; - - Executor::stop(&self.executor, Err(reason)); + // stop execution first to avoid page_handler get wrong EOF + Executor::stop(&self.execute_state, Err(reason)); + self.page_manager.lock().await.close().await; } #[async_backtrace::framed] - pub async fn update_expire_time(&self, before_wait: bool) { - let mut to = Duration::from_secs(self.result_timeout_secs); - if before_wait { - to += Duration::from_secs(self.request.pagination.wait_time_secs as u64) - }; - let deadline = Instant::now() + to; - let mut t = self.state.lock(); - *t = HttpQueryState::ExpireAt(deadline); - } - - pub fn mark_stopped(&self, remove_reason: StopReason) -> bool { - let mut t = self.state.lock(); - if !matches!(*t, HttpQueryState::Stopped(_)) { - *t = HttpQueryState::Stopped(remove_reason); - true - } else { - false + pub async fn update_expire_time(&self, is_polling: bool, drained: bool) { + let now = Instant::now(); + let mut t = self.client_state.lock(); + if let ClientState::LastAccess { is_drained, .. } = *t { + if !is_drained { + *t = ClientState::LastAccess { + is_drained: drained, + is_polling, + ts: now, + } + } } } - pub fn set_data_drained(&self) { - self.is_data_drained.store(true, Ordering::Relaxed); + pub fn mark_closed(&self, reason: CloseReason) -> Option { + let mut t = self.client_state.lock(); + match *t { + ClientState::Closed(_) => None, + ClientState::LastAccess { is_drained, .. } => { + let st = ClientStateClosed { + ts: Instant::now(), + is_drained, + reason, + }; + *t = ClientState::Closed(st); + Some(st) + } + } } - - pub fn check_removed(&self) -> Option { - let t = self.state.lock(); - if let HttpQueryState::Stopped(r) = *t { - Some(r) + pub fn check_closed(&self) -> Option { + let t = self.client_state.lock(); + if let ClientState::Closed(st) = *t { + Some(st) } else { None } @@ -883,33 +920,49 @@ impl HttpQuery { // return Duration to sleep #[async_backtrace::framed] - pub async fn check_expire(&self) -> ExpireResult { - let expire_state = self.state.lock(); + pub async fn check_timeout(&self) -> TimeoutResult { + let now = Instant::now(); + let expire_state = self.client_state.lock(); match *expire_state { - HttpQueryState::ExpireAt(expire_at) => { - let now = Instant::now(); + ClientState::LastAccess { ts, is_polling, .. } => { + let mut expire_at = ts + Duration::from_secs(self.result_timeout_secs); + if is_polling { + expire_at += Duration::from_secs(self.pagination.wait_time_secs as u64); + } if now >= expire_at { - ExpireResult::Expired + TimeoutResult::TimedOut } else { - ExpireResult::Sleep(expire_at - now) + TimeoutResult::Sleep(expire_at - now) + } + } + ClientState::Closed(st) => { + let to = match st.reason { + CloseReason::Finalized => 30, + _ => self.result_timeout_secs, + }; + let expire_at = st.ts + Duration::from_secs(to); + if now >= expire_at { + TimeoutResult::Remove + } else { + TimeoutResult::Sleep(expire_at - now) } } - HttpQueryState::Stopped(_) => ExpireResult::Stopped, } } + /// return true if no need to send heartbeat anymore #[async_backtrace::framed] #[fastrace::trace(name = "HttpQuery::on_heartbeat")] pub fn on_heartbeat(&self) -> bool { - let mut expire_state = self.state.lock(); - match *expire_state { - HttpQueryState::ExpireAt(_) => { - let duration = Duration::from_secs(self.result_timeout_secs); - let deadline = Instant::now() + duration; - *expire_state = HttpQueryState::ExpireAt(deadline); - true + let mut client_state = self.client_state.lock(); + match &mut *client_state { + ClientState::LastAccess { is_drained, ts, .. } => { + if !(*is_drained) { + *ts = Instant::now() + } + *is_drained } - HttpQueryState::Stopped(_) => false, + ClientState::Closed { .. } => true, } } diff --git a/src/query/service/src/servers/http/v1/query/http_query_manager.rs b/src/query/service/src/servers/http/v1/query/http_query_manager.rs index b093cc5073d3b..1ae1b37c38f91 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_manager.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_manager.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::collections::HashMap; -use std::collections::VecDeque; +use std::fmt; use std::fmt::Display; use std::fmt::Formatter; use std::sync::Arc; @@ -34,83 +34,71 @@ use databend_storages_common_session::TxnManagerRef; use parking_lot::Mutex; use parking_lot::RwLock; -use crate::servers::http::v1::query::http_query::ExpireResult; +use crate::servers::http::v1::query::http_query::ClientStateClosed; use crate::servers::http::v1::query::http_query::HttpQuery; use crate::servers::http::v1::query::http_query::ServerInfo; +use crate::servers::http::v1::query::http_query::TimeoutResult; #[derive(Clone, Debug, Copy, Eq, PartialEq)] -pub enum StopReason { - Timeout, +pub enum CloseReason { + Finalized, Canceled, - Finished, + TimedOut, } -impl Display for StopReason { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{}", format!("{self:?}").to_lowercase()) - } -} - -pub struct LimitedQueue { - deque: VecDeque, - max_size: usize, -} - -impl LimitedQueue { - fn new(max_size: usize) -> Self { - LimitedQueue { - deque: VecDeque::new(), - max_size, - } - } - - fn push(&mut self, item: T) -> Option { - self.deque.push_back(item); - if self.deque.len() > self.max_size { - self.deque.pop_front() - } else { - None +impl Display for CloseReason { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + CloseReason::Finalized => write!(f, "finalized"), + CloseReason::Canceled => write!(f, "canceled"), + CloseReason::TimedOut => write!(f, "timed out"), } } } + #[derive(Default)] struct Queries { - #[allow(clippy::type_complexity)] - queries: HashMap>, + active: HashMap>, num_active_queries: u64, last_query_end_at: Option, } impl Queries { pub(crate) fn get(&self, query_id: &str) -> Option> { - self.queries.get(query_id).cloned() + self.active.get(query_id).cloned() } pub(crate) fn insert(&mut self, query: Arc) { self.num_active_queries += 1; - self.queries.insert(query.id.clone(), query); + self.active.insert(query.id.clone(), query); } pub(crate) fn remove(&mut self, query_id: &str) -> Option> { - self.queries.remove(query_id) + self.active.remove(query_id) } - pub(crate) fn stop( + pub(crate) fn close( &mut self, query_id: &str, - reason: StopReason, + reason: CloseReason, now: u64, - ) -> (Option>, bool) { - let q = self.queries.get(query_id).cloned(); + client_session_id: &Option, + check_client_session_id: bool, + ) -> poem::error::Result<(Option>, Option)> { + let q = self.active.get(query_id).cloned(); if let Some(q) = q { - let stop_first_run = q.mark_stopped(reason); - if stop_first_run { + if check_client_session_id { + q.check_client_session_id(client_session_id)?; + } + let closed_state = q.mark_closed(reason); + if let Some(st) = closed_state { self.last_query_end_at = Some(now); self.num_active_queries = self.num_active_queries.saturating_sub(1); + log::info!("[HTTP-QUERY] Query {query_id} closed: {st:?}"); } - return (Some(q), stop_first_run); + return Ok((Some(q), closed_state)); } - (None, false) + Ok((None, None)) } pub(crate) fn status(&self) -> (u64, Option) { @@ -122,7 +110,6 @@ pub struct HttpQueryManager { pub(crate) start_instant: Instant, pub(crate) server_info: ServerInfo, queries: RwLock, - removed_queries: Mutex>, #[allow(clippy::type_complexity)] pub(crate) txn_managers: Arc)>>>, } @@ -137,7 +124,6 @@ impl HttpQueryManager { start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Millis, false), }, queries: Default::default(), - removed_queries: Mutex::new(LimitedQueue::new(1000)), txn_managers: Arc::new(Mutex::new(HashMap::new())), })); @@ -163,7 +149,6 @@ impl HttpQueryManager { let self_clone = self.clone(); let query_id_clone = query.id.clone(); - let query_result_timeout_secs = query.result_timeout_secs; // downgrade to weak reference // it may cannot destroy with final or kill when we hold ref of Arc @@ -175,62 +160,53 @@ impl HttpQueryManager { None => { break; } - Some(query) => query.check_expire().await, + Some(query) => query.check_timeout().await, }; match expire_res { - ExpireResult::Expired => { - let msg = format!( - "[HTTP-QUERY] Query {} timed out after {} seconds", - &query_id_clone, query_result_timeout_secs - ); + TimeoutResult::TimedOut => { _ = self_clone - .stop_query( - &query_id_clone, - &None, - StopReason::Timeout, - ErrorCode::AbortedQuery(&msg), - ) + .close_query(&query_id_clone, CloseReason::TimedOut, &None, false) .await .ok(); - break; } - ExpireResult::Sleep(t) => { + TimeoutResult::Sleep(t) => { sleep(t).await; } - ExpireResult::Stopped => { + TimeoutResult::Remove => { + let mut queries = self_clone.queries.write(); + queries.remove(&query_id_clone); + log::info!("[HTTP-QUERY] Query {query_id_clone} removed"); break; } } } }); - query } #[async_backtrace::framed] - pub(crate) async fn stop_query( + pub(crate) async fn close_query( self: &Arc, query_id: &str, + reason: CloseReason, client_session_id: &Option, - reason: StopReason, - error: ErrorCode, + check_client_session_id: bool, ) -> poem::error::Result>> { let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("Time went backwards") .as_secs(); - let (query, stop_first_run) = self.queries.write().stop(query_id, reason, now); + let (query, closed_state) = self.queries.write().close( + query_id, + reason, + now, + client_session_id, + check_client_session_id, + )?; if let Some(q) = &query { - if reason != StopReason::Timeout { - q.check_client_session_id(client_session_id)?; - } - if stop_first_run { - q.kill(error).await; - let mut queue = self.removed_queries.lock(); - if let Some(to_evict) = queue.push(q.id.to_string()) { - self.queries.write().remove(&to_evict); - }; + if let Some(st) = closed_state { + q.kill(st.error_code(q.result_timeout_secs)).await; } } Ok(query) @@ -289,18 +265,18 @@ impl HttpQueryManager { } pub(crate) fn on_heartbeat(&self, query_ids: Vec) -> Vec { - let mut failed = vec![]; + let mut to_stop = vec![]; for query_id in query_ids { - if !self + let stop_heartbeat = self .queries .read() .get(&query_id) .map(|q| q.on_heartbeat()) - .unwrap_or(false) - { - failed.push(query_id); + .unwrap_or(true); + if stop_heartbeat { + to_stop.push(query_id); } } - failed + to_stop } } diff --git a/src/query/service/src/servers/http/v1/query/mod.rs b/src/query/service/src/servers/http/v1/query/mod.rs index 69e6fbeb8165c..559b7e67b4914 100644 --- a/src/query/service/src/servers/http/v1/query/mod.rs +++ b/src/query/service/src/servers/http/v1/query/mod.rs @@ -34,8 +34,8 @@ pub use http_query::HttpQueryResponseInternal; pub use http_query::HttpSessionConf; pub use http_query::HttpSessionStateInternal; pub use http_query_context::HttpQueryContext; +pub(crate) use http_query_manager::CloseReason; pub use http_query_manager::HttpQueryManager; -pub(crate) use http_query_manager::StopReason; pub use page_manager::PageManager; pub use page_manager::ResponseData; pub use sized_spsc::Wait; diff --git a/src/query/service/src/servers/http/v1/query/page_manager.rs b/src/query/service/src/servers/http/v1/query/page_manager.rs index 503f493e21748..7cb1d0e032caf 100644 --- a/src/query/service/src/servers/http/v1/query/page_manager.rs +++ b/src/query/service/src/servers/http/v1/query/page_manager.rs @@ -97,11 +97,11 @@ impl PageManager { ); } - self.total_rows += num_row; let page = Page { data: Arc::new(serializer), }; if num_row > 0 { + self.total_rows += num_row; self.total_pages += 1; self.last_page = Some(page.clone()); } @@ -111,10 +111,9 @@ impl PageManager { // when end is set to true, client should recv a response with next_url = final_url // but the response may be lost and client will retry, // we simply return an empty page. - let page = Page { + Ok(Page { data: Arc::new(BlocksSerializer::empty()), - }; - Ok(page) + }) } } else if page_no + 1 == next_no { // later, there may be other ways to ack and drop the last page except collect_new_page. @@ -136,14 +135,12 @@ impl PageManager { } #[async_backtrace::framed] - pub async fn detach(&mut self) { + pub async fn close(&mut self) { log::info!( target: "result-set-spill", "[RESULT-SET-SPILL] Query completed total_pages={}, total_rows={}", self.total_pages, self.total_rows ); - - self.last_page = None; if let Some(spiller) = self.receiver.close() { let start_time = std::time::Instant::now(); match spiller.cleanup().await { @@ -163,5 +160,6 @@ impl PageManager { } } }; + self.last_page = None; } } diff --git a/src/query/service/tests/it/servers/http/http_query_handlers.rs b/src/query/service/tests/it/servers/http/http_query_handlers.rs index 7a70db1f35919..87202bf6522aa 100644 --- a/src/query/service/tests/it/servers/http/http_query_handlers.rs +++ b/src/query/service/tests/it/servers/http/http_query_handlers.rs @@ -702,7 +702,11 @@ async fn test_result_timeout() -> Result<()> { .build(); let _fixture = TestFixture::setup_with_config(&config).await?; - let json = serde_json::json!({ "sql": "SELECT 1", "pagination": {"wait_time_secs": 5}, "session": { "settings": {"http_handler_result_timeout_secs": "1"}}}); + let json = serde_json::json!({ + "sql": "SELECT * from numbers(5)", + "pagination": {"wait_time_secs": 5, "max_rows_per_page": 1,}, + "session": { "settings": {"http_handler_result_timeout_secs": "1"}}} + ); let mut req = TestHttpQueryRequest::new(json); assert_eq!(req.status().await.running_queries_count, 0); let (status, result, _) = req.fetch_begin().await?; @@ -710,19 +714,9 @@ async fn test_result_timeout() -> Result<()> { assert_eq!(status, StatusCode::OK, "{:?}", result); assert_eq!(result.data.len(), 1); - sleep(std::time::Duration::from_secs(10)).await; + sleep(std::time::Duration::from_secs(6)).await; let status = req.status().await; assert_eq!(status.running_queries_count, 0); - - let query_id = result.id.clone(); - // fail to get page 0 again (e.g. retry) due to timeout - let (status, _, body) = req - .do_request(Method::GET, &format!("/v1/query/{query_id}/page/0",)) - .await?; - assert_eq!(status, StatusCode::BAD_REQUEST, "{:?}", body); - assert!(body.contains("timeout"), "{}", body); - // let (status, result, _) = req.fetch_next().await?; - // assert_eq!(status, StatusCode::BAD_REQUEST, "{:?}", result); Ok(()) } diff --git a/tests/nox/suites/1_stateful/09_http_handler/test_09_0010_heartbeat.py b/tests/nox/suites/1_stateful/09_http_handler/test_09_0010_heartbeat.py index 31f9ce4acc4dd..9796d7aca2389 100644 --- a/tests/nox/suites/1_stateful/09_http_handler/test_09_0010_heartbeat.py +++ b/tests/nox/suites/1_stateful/09_http_handler/test_09_0010_heartbeat.py @@ -1,15 +1,11 @@ import requests import time -from suites.utils import comparison_output auth = ("root", "") STICKY_HEADER = "X-DATABEND-STICKY-NODE" - -session = {"settings": {"http_handler_result_timeout_secs": "3", "max_threads": "32"}} - - -def do_query(query, port=8000): +def do_query(query, to, port=8000): + session = {"settings": {"http_handler_result_timeout_secs": f"{to}", "max_threads": "32"}} url = f"http://localhost:{port}/v1/query" query_payload = { "sql": query, @@ -27,57 +23,64 @@ def do_query(query, port=8000): response = requests.post(url, headers=headers, json=query_payload, auth=auth) return response.json() +def do_hb(resps): + m = {} + for resp in resps: + m.setdefault(resp.get("node_id"), []).append(resp.get("id")) + headers = { + "Content-Type": "application/json", + } + hb_uri = f"http://localhost:8000/v1/session/heartbeat" + payload = {"node_to_queries": m} + return requests.post(hb_uri, headers=headers, json=payload, auth=auth).json() -@comparison_output( - """started query 0 -started query 1 -sending heartbeat 0 -sending heartbeat 1 -sending heartbeat 2 -sending heartbeat 3 -sending heartbeat 4 -sending heartbeat 5 -sending heartbeat 6 -sending heartbeat 7 -sending heartbeat 8 -sending heartbeat 9 -continue fetch 0 -continue fetch 1 -end -""" -) def test_heartbeat(): - query_resp = do_query("select count(*) from system.clusters") - num_nodes = int(query_resp.get("data")[0][0]) + # t = 0 + timeout_0 = 8 + resp0 = do_query("select count(*) from system.clusters", timeout_0) + num_nodes = int(resp0.get("data")[0][0]) port = 8000 if num_nodes == 1 else 8002 + timeout_short = 3 - resp1 = do_query("select * from numbers(100)") - print("started query 0") - # print(resp1.get("node_id"), resp1.get("id")) - resp2 = do_query("select * from numbers(100)", port=port) - print("started query 1") - # print(resp1.get("node_id"), resp1.get("id")) - # print(resp2.get("node_id"), resp2.get("id")) + resp1 = do_query("select * from numbers(100)", timeout_short) + resp2 = do_query("select * from numbers(100)", timeout_short, port=port) - url = f"http://localhost:8000/v1/session/heartbeat" - m = {} - m.setdefault(resp1.get("node_id"), []).append(resp1.get("id")) - m.setdefault(resp2.get("node_id"), []).append(resp2.get("id")) - payload = {"node_to_queries": m} - headers = { - "Content-Type": "application/json", - } - for i in range(10): - print(f"sending heartbeat {i}") - response = requests.post(url, headers=headers, json=payload, auth=auth).json() - assert len(response.get("queries_to_remove")) == 0 + for i in range(timeout_0 - 2): + response = do_hb([resp1, resp2]) + assert len(response.get("queries_to_remove")) == 0, f"heartbeat {i}: {response}" time.sleep(1) + # t = timeout_0 - 2 + # query 1,2 not timed out, because of hb for i, r in enumerate([resp1, resp2]): - print(f"continue fetch {i}") headers = {STICKY_HEADER: r.get("node_id")} next_uri = f"http://localhost:8000/{r.get('next_uri')}?" response = requests.get(next_uri, headers=headers, auth=auth) - assert response.status_code == 200, f"{response.status_code} {response.text}" - assert len(response.json().get("data")) > 0 - print("end") + assert response.status_code == 200, f"query {i}:{response.status_code} {response.text}" + response = response.json() + assert len(response.get("data")) > 0, f"query {i}: {response}" + # query 0 not timed out, but drained, no need to hb + response = do_hb([resp0]) + assert len(response.get("queries_to_remove")) == 1, f"resp0: {response}" + time.sleep(4) + + # t = timeout_0 + 2 + # query 0,1,2 timed out + response = do_hb([resp0]) + assert len(response.get("queries_to_remove")) == 1, f"resp0: {response}" + + # final return ok even if timed out + final_uri = f"http://localhost:8000/{resp0.get('final_uri')}?" + headers = {STICKY_HEADER: resp0.get("node_id")} + response = requests.get(final_uri, headers=headers, auth=auth) + assert response.status_code == 200, f"{response}" + response = response.json() + assert response["error"] is None + + # next return fail + next_uri = f"http://localhost:8000/{resp1.get('next_uri')}?" + headers = {STICKY_HEADER: resp1.get("node_id")} + response = requests.get(next_uri, headers=headers, auth=auth) + assert response.status_code == 400, f"{response.text}" + response = response.json() + assert "timed" in response["error"]["message"], f"{response}"