Skip to content

Commit 72b9f23

Browse files
committed
refactor: polish HTTP handler client state
1 parent e386f1f commit 72b9f23

File tree

8 files changed

+244
-275
lines changed

8 files changed

+244
-275
lines changed

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 28 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use databend_common_base::runtime::ParentMemStat;
2727
use databend_common_base::runtime::ThreadTracker;
2828
use databend_common_base::runtime::GLOBAL_MEM_STAT;
2929
use databend_common_config::GlobalConfig;
30-
use databend_common_exception::ErrorCode;
3130
use databend_common_expression::DataSchemaRef;
3231
use databend_common_management::WorkloadGroupResourceManager;
3332
use databend_common_metrics::http::metrics_incr_http_response_errors_count;
@@ -56,11 +55,11 @@ use serde::Deserialize;
5655
use serde::Serialize;
5756
use uuid::Uuid;
5857

58+
use super::query::CloseReason;
5959
use super::query::ExecuteStateKind;
6060
use super::query::HttpQuery;
6161
use super::query::HttpQueryRequest;
6262
use super::query::HttpQueryResponseInternal;
63-
use super::query::StopReason;
6463
use crate::clusters::ClusterDiscovery;
6564
use crate::servers::http::error::HttpErrorCode;
6665
use crate::servers::http::error::QueryError;
@@ -166,10 +165,10 @@ pub struct QueryResponse {
166165
}
167166

