Skip to content

Commit c89c298

Browse files
committed
refactor: clarify the HTTP query lifecycle.
1 parent 213ccf8 commit c89c298

File tree

8 files changed

+299
-289
lines changed

8 files changed

+299
-289
lines changed

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 37 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use databend_common_base::runtime::ParentMemStat;
2727
use databend_common_base::runtime::ThreadTracker;
2828
use databend_common_base::runtime::GLOBAL_MEM_STAT;
2929
use databend_common_config::GlobalConfig;
30-
use databend_common_exception::ErrorCode;
3130
use databend_common_expression::DataSchemaRef;
3231
use databend_common_management::WorkloadGroupResourceManager;
3332
use databend_common_metrics::http::metrics_incr_http_response_errors_count;
@@ -56,11 +55,11 @@ use serde::Deserialize;
5655
use serde::Serialize;
5756
use uuid::Uuid;
5857

58+
use super::query::CloseReason;
5959
use super::query::ExecuteStateKind;
6060
use super::query::HttpQuery;
6161
use super::query::HttpQueryRequest;
6262
use super::query::HttpQueryResponseInternal;
63-
use super::query::StopReason;
6463
use crate::clusters::ClusterDiscovery;
6564
use crate::servers::http::error::HttpErrorCode;
6665
use crate::servers::http::error::QueryError;
@@ -166,10 +165,10 @@ pub struct QueryResponse {
166165
}
167166

