Skip to content

Commit 6db8e72

Browse files
committed
OptimizerHints having more fields
1 parent b20baa4 commit 6db8e72

File tree

9 files changed

+90
-46
lines changed

9 files changed

+90
-46
lines changed

datafusion/src/physical_plan/coalesce_partitions.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
139139
} else {
140140
sort_order = None
141141
}
142-
OptimizerHints {
143-
sort_order,
144-
single_value_columns: input_hints.single_value_columns,
145-
}
142+
// TODO: This could also pass along the input's approximate sort order, no?
143+
OptimizerHints::new_sorted(sort_order, input_hints.single_value_columns)
146144
}
147145

148146
fn fmt_as(

datafusion/src/physical_plan/filter.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,7 @@ impl ExecutionPlan for FilterExec {
128128
single_value_columns.sort_unstable();
129129
single_value_columns.dedup();
130130

131-
OptimizerHints {
132-
sort_order: inputs_hints.sort_order,
133-
single_value_columns,
134-
}
131+
OptimizerHints::new_sorted(inputs_hints.sort_order, single_value_columns)
135132
}
136133

137134
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {

datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -188,15 +188,15 @@ impl HashAggregateExec {
188188
match strategy {
189189
AggregateStrategy::Hash => assert!(output_sort_order.is_none()),
190190
AggregateStrategy::InplaceSorted => {
191-
assert!(output_sort_order.is_some());
192-
assert!(
193-
output_sort_order
194-
.as_ref()
195-
.unwrap()
196-
.iter()
197-
.all(|i| *i < group_expr.len()),
198-
"sort_order mentions value columns"
199-
);
191+
// TODO: If we make output hints contain partial sort orders, this will need to assert that output_sort_order.is_some() again, and such.
192+
if let Some(output_sort_order) = &output_sort_order {
193+
assert!(
194+
output_sort_order
195+
.iter()
196+
.all(|i| *i < group_expr.len()),
197+
"sort_order mentions value columns"
198+
);
199+
}
200200
}
201201
}
202202

@@ -325,10 +325,10 @@ impl ExecutionPlan for HashAggregateExec {
325325
AggregateStrategy::Hash => None,
326326
AggregateStrategy::InplaceSorted => self.output_sort_order.clone(),
327327
};
328-
OptimizerHints {
328+
OptimizerHints::new_sorted(
329329
sort_order,
330-
single_value_columns: Vec::new(),
331-
}
330+
Vec::new(),
331+
)
332332
}
333333

334334
fn metrics(&self) -> HashMap<String, SQLMetric> {

datafusion/src/physical_plan/merge.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,10 @@ impl ExecutionPlan for MergeExec {
164164
} else {
165165
sort_order = None
166166
}
167-
OptimizerHints {
167+
OptimizerHints::new_sorted(
168168
sort_order,
169-
single_value_columns: input_hints.single_value_columns,
170-
}
169+
input_hints.single_value_columns,
170+
)
171171
}
172172
}
173173

datafusion/src/physical_plan/merge_sort.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ impl ExecutionPlan for MergeSortExec {
101101
}
102102

103103
fn output_hints(&self) -> OptimizerHints {
104-
OptimizerHints {
105-
single_value_columns: self.input.output_hints().single_value_columns,
106-
sort_order: Some(self.columns.iter().map(|c| c.index()).collect()),
107-
}
104+
OptimizerHints::new_sorted(
105+
Some(self.columns.iter().map(|c| c.index()).collect()),
106+
self.input.output_hints().single_value_columns,
107+
)
108108
}
109109

110110
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
@@ -115,6 +115,8 @@ impl ExecutionPlan for MergeSortExec {
115115
)));
116116
}
117117

