@@ -2,7 +2,7 @@ use cubesql::compile::DatabaseProtocol;
22use cubesql:: compile:: { convert_sql_to_cube_query, get_df_batches} ;
33use cubesql:: config:: processing_loop:: ShutdownMode ;
44use cubesql:: config:: ConfigObj ;
5- use cubesql:: sql:: SessionManager ;
5+ use cubesql:: sql:: { Session , SessionManager } ;
66use cubesql:: transport:: TransportService ;
77use futures:: StreamExt ;
88
@@ -178,32 +178,25 @@ fn shutdown_interface(mut cx: FunctionContext) -> JsResult<JsPromise> {
178178
179179const CHUNK_DELIM : & str = "\n " ;
180180
181- async fn handle_sql_query (
182- services : Arc < NodeCubeServices > ,
181+ async fn create_session (
182+ services : & NodeCubeServices ,
183183 native_auth_ctx : Arc < NativeAuthContext > ,
184- channel : Arc < Channel > ,
185- stream_methods : WritableStreamMethods ,
186- sql_query : & str ,
187- ) -> Result < ( ) , CubeError > {
184+ ) -> Result < Arc < Session > , CubeError > {
188185 let config = services
189186 . injector ( )
190187 . get_service_typed :: < dyn ConfigObj > ( )
191188 . await ;
192189
193- let transport_service = services
194- . injector ( )
195- . get_service_typed :: < dyn TransportService > ( )
196- . await ;
197190 let session_manager = services
198191 . injector ( )
199192 . get_service_typed :: < SessionManager > ( )
200193 . await ;
201194
202195 let ( host, port) = match SocketAddr :: from_str (
203- & config
196+ config
204197 . postgres_bind_address ( )
205- . clone ( )
206- . unwrap_or ( "127.0.0.1:15432" . into ( ) ) ,
198+ . as_deref ( )
199+ . unwrap_or ( "127.0.0.1:15432" ) ,
207200 ) {
208201 Ok ( addr) => ( addr. ip ( ) . to_string ( ) , addr. port ( ) ) ,
209202 Err ( e) => {
@@ -222,6 +215,27 @@ async fn handle_sql_query(
222215 . state
223216 . set_auth_context ( Some ( native_auth_ctx. clone ( ) ) ) ;
224217
218+ Ok ( session)
219+ }
220+
221+ async fn handle_sql_query (
222+ services : Arc < NodeCubeServices > ,
223+ native_auth_ctx : Arc < NativeAuthContext > ,
224+ channel : Arc < Channel > ,
225+ stream_methods : WritableStreamMethods ,
226+ sql_query : & str ,
227+ ) -> Result < ( ) , CubeError > {
228+ let session = create_session ( & services, native_auth_ctx. clone ( ) ) . await ?;
229+
230+ let transport_service = services
231+ . injector ( )
232+ . get_service_typed :: < dyn TransportService > ( )
233+ . await ;
234+ let session_manager = services
235+ . injector ( )
236+ . get_service_typed :: < SessionManager > ( )
237+ . await ;
238+
225239 let connection_id = session. state . connection_id ;
226240
227241 let execute = || async move {
0 commit comments