168167
impl QueryResponse {
169-
pub(crate) fn removed(query_id: &str, remove_reason: StopReason) -> impl IntoResponse {
168+
pub(crate) fn closed(query_id: &str, close_reason: CloseReason) -> impl IntoResponse {
170169
let id = query_id.to_string();
171-
let state = match remove_reason {
172-
StopReason::Finished => ExecuteStateKind::Succeeded,
170+
let state = match close_reason {
171+
CloseReason::Finalized => ExecuteStateKind::Succeeded,
173172
_ => ExecuteStateKind::Failed,
174173
};
175174
Json(QueryResponse {
@@ -198,7 +197,7 @@ impl QueryResponse {
198197
id: String,
199198
r: HttpQueryResponseInternal,
200199
is_final: bool,
201-
) -> (impl IntoResponse, bool) {
200+
) -> impl IntoResponse {
202201
let state = r.state.clone();
203202
let (data, next_uri) = if is_final {
204203
(Arc::new(BlocksSerializer::empty()), None)
@@ -248,12 +247,7 @@ impl QueryResponse {
248247
};
249248
let rows = data.num_rows();
250249

251-
let next_is_final = next_uri
252-
.as_ref()
253-
.map(|u| u.ends_with("final"))
254-
.unwrap_or(false);
255-
256-
let resp = Json(QueryResponse {
250+
Json(QueryResponse {
257251
data,
258252
state: state.state,
259253
schema: state.schema.clone(),
@@ -274,8 +268,7 @@ impl QueryResponse {
274268
})
275269
.with_header(HEADER_QUERY_ID, id.clone())
276270
.with_header(HEADER_QUERY_STATE, state.state.to_string())
277-
.with_header(HEADER_QUERY_PAGE_ROWS, rows);
278-
(resp, next_is_final)
271+
.with_header(HEADER_QUERY_PAGE_ROWS, rows)
279272
}
280273
}
281274

@@ -305,11 +298,11 @@ async fn query_final_handler(
305298
);
306299
let http_query_manager = HttpQueryManager::instance();
307300
match http_query_manager
308-
.stop_query(
301+
.close_query(
309302
&query_id,
303+
CloseReason::Finalized,
310304
&ctx.client_session_id,
311-
StopReason::Finished,
312-
ErrorCode::ClosedQuery("Query closed by client"),
305+
true,
313306
)
314307
.await?
315308
{
@@ -320,7 +313,7 @@ async fn query_final_handler(
320313
// it is safe to set these 2 fields to None, because client now check for null/None first.
321314
response.session = None;
322315
response.state.affect = None;
323-
Ok(QueryResponse::from_internal(query_id, response, true).0)
316+
Ok(QueryResponse::from_internal(query_id, response, true))
324317
}
325318
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
326319
}
@@ -346,11 +339,11 @@ async fn query_cancel_handler(
346339
);
347340
let http_query_manager = HttpQueryManager::instance();
348341
match http_query_manager
349-
.stop_query(
342+
.close_query(
350343
&query_id,
344+
CloseReason::Canceled,
351345
&ctx.client_session_id,
352-
StopReason::Canceled,
353-
ErrorCode::AbortedQuery("canceled by client"),
346+
true,
354347
)
355348
.await?
356349
{
@@ -374,15 +367,13 @@ async fn query_state_handler(
374367
let http_query_manager = HttpQueryManager::instance();
375368
match http_query_manager.get_query(&query_id) {
376369
Some(query) => {
377-
if let Some(reason) = query.check_removed() {
378-
Ok(QueryResponse::removed(&query_id, reason).into_response())
370+
if let Some(reason) = query.check_closed() {
371+
Ok(QueryResponse::closed(&query_id, reason.reason).into_response())
379372
} else {
380373
let response = query
381374
.get_response_state_only()
382375
.map_err(HttpErrorCode::server_error)?;
383-
Ok(QueryResponse::from_internal(query_id, response, false)
384-
.0
385-
.into_response())
376+
Ok(QueryResponse::from_internal(query_id, response, false).into_response())
386377
}
387378
}
388379
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
@@ -423,18 +414,16 @@ async fn query_page_handler(
423414
}
424415

425416
query.check_client_session_id(&ctx.client_session_id)?;
426-
if let Some(reason) = query.check_removed() {
427-
log::info!(
428-
"[HTTP-QUERY] /query/{}/page/{} - query is removed (reason: {})",
429-
query_id,
430-
page_no,
431-
reason
417+
if let Some(st) = query.check_closed() {
418+
info!(
419+
"[HTTP-QUERY] /query/{}/page/{} - query is close (reason: {:?})",
420+
query_id, page_no, st
432421
);
433-
Err(query_id_removed(&query_id, reason))
422+
Err(query_id_closed(&query_id, st.reason))
434423
} else {
435-
query.update_expire_time(true).await;
424+
query.update_expire_time(true, false).await;
436425
let resp = query.get_response_page(page_no).await.map_err(|err| {
437-
log::info!(
426+
info!(
438427
"[HTTP-QUERY] /query/{}/page/{} - get response page error (reason: {})",
439428
query_id,
440429
page_no,
@@ -445,12 +434,10 @@ async fn query_page_handler(
445434
StatusCode::NOT_FOUND,
446435
)
447436
})?;
448-
query.update_expire_time(false).await;
449-
let (resp, next_is_final) = QueryResponse::from_internal(query_id, resp, false);
450-
if next_is_final {
451-
query.set_data_drained()
452-
}
453-
Ok(resp)
437+
query
438+
.update_expire_time(false, resp.is_data_drained())
439+
.await;
440+
Ok(QueryResponse::from_internal(query_id, resp, false))
454441
}
455442
}
456443
};
@@ -519,7 +506,6 @@ pub(crate) async fn query_handler(
519506
let http_query_manager = HttpQueryManager::instance();
520507
let query = http_query_manager.add_query(query).await;
521508

522-
// tmp workaround to tolerant old clients
523509
let resp = query
524510
.get_response_page(0)
525511
.await
@@ -542,13 +528,10 @@ pub(crate) async fn query_handler(
542528
info!("[HTTP-QUERY] Initial response for query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'",
543529
&query.id, &resp.state, rows, next_page, mask_connection_info(&sql)
544530
);
545-
query.update_expire_time(false).await;
546-
let (resp, next_is_final) =
547-
QueryResponse::from_internal(query.id.to_string(), resp, false);
548-
if next_is_final {
549-
query.set_data_drained()
550-
}
551-
Ok(resp.into_response())
531+
query
532+
.update_expire_time(false, resp.is_data_drained())
533+
.await;
534+
Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response())
552535
}
553536
}
554537
};
@@ -838,9 +821,12 @@ pub fn query_route() -> Route {
838821
route
839822
}
840823

841-
fn query_id_removed(query_id: &str, remove_reason: StopReason) -> PoemError {
824+
fn query_id_closed(query_id: &str, closed_reason: CloseReason) -> PoemError {
842825
PoemError::from_string(
843-
format!("[HTTP-QUERY] Query ID {query_id} {}", remove_reason),
826+
format!(
827+
"[HTTP-QUERY] Query {query_id} is closed for {}",
828+
closed_reason
829+
),
844830
StatusCode::BAD_REQUEST,
845831
)
846832
}

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ impl ExecuteState {
395395
let ctx_clone = ctx.clone();
396396
let block_sender_closer = block_sender.closer();
397397

398-
let res = Self::execute(
398+
let res = Self::pull_and_send(
399399
interpreter,
400400
plan.schema(),
401401
ctx_clone,
@@ -418,7 +418,7 @@ impl ExecuteState {
418418
}
419419

420420
#[fastrace::trace(name = "ExecuteState::execute")]
421-
async fn execute(
421+
async fn pull_and_send(
422422
interpreter: Arc<dyn Interpreter>,
423423
schema: DataSchemaRef,
424424
ctx: Arc<QueryContext>,

0 commit comments

Comments
 (0)