Skip to content

Commit d21a6b9

Browse files
committed
refactor(backend-native): Extract with_session function
1 parent b946b7b commit d21a6b9

File tree

1 file changed

+28
-16
lines changed

1 file changed

+28
-16
lines changed

packages/cubejs-backend-native/src/node_export.rs

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use cubenativeutils::wrappers::serializer::NativeDeserialize;
2626
use cubenativeutils::wrappers::NativeContextHolder;
2727
use cubesqlplanner::cube_bridge::base_query_options::NativeBaseQueryOptions;
2828
use cubesqlplanner::planner::base_query::BaseQuery;
29+
use std::future::Future;
2930
use std::net::SocketAddr;
3031
use std::rc::Rc;
3132
use std::str::FromStr;
@@ -178,6 +179,30 @@ fn shutdown_interface(mut cx: FunctionContext) -> JsResult<JsPromise> {
178179

179180
const CHUNK_DELIM: &str = "\n";
180181

182+
async fn with_session<T, F, Fut>(
183+
services: &NodeCubeServices,
184+
native_auth_ctx: Arc<NativeAuthContext>,
185+
f: F,
186+
) -> Result<T, CubeError>
187+
where
188+
F: FnOnce(Arc<Session>) -> Fut,
189+
Fut: Future<Output = Result<T, CubeError>>,
190+
{
191+
let session_manager = services
192+
.injector()
193+
.get_service_typed::<SessionManager>()
194+
.await;
195+
let session = create_session(services, native_auth_ctx).await?;
196+
let connection_id = session.state.connection_id;
197+
198+
// From now there's a session we should close before returning, as in `finally`
199+
let result = { f(session).await };
200+
201+
session_manager.drop_session(connection_id).await;
202+
203+
result
204+
}
205+
181206
async fn create_session(
182207
services: &NodeCubeServices,
183208
native_auth_ctx: Arc<NativeAuthContext>,
@@ -225,20 +250,12 @@ async fn handle_sql_query(
225250
stream_methods: WritableStreamMethods,
226251
sql_query: &str,
227252
) -> Result<(), CubeError> {
228-
let session = create_session(&services, native_auth_ctx.clone()).await?;
229-
230253
let transport_service = services
231254
.injector()
232255
.get_service_typed::<dyn TransportService>()
233256
.await;
234-
let session_manager = services
235-
.injector()
236-
.get_service_typed::<SessionManager>()
237-
.await;
238-
239-
let connection_id = session.state.connection_id;
240257

241-
let execute = || async move {
258+
with_session(&services, native_auth_ctx.clone(), |session| async move {
242259
// todo: can we use compiler_cache?
243260
let meta_context = transport_service
244261
.meta(native_auth_ctx)
@@ -321,13 +338,8 @@ async fn handle_sql_query(
321338
}
322339

323340
Ok::<(), CubeError>(())
324-
};
325-
326-
let result = execute().await;
327-
328-
session_manager.drop_session(connection_id).await;
329-
330-
result
341+
})
342+
.await
331343
}
332344

333345
struct WritableStreamMethods {

0 commit comments

Comments
 (0)