Skip to content

Commit 845e289

Browse files
committed
Add customizer to context
1 parent 5af6cf5 commit 845e289

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

src/planning.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ pub fn add_distributed_analyze(
439439
/// final stage only as that's all we care about from the call site
440440
pub async fn distribute_stages(
441441
query_id: &str,
442-
stages: Vec<DDStage>,
442+
stages: &[DDStage],
443443
worker_addrs: Vec<Host>,
444444
codec: &dyn PhysicalExtensionCodec,
445445
) -> Result<(Addrs, Vec<DDTask>)> {

src/query_planner.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,13 @@ impl QueryPlanner {
119119
/// distributed plan, and distributed stages, but it does not yet contain
120120
/// worker addresses or tasks, as they are filled in later by `distribute_plan()`.
121121
pub async fn prepare_substrait(&self, substrait_plan: Plan) -> Result<QueryPlan> {
122-
let ctx = get_ctx().map_err(|e| anyhow!("Could not create context: {e}"))?;
122+
let mut ctx = get_ctx().map_err(|e| anyhow!("Could not create context: {e}"))?;
123+
if let Some(customizer) = &self.customizer {
124+
customizer
125+
.customize(&mut ctx)
126+
.await
127+
.map_err(|e| anyhow!("Customization failed: {e:#?}"))?;
128+
}
123129

124130
let logical_plan = from_substrait_plan(&ctx.state(), &substrait_plan).await?;
125131

@@ -257,7 +263,7 @@ impl QueryPlanner {
257263
})
258264
}
259265

260-
/// Performs worker discovery, and distributes the query plan to workers.
266+
/// Performs worker discovery, and distributes the query plan to workers,
261267
/// also sets the final worker addresses and distributed tasks in the query plan.
262268
pub async fn distribute_plan(&self, initial_plan: &mut QueryPlan) -> Result<()> {
263269
// Perform worker discovery
@@ -267,7 +273,7 @@ impl QueryPlanner {
267273
// into chunks of partitions (partition_groups)
268274
let (final_workers, tasks) = distribute_stages(
269275
&initial_plan.query_id,
270-
initial_plan.distributed_stages.clone(),
276+
&initial_plan.distributed_stages,
271277
worker_addrs,
272278
self.codec.as_ref(),
273279
)

0 commit comments

Comments
 (0)