118+
log::error!("MergeSortExec executing across {} partitions", self.input.output_partitioning().partition_count());
119+
118120
let inputs = join_all(
119121
(0..self.input.output_partitioning().partition_count())
120122
.map(|i| self.input.execute(i))
@@ -614,10 +616,10 @@ impl ExecutionPlan for LastRowByUniqueKeyExec {
614616
}
615617

616618
fn output_hints(&self) -> OptimizerHints {
617-
OptimizerHints {
618-
single_value_columns: self.input.output_hints().single_value_columns,
619-
sort_order: self.input.output_hints().sort_order,
620-
}
619+
OptimizerHints::new_sorted(
620+
self.input.output_hints().sort_order,
621+
self.input.output_hints().single_value_columns,
622+
)
621623
}
622624

623625
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {

datafusion/src/physical_plan/mod.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,56 @@ use smallvec::SmallVec;
129129
pub struct OptimizerHints {
130130
/// If the output is sorted, contains indices of the sort key columns in the output schema.
131131
/// Each partition should meet this sort order, but order between partitions is unspecified.
132-
/// Note that this does **not** guarantee the exact ordering inside each of the columns, e.g.
132+
/// Note that this does **not** specify the exact ordering inside each of the columns, e.g.
133133
/// the values may end up in ascending or descending order, nulls can go first or last.
134134
pub sort_order: Option<Vec<usize>>,
135+
136+
// Describes the sawtoothing runs of the stream that is partially sorted. If sort_order is
137+
// present, the first element of this should be sort_order.unwrap(). If we take a sorted stream
138+
// and add a projection that removes a column in the middle of sort_order, and it isn't a single
139+
// value column, approximate_sort_order.len() would be 2, and it would be the input's sort order
140+
// split on the missing column.
141+
//
142+
// However, this is free to have jumps outside of the sort order. We might have a MergeNode
143+
// which retains the approximate_sort_order optimizer hint despite merging stuff out of order.
144+
// The approximate sort order is more "statistical" in nature.
145+
pub approximate_sort_order: Vec<Vec<usize>>,
146+
/// True if the sort order has no jumps other than those permitted by approximate_sort_order.
147+
/// This means that the ordering represents a truly sorted order with some columns missing.
148+
pub approximate_sort_order_is_strict: bool,
149+
/// True if there are no missing columns in front of the approximate sort order. If and only if
150+
/// this and approximate_sort_order_is_strict are true, that implies sort_order should equal
151+
/// Some(approximate_sort_order[0]).
152+
pub approximate_sort_order_is_prefix: bool,
153+
135154
/// Indices of columns that will always have the same value in each row. No information about
136155
/// the value is provided.
137156
pub single_value_columns: Vec<usize>,
138157
}
139158

159+
impl OptimizerHints {
160+
/// Calling this with None for sort_order is arguably deprecated. Used to adapt code that preceded
161+
/// approximate_sort_order information.
162+
fn new_sorted(sort_order: Option<Vec<usize>>, single_value_columns: Vec<usize>) -> OptimizerHints {
163+
let mut approximate_sort_order = Vec::new();
164+
let mut approximate_sort_order_is_strict = false;
165+
let mut approximate_sort_order_is_prefix = false;
166+
if let Some(order) = &sort_order {
167+
approximate_sort_order.push(order.clone());
168+
approximate_sort_order_is_strict = true;
169+
approximate_sort_order_is_prefix = true;
170+
}
171+
let hints = OptimizerHints {
172+
sort_order,
173+
approximate_sort_order,
174+
approximate_sort_order_is_prefix,
175+
approximate_sort_order_is_strict,
176+
single_value_columns,
177+
};
178+
hints
179+
}
180+
}
181+
140182
/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
141183
///
142184
/// Each `ExecutionPlan` is Partition-aware and is responsible for

datafusion/src/physical_plan/planner.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ impl DefaultPhysicalPlanner {
513513
//single-value columns
514514
let input_sortedness =
515515
input_sortedness_by_group_key(input_exec.as_ref(), &groups);
516-
let (strategy, order): (AggregateStrategy, Option<Vec<usize>>) =
516+
let (strategy, partial_strategy, order): (AggregateStrategy, AggregateStrategy, Option<Vec<usize>>) =
517517
match input_sortedness.sawtooth_levels() {
518518
Some(0) => {
519519
log::error!("DefaultPhysicalExpr: Perfect match for inplace aggregation");
@@ -523,15 +523,17 @@ impl DefaultPhysicalPlanner {
523523
*group_key_offset
524524
})
525525
.collect_vec();
526-
(AggregateStrategy::InplaceSorted, Some(order))
526+
(AggregateStrategy::InplaceSorted, AggregateStrategy::InplaceSorted, Some(order))
527527
}
528528
Some(n) => {
529529
log::error!("DefaultPhysicalExpr: Non-perfect match for inplace aggregation: {} clumps", n);
530-
(AggregateStrategy::Hash, None)
530+
// TODO: Note that this is very oversimplified
531+
(AggregateStrategy::Hash, AggregateStrategy::InplaceSorted, None)
532+
// (AggregateStrategy::Hash, AggregateStrategy::Hash, None)
531533
},
532534
_ => {
533535
log::error!("DefaultPhysicalExpr: No match for inplace aggregation");
534-
(AggregateStrategy::Hash, None)
536+
(AggregateStrategy::Hash, AggregateStrategy::Hash, None)
535537
},
536538
};
537539

@@ -551,7 +553,7 @@ impl DefaultPhysicalPlanner {
551553

552554
let mut initial_aggr: Arc<dyn ExecutionPlan> =
553555
Arc::new(HashAggregateExec::try_new(
554-
strategy,
556+
partial_strategy,
555557
order.clone(),
556558
AggregateMode::Partial,
557559
groups.clone(),
@@ -590,7 +592,8 @@ impl DefaultPhysicalPlanner {
590592
&& ctx_state.config.concurrency > 1
591593
&& ctx_state.config.repartition_aggregations
592594
&& !contains_dict
593-
&& strategy == AggregateStrategy::Hash;
595+
&& strategy == AggregateStrategy::Hash
596+
&& partial_strategy == AggregateStrategy::Hash;
594597

595598
let (initial_aggr, next_partition_mode): (
596599
Arc<dyn ExecutionPlan>,
@@ -1764,6 +1767,8 @@ pub fn input_sortedness_by_group_key(
17641767
}
17651768

17661769
let hints = input.output_hints();
1770+
// log::error!("input_sortedness_by_group_key OptimizerHints is: {:?}", hints);
1771+
// log::error!("input_sortedness_by_group_key input is: {:#?}", input);
17671772
// We check the group key is a prefix of the sort key.
17681773
let sort_key = hints.sort_order;
17691774
if sort_key.is_none() {

datafusion/src/physical_plan/projection.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,14 @@ impl ExecutionPlan for ProjectionExec {
170170
}
171171
};
172172

173-
OptimizerHints {
174-
single_value_columns,
175-
sort_order: if sort_order.is_empty() {
173+
OptimizerHints::new_sorted(
174+
if sort_order.is_empty() {
176175
None
177176
} else {
178177
Some(sort_order)
179178
},
180-
}
179+
single_value_columns,
180+
)
181181
}
182182

183183
fn fmt_as(

datafusion/src/physical_plan/sort.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,10 +209,10 @@ impl ExecutionPlan for SortExec {
209209
let input_hints = self.input.output_hints();
210210
// TODO: If sort_order_truncated is false, we can combine input_hints.sort_order. Do this.
211211

212-
OptimizerHints {
213-
sort_order: Some(order),
214-
single_value_columns: input_hints.single_value_columns.clone(),
215-
}
212+
OptimizerHints::new_sorted(
213+
Some(order),
214+
input_hints.single_value_columns,
215+
)
216216
}
217217
}
218218

0 commit comments

Comments
 (0)