Skip to content

Commit 29ff72a

Browse files
authored
feat: heartbeat to avoid query result timeout. (#17624)
feat: heartbeat to avoid query result timeout.
1 parent d57c648 commit 29ff72a

File tree

7 files changed

+277
-7
lines changed

7 files changed

+277
-7
lines changed

src/query/service/src/servers/http/middleware/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ mod session;
1818

1919
pub(crate) use metrics::MetricsMiddleware;
2020
pub(crate) use panic_handler::PanicHandler;
21+
pub(crate) use session::forward_request_with_body;
2122
pub use session::json_response;
2223
pub(crate) use session::sanitize_request_headers;
2324
pub use session::EndpointKind;

src/query/service/src/servers/http/middleware/session.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use headers::authorization::Credentials;
4141
use http::header::AUTHORIZATION;
4242
use http::HeaderMap;
4343
use http::HeaderValue;
44+
use http::Method;
4445
use http::StatusCode;
4546
use log::error;
4647
use log::info;
@@ -462,6 +463,26 @@ impl<E> HTTPSessionEndpoint<E> {
462463
}
463464

464465
async fn forward_request(mut req: Request, node: Arc<NodeInfo>) -> PoemResult<Response> {
466+
let body = req.take_body().into_bytes().await?;
467+
let mut headers = req.headers().clone();
468+
headers.remove(http::header::HOST);
469+
forward_request_with_body(
470+
node,
471+
&req.uri().to_string(),
472+
body,
473+
req.method().to_owned(),
474+
headers,
475+
)
476+
.await
477+
}
478+
479+
pub async fn forward_request_with_body<T: Into<reqwest::Body>>(
480+
node: Arc<NodeInfo>,
481+
uri: &str,
482+
body: T,
483+
method: Method,
484+
headers: HeaderMap,
485+
) -> PoemResult<Response> {
465486
let addr = node.http_address.clone();
466487
let config = GlobalConfig::instance();
467488
let scheme = if config.query.http_handler_tls_server_key.is_empty()
@@ -471,13 +492,13 @@ async fn forward_request(mut req: Request, node: Arc<NodeInfo>) -> PoemResult<Re
471492
} else {
472493
"https"
473494
};
474-
let url = format!("{scheme}://{addr}/v1{}", req.uri());
495+
let url = format!("{scheme}://{addr}/v1{}", uri);
475496

476497
let client = reqwest::Client::new();
477498
let reqwest_request = client
478-
.request(req.method().clone(), &url)
479-
.headers(req.headers().clone())
480-
.body(req.take_body().into_bytes().await?)
499+
.request(method, &url)
500+
.headers(headers)
501+
.body(body)
481502
.build()
482503
.map_err(|e| {
483504
HttpErrorCode::bad_request(ErrorCode::BadArguments(format!(

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,24 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
16+
1517
use databend_common_base::base::mask_connection_info;
1618
use databend_common_base::headers::HEADER_QUERY_ID;
1719
use databend_common_base::headers::HEADER_QUERY_PAGE_ROWS;
1820
use databend_common_base::headers::HEADER_QUERY_STATE;
1921
use databend_common_base::runtime::drop_guard;
22+
use databend_common_base::runtime::execute_futures_in_parallel;
23+
use databend_common_base::version::DATABEND_SEMVER;
24+
use databend_common_config::GlobalConfig;
2025
use databend_common_exception::ErrorCode;
2126
use databend_common_expression::DataSchemaRef;
2227
use databend_common_metrics::http::metrics_incr_http_response_errors_count;
2328
use fastrace::func_path;
2429
use fastrace::prelude::*;
2530
use highway::HighwayHash;
31+
use http::HeaderMap;
32+
use http::HeaderValue;
2633
use http::StatusCode;
2734
use log::error;
2835
use log::info;
@@ -37,6 +44,7 @@ use poem::web::Json;
3744
use poem::web::Path;
3845
use poem::EndpointExt;
3946
use poem::IntoResponse;
47+
use poem::Request;
4048
use poem::Route;
4149
use serde::Deserialize;
4250
use serde::Serialize;
@@ -45,8 +53,10 @@ use super::query::ExecuteStateKind;
4553
use super::query::HttpQueryRequest;
4654
use super::query::HttpQueryResponseInternal;
4755
use super::query::RemoveReason;
56+
use crate::clusters::ClusterDiscovery;
4857
use crate::servers::http::error::HttpErrorCode;
4958
use crate::servers::http::error::QueryError;
59+
use crate::servers::http::middleware::forward_request_with_body;
5060
use crate::servers::http::middleware::EndpointKind;
5161
use crate::servers::http::middleware::HTTPSessionMiddleware;
5262
use crate::servers::http::middleware::MetricsMiddleware;
@@ -135,6 +145,7 @@ pub struct QueryResponse {
135145
pub schema: Vec<QueryResponseField>,
136146
pub data: Vec<Vec<Option<String>>>,
137147
pub affect: Option<QueryAffect>,
148+
pub result_timeout_secs: Option<u64>,
138149

139150
pub stats: QueryStats,
140151

@@ -208,6 +219,7 @@ impl QueryResponse {
208219
kill_uri: Some(make_kill_uri(&id)),
209220
error: r.state.error.map(QueryError::from_error_code),
210221
has_result_set: r.state.has_result_set,
222+
result_timeout_secs: Some(r.result_timeout_secs),
211223
})
212224
.with_header(HEADER_QUERY_ID, id.clone())
213225
.with_header(HEADER_QUERY_STATE, state.state.to_string())
@@ -423,11 +435,116 @@ pub(crate) async fn query_handler(
423435
.await
424436
}
425437

438+
#[derive(Deserialize, Serialize, Debug)]
439+
struct HeartBeatRequest {
440+
node_to_queries: HashMap<String, Vec<String>>,
441+
}
442+
443+
#[derive(Deserialize, Serialize)]
444+
struct HeartBeatResponse {
445+
queries_to_remove: Vec<String>,
446+
}
447+
448+
/// /v1/session/heartbeat are used for 2 purpose:
449+
/// 1. heartbeat to avoid session token/temp table expire
450+
/// 2. heartbeat to avoid result timeout of queries in this session
426451
#[poem::handler]
427452
#[async_backtrace::framed]
428-
pub async fn heartbeat_handler() -> poem::error::Result<impl IntoResponse> {
429-
// work is already done in session manager
430-
Ok(())
453+
pub async fn heartbeat_handler(
454+
ctx: &HttpQueryContext,
455+
req: &Request,
456+
Json(body): Json<HeartBeatRequest>,
457+
) -> poem::error::Result<impl IntoResponse> {
458+
let local_id = GlobalConfig::instance().query.node_id.clone();
459+
let mut queries_to_remove = vec![];
460+
let mut nodes_to_forwards = vec![];
461+
for (node_id, queries) in body.node_to_queries {
462+
if node_id == local_id {
463+
queries_to_remove.extend(HttpQueryManager::instance().on_heartbeat(queries));
464+
} else if let Some(node) = ClusterDiscovery::instance()
465+
.find_node_by_id(&node_id)
466+
.await
467+
.map_err(HttpErrorCode::server_error)?
468+
{
469+
let mut node_to_queries = HashMap::new();
470+
node_to_queries.insert(node_id.to_string(), queries);
471+
let body = HeartBeatRequest { node_to_queries };
472+
let body = serde_json::to_vec(&body).unwrap();
473+
nodes_to_forwards.push((node, body));
474+
} else {
475+
queries_to_remove.extend(queries)
476+
}
477+
}
478+
479+
let num_task = nodes_to_forwards.len();
480+
if num_task > 0 {
481+
let mut tasks = Vec::with_capacity(num_task);
482+
let uri = req.uri().to_string();
483+
let method = req.method();
484+
let mut headers = HeaderMap::new();
485+
headers.insert(
486+
http::header::CONTENT_TYPE,
487+
HeaderValue::from_static("application/json"),
488+
);
489+
let agent = format!("databend-query/{}", *DATABEND_SEMVER);
490+
headers.insert(
491+
http::header::USER_AGENT,
492+
HeaderValue::from_str(&agent).unwrap(),
493+
);
494+
headers.insert(
495+
http::header::AUTHORIZATION,
496+
req.headers()
497+
.get(http::header::AUTHORIZATION)
498+
.expect("heartbeat request should contain auth header")
499+
.to_owned(),
500+
);
501+
for (node, body) in nodes_to_forwards {
502+
let uri = uri.clone();
503+
let method = method.clone();
504+
let headers = headers.clone();
505+
506+
tasks.push(async move {
507+
match forward_request_with_body(node, &uri, body, method, headers).await {
508+
Ok(mut resp) => {
509+
if resp.status() == StatusCode::OK {
510+
Some(
511+
resp.take_body()
512+
.into_json::<HeartBeatResponse>()
513+
.await
514+
.unwrap(),
515+
)
516+
} else {
517+
warn!("heartbeat forward fail: {:?}", resp);
518+
None
519+
}
520+
}
521+
Err(e) => {
522+
warn!("heartbeat forward error: {:?}", e);
523+
None
524+
}
525+
}
526+
});
527+
}
528+
let settings = ctx.session.get_settings();
529+
let num_threads = num_task.max(
530+
settings
531+
.get_max_threads()
532+
.map_err(HttpErrorCode::server_error)? as usize,
533+
);
534+
let responses = execute_futures_in_parallel(
535+
tasks,
536+
num_threads,
537+
num_threads * 2,
538+
"forward_heartbeat".to_owned(),
539+
)
540+
.await
541+
.map_err(HttpErrorCode::server_error)?;
542+
for response in responses.into_iter().flatten() {
543+
queries_to_remove.extend(response.queries_to_remove);
544+
}
545+
}
546+
547+
Ok(Json(HeartBeatResponse { queries_to_remove }).into_response())
431548
}
432549

433550
pub fn query_route() -> Route {

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ impl HttpQueryRequest {
121121
kill_uri: None,
122122
error: Some(QueryError::from_error_code(err)),
123123
has_result_set: None,
124+
result_timeout_secs: None,
124125
})
125126
}
126127
}
@@ -328,6 +329,7 @@ pub struct HttpQueryResponseInternal {
328329
pub session: Option<HttpSessionConf>,
329330
pub state: ResponseState,
330331
pub node_id: String,
332+
pub result_timeout_secs: u64,
331333
}
332334

333335
#[derive(Debug, Clone, Copy)]
@@ -337,6 +339,7 @@ pub enum ExpireState {
337339
Removed(RemoveReason),
338340
}
339341

342+
#[derive(Debug)]
340343
pub enum ExpireResult {
341344
Expired,
342345
Sleep(Duration),
@@ -613,6 +616,7 @@ impl HttpQuery {
613616
session: Some(session),
614617
node_id: self.node_id.clone(),
615618
session_id: self.session_id.clone(),
619+
result_timeout_secs: self.result_timeout_secs,
616620
})
617621
}
618622

@@ -626,6 +630,7 @@ impl HttpQuery {
626630
node_id: self.node_id.clone(),
627631
state,
628632
session: None,
633+
result_timeout_secs: self.result_timeout_secs,
629634
})
630635
}
631636

@@ -819,6 +824,21 @@ impl HttpQuery {
819824
}
820825
}
821826

827+
#[async_backtrace::framed]
828+
pub fn on_heartbeat(&self) -> bool {
829+
let mut expire_state = self.expire_state.lock();
830+
match *expire_state {
831+
ExpireState::ExpireAt(_) => {
832+
let duration = Duration::from_secs(self.result_timeout_secs);
833+
let deadline = Instant::now() + duration;
834+
*expire_state = ExpireState::ExpireAt(deadline);
835+
true
836+
}
837+
ExpireState::Removed(_) => false,
838+
ExpireState::Working => true,
839+
}
840+
}
841+
822842
pub fn check_client_session_id(&self, id: &Option<String>) -> poem::error::Result<()> {
823843
if *id != self.client_session_id {
824844
return Err(poem::error::Error::from_string(

src/query/service/src/servers/http/v1/query/http_query_manager.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,4 +277,19 @@ impl HttpQueryManager {
277277
}
278278
Ok(())
279279
}
280+
281+
pub(crate) fn on_heartbeat(&self, query_ids: Vec<String>) -> Vec<String> {
282+
let mut failed = vec![];
283+
for query_id in query_ids {
284+
if !self
285+
.queries
286+
.get(&query_id)
287+
.map(|q| q.on_heartbeat())
288+
.unwrap_or(false)
289+
{
290+
failed.push(query_id);
291+
}
292+
}
293+
failed
294+
}
280295
}

0 commit comments

Comments
 (0)