@@ -43,9 +43,16 @@ pub struct HandlerFactory {
4343}
4444
4545impl HandlerFactory {
46- pub fn new ( session_context : Arc < SessionContext > , auth_manager : Arc < AuthManager > , query_timeout : Option < std:: time:: Duration > ) -> Self {
47- let session_service =
48- Arc :: new ( DfSessionService :: new ( session_context, auth_manager. clone ( ) , query_timeout) ) ;
46+ pub fn new (
47+ session_context : Arc < SessionContext > ,
48+ auth_manager : Arc < AuthManager > ,
49+ query_timeout : Option < std:: time:: Duration > ,
50+ ) -> Self {
51+ let session_service = Arc :: new ( DfSessionService :: new (
52+ session_context,
53+ auth_manager. clone ( ) ,
54+ query_timeout,
55+ ) ) ;
4956 HandlerFactory { session_service }
5057 }
5158}
@@ -103,8 +110,6 @@ impl DfSessionService {
103110 }
104111 }
105112
106-
107-
108113 /// Check if the current user has permission to execute a query
109114 async fn check_query_permission < C > ( & self , client : & C , query : & str ) -> PgWireResult < ( ) >
110115 where
@@ -558,16 +563,19 @@ impl ExtendedQueryHandler for DfSessionService {
558563 . map_err ( |e| PgWireError :: ApiError ( Box :: new ( e) ) ) ?;
559564
560565 let dataframe = if let Some ( timeout) = self . query_timeout {
561- tokio:: time:: timeout ( timeout, self . session_context . execute_logical_plan ( optimised) )
562- . await
563- . map_err ( |_| {
564- PgWireError :: UserError ( Box :: new ( pgwire:: error:: ErrorInfo :: new (
565- "ERROR" . to_string ( ) ,
566- "57014" . to_string ( ) , // query_canceled error code
567- "canceling statement due to query timeout" . to_string ( ) ,
568- ) ) )
569- } ) ?
570- . map_err ( |e| PgWireError :: ApiError ( Box :: new ( e) ) ) ?
566+ tokio:: time:: timeout (
567+ timeout,
568+ self . session_context . execute_logical_plan ( optimised) ,
569+ )
570+ . await
571+ . map_err ( |_| {
572+ PgWireError :: UserError ( Box :: new ( pgwire:: error:: ErrorInfo :: new (
573+ "ERROR" . to_string ( ) ,
574+ "57014" . to_string ( ) , // query_canceled error code
575+ "canceling statement due to query timeout" . to_string ( ) ,
576+ ) ) )
577+ } ) ?
578+ . map_err ( |e| PgWireError :: ApiError ( Box :: new ( e) ) ) ?
571579 } else {
572580 match self . session_context . execute_logical_plan ( optimised) . await {
573581 Ok ( df) => df,
@@ -649,7 +657,7 @@ mod tests {
649657 let session_context = Arc :: new ( SessionContext :: new ( ) ) ;
650658 let auth_manager = Arc :: new ( AuthManager :: new ( ) ) ;
651659 let timeout = Some ( Duration :: from_secs ( 60 ) ) ;
652-
660+
653661 let factory = HandlerFactory :: new ( session_context, auth_manager, timeout) ;
654662 assert_eq ! ( factory. session_service. query_timeout, timeout) ;
655663 }
@@ -658,21 +666,20 @@ mod tests {
658666 fn test_session_service_timeout_configuration ( ) {
659667 let session_context = Arc :: new ( SessionContext :: new ( ) ) ;
660668 let auth_manager = Arc :: new ( AuthManager :: new ( ) ) ;
661-
669+
662670 // Test with timeout
663671 let service_with_timeout = DfSessionService :: new (
664- session_context. clone ( ) ,
665- auth_manager. clone ( ) ,
672+ session_context. clone ( ) ,
673+ auth_manager. clone ( ) ,
674+ Some ( Duration :: from_secs ( 45 ) ) ,
675+ ) ;
676+ assert_eq ! (
677+ service_with_timeout. query_timeout,
666678 Some ( Duration :: from_secs( 45 ) )
667679 ) ;
668- assert_eq ! ( service_with_timeout. query_timeout, Some ( Duration :: from_secs( 45 ) ) ) ;
669-
680+
670681 // Test without timeout (None)
671- let service_no_timeout = DfSessionService :: new (
672- session_context,
673- auth_manager,
674- None
675- ) ;
682+ let service_no_timeout = DfSessionService :: new ( session_context, auth_manager, None ) ;
676683 assert_eq ! ( service_no_timeout. query_timeout, None ) ;
677684 }
678685
@@ -681,17 +688,20 @@ mod tests {
681688 // Test 0 seconds = no timeout
682689 let opts_no_timeout = ServerOptions :: new ( ) . with_query_timeout_secs ( 0 ) ;
683690 assert_eq ! ( opts_no_timeout. query_timeout, None ) ;
684-
691+
685692 // Test positive seconds = Some(Duration)
686693 let opts_with_timeout = ServerOptions :: new ( ) . with_query_timeout_secs ( 60 ) ;
687- assert_eq ! ( opts_with_timeout. query_timeout, Some ( Duration :: from_secs( 60 ) ) ) ;
694+ assert_eq ! (
695+ opts_with_timeout. query_timeout,
696+ Some ( Duration :: from_secs( 60 ) )
697+ ) ;
688698 }
689699
690700 #[ test]
691701 fn test_max_connections_configuration ( ) {
692702 let opts = ServerOptions :: new ( ) . with_max_connections ( 500 ) ;
693703 assert_eq ! ( opts. max_connections, 500 ) ;
694-
704+
695705 let opts_default = ServerOptions :: default ( ) ;
696706 assert_eq ! ( opts_default. max_connections, 1000 ) ;
697707 }
0 commit comments