Skip to content

Commit 4bebd28

Browse files
committed
chore(cubestore): Upgrade DF: Small code dedup of router_context and worker_context
1 parent 1825235 commit 4bebd28

File tree

1 file changed

+30
-32
lines changed

1 file changed

+30
-32
lines changed

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -410,21 +410,45 @@ impl QueryExecutorImpl {
410410
&self,
411411
cluster: Arc<dyn Cluster>,
412412
serialized_plan: Arc<PreSerializedPlan>,
413+
) -> Result<Arc<SessionContext>, CubeError> {
414+
self.make_context(
415+
CubeQueryPlanner::new_on_router(cluster, serialized_plan, self.memory_handler.clone()),
416+
None,
417+
)
418+
}
419+
420+
fn worker_context(
421+
&self,
422+
serialized_plan: Arc<PreSerializedPlan>,
423+
worker_planning_params: WorkerPlanningParams,
424+
data_loaded_size: Option<Arc<DataLoadedSize>>,
425+
) -> Result<Arc<SessionContext>, CubeError> {
426+
self.make_context(
427+
CubeQueryPlanner::new_on_worker(
428+
serialized_plan,
429+
worker_planning_params,
430+
self.memory_handler.clone(),
431+
data_loaded_size.clone(),
432+
),
433+
data_loaded_size,
434+
)
435+
}
436+
437+
fn make_context(
438+
&self,
439+
query_planner: CubeQueryPlanner,
440+
data_loaded_size: Option<Arc<DataLoadedSize>>, // None on router
413441
) -> Result<Arc<SessionContext>, CubeError> {
414442
let runtime = Arc::new(RuntimeEnv::default());
415443
let config = self.session_config();
416444
let session_state = SessionStateBuilder::new()
417445
.with_config(config)
418446
.with_runtime_env(runtime)
419447
.with_default_features()
420-
.with_query_planner(Arc::new(CubeQueryPlanner::new_on_router(
421-
cluster,
422-
serialized_plan,
423-
self.memory_handler.clone(),
424-
)))
425-
.with_physical_optimizer_rules(self.optimizer_rules(None))
448+
.with_query_planner(Arc::new(query_planner))
426449
.with_aggregate_functions(registerable_arc_aggregate_udfs())
427450
.with_scalar_functions(registerable_arc_scalar_udfs())
451+
.with_physical_optimizer_rules(self.optimizer_rules(data_loaded_size))
428452
.build();
429453
let ctx = SessionContext::new_with_state(session_state);
430454
Ok(Arc::new(ctx))
@@ -459,32 +483,6 @@ impl QueryExecutorImpl {
459483
]
460484
}
461485

462-
fn worker_context(
463-
&self,
464-
serialized_plan: Arc<PreSerializedPlan>,
465-
worker_planning_params: WorkerPlanningParams,
466-
data_loaded_size: Option<Arc<DataLoadedSize>>,
467-
) -> Result<Arc<SessionContext>, CubeError> {
468-
let runtime = Arc::new(RuntimeEnv::default());
469-
let config = self.session_config();
470-
let session_state = SessionStateBuilder::new()
471-
.with_config(config)
472-
.with_runtime_env(runtime)
473-
.with_default_features()
474-
.with_query_planner(Arc::new(CubeQueryPlanner::new_on_worker(
475-
serialized_plan,
476-
worker_planning_params,
477-
self.memory_handler.clone(),
478-
data_loaded_size.clone(),
479-
)))
480-
.with_aggregate_functions(registerable_arc_aggregate_udfs())
481-
.with_scalar_functions(registerable_arc_scalar_udfs())
482-
.with_physical_optimizer_rules(self.optimizer_rules(data_loaded_size))
483-
.build();
484-
let ctx = SessionContext::new_with_state(session_state);
485-
Ok(Arc::new(ctx))
486-
}
487-
488486
fn session_config(&self) -> SessionConfig {
489487
let mut config = self
490488
.metadata_cache_factory

0 commit comments

Comments
 (0)