Skip to content

Commit f8e9b5a

Browse files
committed
chore(cubestore): Upgrade DF 46: Small code dedup of router_context and worker_context
1 parent 3e01da0 commit f8e9b5a

File tree

1 file changed

+34
-32
lines changed

1 file changed

+34
-32
lines changed

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -401,21 +401,49 @@ impl QueryExecutorImpl {
401401
&self,
402402
cluster: Arc<dyn Cluster>,
403403
serialized_plan: Arc<PreSerializedPlan>,
404+
) -> Result<Arc<SessionContext>, CubeError> {
405+
self.make_context(
406+
CubeQueryPlanner::new_on_router(
407+
cluster,
408+
serialized_plan,
409+
self.memory_handler.clone(),
410+
),
411+
None
412+
)
413+
}
414+
415+
fn worker_context(
416+
&self,
417+
serialized_plan: Arc<PreSerializedPlan>,
418+
worker_planning_params: WorkerPlanningParams,
419+
data_loaded_size: Option<Arc<DataLoadedSize>>,
420+
) -> Result<Arc<SessionContext>, CubeError> {
421+
self.make_context(
422+
CubeQueryPlanner::new_on_worker(
423+
serialized_plan,
424+
worker_planning_params,
425+
self.memory_handler.clone(),
426+
data_loaded_size.clone(),
427+
),
428+
data_loaded_size,
429+
)
430+
}
431+
432+
fn make_context(
433+
&self,
434+
query_planner: CubeQueryPlanner,
435+
data_loaded_size: Option<Arc<DataLoadedSize>>, // None on router
404436
) -> Result<Arc<SessionContext>, CubeError> {
405437
let runtime = Arc::new(RuntimeEnv::default());
406438
let config = self.session_config();
407439
let session_state = SessionStateBuilder::new()
408440
.with_config(config)
409441
.with_runtime_env(runtime)
410442
.with_default_features()
411-
.with_query_planner(Arc::new(CubeQueryPlanner::new_on_router(
412-
cluster,
413-
serialized_plan,
414-
self.memory_handler.clone(),
415-
)))
416-
.with_physical_optimizer_rules(self.optimizer_rules(None))
443+
.with_query_planner(Arc::new(query_planner))
417444
.with_aggregate_functions(registerable_arc_aggregate_udfs())
418445
.with_scalar_functions(registerable_arc_scalar_udfs())
446+
.with_physical_optimizer_rules(self.optimizer_rules(data_loaded_size))
419447
.build();
420448
let ctx = SessionContext::new_with_state(session_state);
421449
Ok(Arc::new(ctx))
@@ -450,32 +478,6 @@ impl QueryExecutorImpl {
450478
]
451479
}
452480

453-
fn worker_context(
454-
&self,
455-
serialized_plan: Arc<PreSerializedPlan>,
456-
worker_planning_params: WorkerPlanningParams,
457-
data_loaded_size: Option<Arc<DataLoadedSize>>,
458-
) -> Result<Arc<SessionContext>, CubeError> {
459-
let runtime = Arc::new(RuntimeEnv::default());
460-
let config = self.session_config();
461-
let session_state = SessionStateBuilder::new()
462-
.with_config(config)
463-
.with_runtime_env(runtime)
464-
.with_default_features()
465-
.with_query_planner(Arc::new(CubeQueryPlanner::new_on_worker(
466-
serialized_plan,
467-
worker_planning_params,
468-
self.memory_handler.clone(),
469-
data_loaded_size.clone(),
470-
)))
471-
.with_aggregate_functions(registerable_arc_aggregate_udfs())
472-
.with_scalar_functions(registerable_arc_scalar_udfs())
473-
.with_physical_optimizer_rules(self.optimizer_rules(data_loaded_size))
474-
.build();
475-
let ctx = SessionContext::new_with_state(session_state);
476-
Ok(Arc::new(ctx))
477-
}
478-
479481
fn session_config(&self) -> SessionConfig {
480482
let mut config = self.metadata_cache_factory.make_session_config()
481483
.with_batch_size(4096)

0 commit comments

Comments
 (0)