1414
1515use std:: collections:: HashMap ;
1616use std:: sync:: Arc ;
17- use std:: time:: Duration ;
18- use std:: time:: SystemTime ;
1917
2018use databend_common_base:: base:: GlobalInstance ;
2119use databend_common_base:: headers:: HEADER_DEDUPLICATE_LABEL ;
@@ -53,7 +51,6 @@ use opentelemetry::propagation::TextMapPropagator;
5351use opentelemetry_sdk:: propagation:: BaggagePropagator ;
5452use poem:: error:: ResponseError ;
5553use poem:: error:: Result as PoemResult ;
56- use poem:: web:: cookie:: Cookie ;
5754use poem:: web:: Json ;
5855use poem:: Addr ;
5956use poem:: Endpoint ;
@@ -70,8 +67,7 @@ use crate::clusters::ClusterDiscovery;
7067use crate :: servers:: http:: error:: HttpErrorCode ;
7168use crate :: servers:: http:: error:: JsonErrorOnly ;
7269use crate :: servers:: http:: error:: QueryError ;
73- use crate :: servers:: http:: v1:: unix_ts;
74- use crate :: servers:: http:: v1:: ClientSessionManager ;
70+ use crate :: servers:: http:: middleware:: session_header:: ClientSession ;
7571use crate :: servers:: http:: v1:: HttpQueryContext ;
7672use crate :: servers:: http:: v1:: SessionClaim ;
7773use crate :: servers:: login_history:: LoginEventType ;
@@ -81,9 +77,7 @@ use crate::servers::HttpHandlerKind;
8177use crate :: sessions:: SessionManager ;
8278const USER_AGENT : & str = "User-Agent" ;
8379const TRACE_PARENT : & str = "traceparent" ;
84- const COOKIE_LAST_REFRESH_TIME : & str = "last_refresh_time" ;
85- const COOKIE_SESSION_ID : & str = "session_id" ;
86- const COOKIE_COOKIE_ENABLED : & str = "cookie_enabled" ;
80+
8781#[ derive( Debug , Copy , Clone ) ]
8882pub enum EndpointKind {
8983 Login ,
@@ -352,12 +346,6 @@ pub struct HTTPSessionEndpoint<E> {
352346 pub auth_manager : Arc < AuthMgr > ,
353347}
354348
355- fn make_cookie ( name : impl Into < String > , value : impl Into < String > ) -> Cookie {
356- let mut cookie = Cookie :: new_with_str ( name, value) ;
357- cookie. set_path ( "/" ) ;
358- cookie
359- }
360-
361349impl < E > HTTPSessionEndpoint < E > {
362350 #[ async_backtrace:: framed]
363351 async fn auth (
@@ -371,6 +359,31 @@ impl<E> HTTPSessionEndpoint<E> {
371359 let node_id = GlobalConfig :: instance ( ) . query . node_id . clone ( ) ;
372360 login_history. client_ip = client_host. clone ( ) . unwrap_or_default ( ) ;
373361 login_history. node_id = node_id. clone ( ) ;
362+ let user_agent = req
363+ . headers ( )
364+ . get ( USER_AGENT )
365+ . map ( |id| id. to_str ( ) . unwrap ( ) . to_string ( ) ) ;
366+
367+ let is_worksheet = user_agent
368+ . as_ref ( )
369+ . map ( |ua_str| {
370+ [
371+ // only worksheet client run in browser.
372+ // most browser ua contain multi of them
373+ "Mozilla" ,
374+ "Chrome" ,
375+ "Firefox" ,
376+ "Safari" ,
377+ "Edge" ,
378+ // worksheet start query with ua like DatabendCloud/worksheet=4703;
379+ "worksheet" ,
380+ ]
381+ . iter ( )
382+ . any ( |kw| ua_str. contains ( kw) )
383+ } )
384+ . unwrap_or ( false ) ;
385+
386+ login_history. user_agent = user_agent. clone ( ) . unwrap_or_default ( ) ;
374387
375388 let credential = get_credential ( req, self . kind , self . endpoint_kind ) ?;
376389 login_history. auth_type = credential. type_name ( ) ;
@@ -383,14 +396,6 @@ impl<E> HTTPSessionEndpoint<E> {
383396 let tenant = Tenant :: new_or_err ( tenant_id. clone ( ) , func_name ! ( ) ) ?;
384397 session. set_current_tenant ( tenant) ;
385398 }
386-
387- // cookie_enabled is used to recognize old clients that not support cookie yet.
388- // for these old clients, there is no session id available, thus can not use temp table.
389- let cookie_enabled = req. cookie ( ) . get ( COOKIE_COOKIE_ENABLED ) . is_some ( ) ;
390- let cookie_session_id = req
391- . cookie ( )
392- . get ( COOKIE_SESSION_ID )
393- . map ( |s| s. value_str ( ) . to_string ( ) ) ;
394399 let ( user_name, authed_client_session_id) = self
395400 . auth_manager
396401 . auth (
@@ -401,75 +406,42 @@ impl<E> HTTPSessionEndpoint<E> {
401406 . await ?;
402407 login_history. user_name = user_name. clone ( ) ;
403408
404- // If cookie_session_id is set, we disable writing to login_history.
405- // The cookie_session_id is initially issued by the server to the client upon the first successful login.
406- // For all subsequent requests, the client includes this session_id with each request.
407- // This indicates the user is already logged in, so we skip recording another login event.
408- if cookie_session_id. is_some ( ) {
409- login_history. disable_write = true ;
409+ let mut client_session = ClientSession :: try_decode ( req) ?;
410+ if client_session. is_none ( ) && !matches ! ( self . endpoint_kind, EndpointKind :: PollQuery ) {
411+ info ! (
412+ "[HTTP-SESSION] got request without session, url={}, headers={:?}" ,
413+ req. uri( ) ,
414+ & req. headers( )
415+ ) ;
410416 }
411- let client_session_id = match ( & authed_client_session_id, & cookie_session_id) {
412- ( Some ( id1) , Some ( id2) ) => {
413- if id1 != id2 {
414- return Err ( ErrorCode :: AuthenticateFailure ( format ! (
415- "[HTTP-SESSION] Session ID mismatch: token session ID '{}' does not match cookie session ID '{}'" ,
416- id1, id2
417- ) ) ) ;
418- }
419- Some ( id1. clone ( ) )
420- }
421- ( Some ( id) , None ) => {
422- if cookie_enabled {
423- req. cookie ( ) . add ( make_cookie ( COOKIE_SESSION_ID , id) ) ;
424- }
425- Some ( id. clone ( ) )
426- }
427- ( None , Some ( id) ) => Some ( id. clone ( ) ) ,
428- ( None , None ) => {
429- if cookie_enabled {
430- let id = Uuid :: new_v4 ( ) . to_string ( ) ;
431- info ! ( "[HTTP-SESSION] Created new session with ID: {}" , id) ;
432- req. cookie ( ) . add ( make_cookie ( COOKIE_SESSION_ID , & id) ) ;
433- Some ( id)
434- } else {
435- None
436- }
437- }
438- } ;
439- login_history. session_id = client_session_id. clone ( ) . unwrap_or_default ( ) ;
440417
441- if let Some ( id) = & client_session_id {
442- session. set_client_session_id ( id. clone ( ) ) ;
418+ if let ( Some ( id1) , Some ( c) ) = ( & authed_client_session_id, & client_session) {
419+ if * id1 != c. header . id {
420+ return Err ( ErrorCode :: AuthenticateFailure ( format ! (
421+ "[HTTP-SESSION] Session ID mismatch: token session ID '{}' does not match header session ID '{}'" ,
422+ id1, c. header. id
423+ ) ) ) ;
424+ }
443425 }
444-
445- if cookie_enabled {
446- let last_refresh_time = req
447- . cookie ( )
448- . get ( COOKIE_LAST_REFRESH_TIME )
449- . map ( |s| s. value_str ( ) . to_string ( ) ) ;
450-
451- let need_update = if let Some ( ts) = & last_refresh_time {
452- let ts = ts. parse :: < u64 > ( ) . map_err ( |_| {
453- ErrorCode :: BadArguments ( format ! (
454- "[HTTP-SESSION] Invalid last_refresh_time value: {}" ,
455- ts
456- ) )
457- } ) ?;
458- let ts = SystemTime :: UNIX_EPOCH + Duration :: from_secs ( ts) ;
459- if let Some ( id) = & client_session_id {
460- ClientSessionManager :: instance ( )
461- . refresh_state ( session. get_current_tenant ( ) , id, & user_name, & ts)
462- . await ?
463- } else {
464- true
465- }
466- } else {
467- true
468- } ;
469- if need_update {
470- let ts = unix_ts ( ) . as_secs ( ) . to_string ( ) ;
471- req. cookie ( ) . add ( make_cookie ( COOKIE_LAST_REFRESH_TIME , ts) ) ;
426+ login_history. disable_write = false ;
427+ if let Some ( s) = & mut client_session {
428+ let sid = s. header . id . clone ( ) ;
429+ session. set_client_session_id ( sid. clone ( ) ) ;
430+ login_history. session_id = sid. clone ( ) ;
431+ if !s. is_new_session {
432+ // if session enabled by client:
433+ // log for the first request of the session.
434+ // else:
435+ // log every request, which can be distinguished by `session_id = ''`
436+ login_history. disable_write = true ;
472437 }
438+ s. try_refresh_state (
439+ session. get_current_tenant ( ) ,
440+ & user_name,
441+ req. cookie ( ) ,
442+ is_worksheet,
443+ )
444+ . await ?;
473445 }
474446
475447 let session = session_manager. register_session ( session) ?;
@@ -479,12 +451,6 @@ impl<E> HTTPSessionEndpoint<E> {
479451 . get ( HEADER_DEDUPLICATE_LABEL )
480452 . map ( |id| id. to_str ( ) . unwrap ( ) . to_string ( ) ) ;
481453
482- let user_agent = req
483- . headers ( )
484- . get ( USER_AGENT )
485- . map ( |id| id. to_str ( ) . unwrap ( ) . to_string ( ) ) ;
486- login_history. user_agent = user_agent. clone ( ) . unwrap_or_default ( ) ;
487-
488454 let expected_node_id = req
489455 . headers ( )
490456 . get ( HEADER_NODE_ID )
@@ -509,9 +475,11 @@ impl<E> HTTPSessionEndpoint<E> {
509475 http_method : req. method ( ) . to_string ( ) ,
510476 uri : req. uri ( ) . to_string ( ) ,
511477 client_host,
512- client_session_id,
478+ client_session_id : client_session . as_ref ( ) . map ( |s| s . header . id . clone ( ) ) ,
513479 user_name,
514480 is_sticky_node,
481+ client_session,
482+ fixed_coordinator_node : is_worksheet,
515483 } )
516484 }
517485}
@@ -698,8 +666,15 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
698666 Ok ( ctx) => {
699667 login_history. event_type = LoginEventType :: LoginSuccess ;
700668 login_history. write_to_log ( ) ;
669+ let client_session = ctx. client_session . clone ( ) ;
701670 req. extensions_mut ( ) . insert ( ctx) ;
702- self . ep . call ( req) . await . map ( |v| v. into_response ( ) )
671+ self . ep . call ( req) . await . map ( |v| {
672+ let mut r = v. into_response ( ) ;
673+ if let Some ( s) = client_session {
674+ s. on_response ( & mut r) ;
675+ }
676+ r
677+ } )
703678 }
704679 Err ( err) => {
705680 login_history. event_type = LoginEventType :: LoginFailed ;
0 commit comments