@@ -39,6 +39,7 @@ use databend_common_metrics::http::metrics_incr_http_response_errors_count;
3939use databend_common_settings:: ScopeLevel ;
4040use databend_storages_common_session:: TxnState ;
4141use http:: StatusCode ;
42+ use log:: error;
4243use log:: info;
4344use log:: warn;
4445use parking_lot:: Mutex ;
@@ -434,11 +435,19 @@ fn try_set_txn(
434435impl HttpQuery {
435436 #[ async_backtrace:: framed]
436437 #[ fastrace:: trace]
437- pub async fn try_create ( ctx : & HttpQueryContext , req : HttpQueryRequest ) -> Result < HttpQuery > {
438+ pub async fn try_create (
439+ http_ctx : & HttpQueryContext ,
440+ req : HttpQueryRequest ,
441+ ) -> Result < HttpQuery > {
438442 let http_query_manager = HttpQueryManager :: instance ( ) ;
439- let session = ctx. upgrade_session ( SessionType :: HTTPQuery ) . map_err ( |err| {
440- ErrorCode :: Internal ( format ! ( "[HTTP-QUERY] Failed to upgrade session: {err}" ) )
441- } ) ?;
443+ let client_session_id = http_ctx. client_session_id . as_deref ( ) . unwrap_or ( "None" ) ;
444+ let query_id = http_ctx. query_id . clone ( ) ;
445+ let session = http_ctx
446+ . upgrade_session ( SessionType :: HTTPQuery )
447+ . map_err ( |err| {
448+ ErrorCode :: Internal ( format ! ( "[HTTP-QUERY] Failed to upgrade session: {err}" ) )
449+ } ) ?;
450+ let session_id = session. get_id ( ) ;
442451
443452 // Read the session variables in the request, and set them to the current session.
444453 // the session variables includes:
@@ -492,24 +501,36 @@ impl HttpQuery {
492501 }
493502 }
494503
495- try_set_txn ( & ctx. query_id , & session, session_conf, & http_query_manager) ?;
504+ try_set_txn (
505+ & http_ctx. query_id ,
506+ & session,
507+ session_conf,
508+ & http_query_manager,
509+ ) ?;
496510
497511 if session_conf. need_sticky
498512 && matches ! ( session_conf. txn_state, None | Some ( TxnState :: AutoCommit ) )
499513 {
500- http_query_manager. check_sticky_for_temp_table ( & session_conf. last_server_info ) ?;
514+ http_query_manager
515+ . check_sticky_for_temp_table ( & session_conf. last_server_info )
516+ . map_err ( |e| {
517+ let msg = format ! (
518+ "[TEMP TABLE] invalid session, session_id={} query_id={}, error={e}, is_sticky_node={}" ,
519+ client_session_id, query_id, http_ctx. is_sticky_node,
520+ ) ;
521+ error ! ( "{}, session_state={:?}" , msg, session_conf) ;
522+ ErrorCode :: InvalidSessionState ( msg)
523+ } ) ?;
501524 }
502525 } ;
503526
504527 let settings = session. get_settings ( ) ;
505528 let result_timeout_secs = settings. get_http_handler_result_timeout_secs ( ) ?;
506- let deduplicate_label = & ctx. deduplicate_label ;
507- let user_agent = & ctx. user_agent ;
508- let query_id = ctx. query_id . clone ( ) ;
529+ let deduplicate_label = & http_ctx. deduplicate_label ;
530+ let user_agent = & http_ctx. user_agent ;
509531
510- session. set_client_host ( ctx . client_host . clone ( ) ) ;
532+ session. set_client_host ( http_ctx . client_host . clone ( ) ) ;
511533
512- let http_ctx = ctx;
513534 let ctx = session. create_query_context ( ) . await ?;
514535
515536 // Deduplicate label is used on the DML queries which may be retried by the client.
@@ -526,10 +547,9 @@ impl HttpQuery {
526547 // TODO: validate the query_id to be uuid format
527548 ctx. update_init_query_id ( query_id. clone ( ) ) ;
528549
529- let session_id = session. get_id ( ) . clone ( ) ;
530550 let node_id = ctx. get_cluster ( ) . local_id . clone ( ) ;
531551 let sql = & req. sql ;
532- info ! ( query_id = query_id, session_id = session_id , node_id = node_id, sql = sql; "[HTTP-QUERY] Creating new query" ) ;
552+ info ! ( query_id = query_id, session_id = client_session_id , node_id = node_id, sql = sql; "[HTTP-QUERY] Creating new query" ) ;
533553
534554 // Stage attachment is used to carry the data payload to the INSERT/REPLACE statements.
535555 // When stage attachment is specified, the query may looks like `INSERT INTO mytbl VALUES;`,
0 commit comments