168167
impl QueryResponse {
169-
pub(crate) fn removed(query_id: &str, remove_reason: StopReason) -> impl IntoResponse {
168+
pub(crate) fn removed(query_id: &str, remove_reason: CloseReason) -> impl IntoResponse {
170169
let id = query_id.to_string();
171170
let state = match remove_reason {
172-
StopReason::Finished => ExecuteStateKind::Succeeded,
171+
CloseReason::Finalized => ExecuteStateKind::Succeeded,
173172
_ => ExecuteStateKind::Failed,
174173
};
175174
Json(QueryResponse {
@@ -198,7 +197,7 @@ impl QueryResponse {
198197
id: String,
199198
r: HttpQueryResponseInternal,
200199
is_final: bool,
201-
) -> (impl IntoResponse, bool) {
200+
) -> impl IntoResponse {
202201
let state = r.state.clone();
203202
let (data, next_uri) = if is_final {
204203
(Arc::new(BlocksSerializer::empty()), None)
@@ -248,12 +247,7 @@ impl QueryResponse {
248247
};
249248
let rows = data.num_rows();
250249

251-
let next_is_final = next_uri
252-
.as_ref()
253-
.map(|u| u.ends_with("final"))
254-
.unwrap_or(false);
255-
256-
let resp = Json(QueryResponse {
250+
Json(QueryResponse {
257251
data,
258252
state: state.state,
259253
schema: state.schema.clone(),
@@ -274,8 +268,7 @@ impl QueryResponse {
274268
})
275269
.with_header(HEADER_QUERY_ID, id.clone())
276270
.with_header(HEADER_QUERY_STATE, state.state.to_string())
277-
.with_header(HEADER_QUERY_PAGE_ROWS, rows);
278-
(resp, next_is_final)
271+
.with_header(HEADER_QUERY_PAGE_ROWS, rows)
279272
}
280273
}
281274

@@ -305,12 +298,7 @@ async fn query_final_handler(
305298
);
306299
let http_query_manager = HttpQueryManager::instance();
307300
match http_query_manager
308-
.stop_query(
309-
&query_id,
310-
&ctx.client_session_id,
311-
StopReason::Finished,
312-
ErrorCode::ClosedQuery("Query closed by client"),
313-
)
301+
.close_query(&query_id, &ctx.client_session_id, CloseReason::Finalized)
314302
.await?
315303
{
316304
Some(query) => {
@@ -320,7 +308,7 @@ async fn query_final_handler(
320308
// it is safe to set these 2 fields to None, because client now check for null/None first.
321309
response.session = None;
322310
response.state.affect = None;
323-
Ok(QueryResponse::from_internal(query_id, response, true).0)
311+
Ok(QueryResponse::from_internal(query_id, response, true))
324312
}
325313
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
326314
}
@@ -346,12 +334,7 @@ async fn query_cancel_handler(
346334
);
347335
let http_query_manager = HttpQueryManager::instance();
348336
match http_query_manager
349-
.stop_query(
350-
&query_id,
351-
&ctx.client_session_id,
352-
StopReason::Canceled,
353-
ErrorCode::AbortedQuery("canceled by client"),
354-
)
337+
.close_query(&query_id, &ctx.client_session_id, CloseReason::Canceled)
355338
.await?
356339
{
357340
Some(_) => Ok(StatusCode::OK),
@@ -374,15 +357,13 @@ async fn query_state_handler(
374357
let http_query_manager = HttpQueryManager::instance();
375358
match http_query_manager.get_query(&query_id) {
376359
Some(query) => {
377-
if let Some(reason) = query.check_removed() {
378-
Ok(QueryResponse::removed(&query_id, reason).into_response())
360+
if let Some(reason) = query.check_closed() {
361+
Ok(QueryResponse::removed(&query_id, reason.reason).into_response())
379362
} else {
380363
let response = query
381364
.get_response_state_only()
382365
.map_err(HttpErrorCode::server_error)?;
383-
Ok(QueryResponse::from_internal(query_id, response, false)
384-
.0
385-
.into_response())
366+
Ok(QueryResponse::from_internal(query_id, response, false).into_response())
386367
}
387368
}
388369
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
@@ -423,18 +404,16 @@ async fn query_page_handler(
423404
}
424405

425406
query.check_client_session_id(&ctx.client_session_id)?;
426-
if let Some(reason) = query.check_removed() {
427-
log::info!(
428-
"[HTTP-QUERY] /query/{}/page/{} - query is removed (reason: {})",
429-
query_id,
430-
page_no,
431-
reason
407+
if let Some(st) = query.check_closed() {
408+
info!(
409+
"[HTTP-QUERY] /query/{}/page/{} - query is close (reason: {:?})",
410+
query_id, page_no, st
432411
);
433-
Err(query_id_removed(&query_id, reason))
412+
Err(query_id_removed(&query_id, st.reason))
434413
} else {
435-
query.update_expire_time(true).await;
414+
query.update_expire_time(true, false).await;
436415
let resp = query.get_response_page(page_no).await.map_err(|err| {
437-
log::info!(
416+
info!(
438417
"[HTTP-QUERY] /query/{}/page/{} - get response page error (reason: {})",
439418
query_id,
440419
page_no,
@@ -445,12 +424,10 @@ async fn query_page_handler(
445424
StatusCode::NOT_FOUND,
446425
)
447426
})?;
448-
query.update_expire_time(false).await;
449-
let (resp, next_is_final) = QueryResponse::from_internal(query_id, resp, false);
450-
if next_is_final {
451-
query.set_data_drained()
452-
}
453-
Ok(resp)
427+
query
428+
.update_expire_time(false, resp.is_data_drained())
429+
.await;
430+
Ok(QueryResponse::from_internal(query_id, resp, false))
454431
}
455432
}
456433
};
@@ -519,7 +496,6 @@ pub(crate) async fn query_handler(
519496
let http_query_manager = HttpQueryManager::instance();
520497
let query = http_query_manager.add_query(query).await;
521498

522-
// tmp workaround to tolerant old clients
523499
let resp = query
524500
.get_response_page(0)
525501
.await
@@ -542,13 +518,10 @@ pub(crate) async fn query_handler(
542518
info!("[HTTP-QUERY] Initial response for query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'",
543519
&query.id, &resp.state, rows, next_page, mask_connection_info(&sql)
544520
);
545-
query.update_expire_time(false).await;
546-
let (resp, next_is_final) =
547-
QueryResponse::from_internal(query.id.to_string(), resp, false);
548-
if next_is_final {
549-
query.set_data_drained()
550-
}
551-
Ok(resp.into_response())
521+
query
522+
.update_expire_time(false, resp.is_data_drained())
523+
.await;
524+
Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response())
552525
}
553526
}
554527
};
@@ -838,7 +811,7 @@ pub fn query_route() -> Route {
838811
route
839812
}
840813

841-
fn query_id_removed(query_id: &str, remove_reason: StopReason) -> PoemError {
814+
fn query_id_removed(query_id: &str, remove_reason: CloseReason) -> PoemError {
842815
PoemError::from_string(
843816
format!("[HTTP-QUERY] Query ID {query_id} {}", remove_reason),
844817
StatusCode::BAD_REQUEST,

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ pub enum ExecuteState {
104104
Starting(ExecuteStarting),
105105
Running(ExecuteRunning),
106106
Stopped(Box<ExecuteStopped>),
107+
// TombStone(Box<ExecuteStopped>),
107108
}
108109

109110
impl ExecuteState {
@@ -395,7 +396,7 @@ impl ExecuteState {
395396
let ctx_clone = ctx.clone();
396397
let block_sender_closer = block_sender.closer();
397398

398-
let res = Self::execute(
399+
let res = Self::pull_and_send(
399400
interpreter,
400401
plan.schema(),
401402
ctx_clone,
@@ -418,7 +419,7 @@ impl ExecuteState {
418419
}
419420

420421
#[fastrace::trace(name = "ExecuteState::execute")]
421-
async fn execute(
422+
async fn pull_and_send(
422423
interpreter: Arc<dyn Interpreter>,
423424
schema: DataSchemaRef,
424425
ctx: Arc<QueryContext>,

0 commit comments

Comments
 (0)