Skip to content

Commit 2889215

Browse files
committed
Sortedness using approximate sort order
1 parent 6db8e72 commit 2889215

File tree

9 files changed

+179
-55
lines changed

9 files changed

+179
-55
lines changed

datafusion/src/physical_plan/coalesce_partitions.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use async_trait::async_trait;
2929
use arrow::record_batch::RecordBatch;
3030
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};
3131

32+
use super::merge::MergeExec;
3233
use super::RecordBatchStream;
3334
use crate::error::{DataFusionError, Result};
3435
use crate::physical_plan::{
@@ -38,7 +39,6 @@ use crate::physical_plan::{
3839
use super::SendableRecordBatchStream;
3940
use crate::physical_plan::common::spawn_execution;
4041
use pin_project_lite::pin_project;
41-
use std::option::Option::None;
4242

4343
/// Merge execution plan executes partitions in parallel and combines them into a single
4444
/// partition. No guarantees are made about the order of the resulting partition.
@@ -132,15 +132,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
132132
}
133133

134134
fn output_hints(&self) -> OptimizerHints {
135-
let input_hints = self.input.output_hints();
136-
let sort_order;
137-
if self.input.output_partitioning().partition_count() <= 1 {
138-
sort_order = input_hints.sort_order
139-
} else {
140-
sort_order = None
141-
}
142-
// TODO: This could do approximate sort order, no?
143-
OptimizerHints::new_sorted(sort_order, input_hints.single_value_columns)
135+
MergeExec::output_hints_from_input_hints(self.input.as_ref())
144136
}
145137

146138
fn fmt_as(

datafusion/src/physical_plan/filter.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,13 @@ impl ExecutionPlan for FilterExec {
128128
single_value_columns.sort_unstable();
129129
single_value_columns.dedup();
130130

131-
OptimizerHints::new_sorted(inputs_hints.sort_order, single_value_columns)
131+
OptimizerHints {
132+
sort_order: inputs_hints.sort_order,
133+
approximate_sort_order: inputs_hints.approximate_sort_order,
134+
approximate_sort_order_is_strict: inputs_hints.approximate_sort_order_is_strict,
135+
approximate_sort_order_is_prefix: inputs_hints.approximate_sort_order_is_prefix,
136+
single_value_columns,
137+
}
132138
}
133139

134140
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {

datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,9 @@ impl ExecutionPlan for HashAggregateExec {
325325
AggregateStrategy::Hash => None,
326326
AggregateStrategy::InplaceSorted => self.output_sort_order.clone(),
327327
};
328+
// TODO: This could pass up self.approximate_sort_order (after saving it like
329+
// self.output_sort_order). (It is possible for self.output_sort_order to be None when
330+
// there is an approximate sort order.)
328331
OptimizerHints::new_sorted(
329332
sort_order,
330333
Vec::new(),

datafusion/src/physical_plan/merge.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,28 @@ impl MergeExec {
6262
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
6363
&self.input
6464
}
65+
66+
pub fn output_hints_from_input_hints(input: &dyn ExecutionPlan) -> OptimizerHints {
67+
let input_hints = input.output_hints();
68+
let sort_order;
69+
let approximate_sort_order_is_strict: bool;
70+
if input.output_partitioning().partition_count() <= 1 {
71+
sort_order = input_hints.sort_order;
72+
approximate_sort_order_is_strict = input_hints.approximate_sort_order_is_strict;
73+
} else {
74+
sort_order = None;
75+
approximate_sort_order_is_strict = false;
76+
}
77+
let approximate_sort_order = input_hints.approximate_sort_order;
78+
79+
OptimizerHints {
80+
sort_order,
81+
approximate_sort_order,
82+
approximate_sort_order_is_prefix: input_hints.approximate_sort_order_is_prefix & approximate_sort_order_is_strict,
83+
approximate_sort_order_is_strict,
84+
single_value_columns: input_hints.single_value_columns,
85+
}
86+
}
6587
}
6688

6789
#[async_trait]
@@ -157,17 +179,7 @@ impl ExecutionPlan for MergeExec {
157179
}
158180

159181
fn output_hints(&self) -> OptimizerHints {
160-
let input_hints = self.input.output_hints();
161-
let sort_order;
162-
if self.input.output_partitioning().partition_count() <= 1 {
163-
sort_order = input_hints.sort_order
164-
} else {
165-
sort_order = None
166-
}
167-
OptimizerHints::new_sorted(
168-
sort_order,
169-
input_hints.single_value_columns,
170-
)
182+
MergeExec::output_hints_from_input_hints(self.input.as_ref())
171183
}
172184
}
173185

datafusion/src/physical_plan/merge_sort.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -616,9 +616,11 @@ impl ExecutionPlan for LastRowByUniqueKeyExec {
616616
}
617617

618618
fn output_hints(&self) -> OptimizerHints {
619+
// Possibly, this is abandoning approximate sort order information.
620+
let input_hints = self.input.output_hints();
619621
OptimizerHints::new_sorted(
620-
self.input.output_hints().sort_order,
621-
self.input.output_hints().single_value_columns,
622+
input_hints.sort_order,
623+
input_hints.single_value_columns,
622624
)
623625
}
624626

datafusion/src/physical_plan/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,9 @@ pub struct OptimizerHints {
157157
}
158158

159159
impl OptimizerHints {
160-
/// Use with None for sort_order is arguably deprecated. Used to adapt code that preceded
161-
/// approximate_sort_order information.
162-
fn new_sorted(sort_order: Option<Vec<usize>>, single_value_columns: Vec<usize>) -> OptimizerHints {
160+
/// Mostly used to adapt code that preceded the creation of approximate_sort_order fields --
161+
/// callers may be throwing away information about approximate sort order.
162+
pub fn new_sorted(sort_order: Option<Vec<usize>>, single_value_columns: Vec<usize>) -> OptimizerHints {
163163
let mut approximate_sort_order = Vec::new();
164164
let mut approximate_sort_order_is_strict = false;
165165
let mut approximate_sort_order_is_prefix = false;
@@ -177,6 +177,7 @@ impl OptimizerHints {
177177
};
178178
hints
179179
}
180+
180181
}
181182

182183
/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.

datafusion/src/physical_plan/planner.rs

Lines changed: 88 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -517,12 +517,7 @@ impl DefaultPhysicalPlanner {
517517
match input_sortedness.sawtooth_levels() {
518518
Some(0) => {
519519
log::error!("DefaultPhysicalExpr: Perfect match for inplace aggregation");
520-
let order = input_sortedness.sort_order[0]
521-
.iter()
522-
.map(|(_sort_key_offset, group_key_offset)| {
523-
*group_key_offset
524-
})
525-
.collect_vec();
520+
let order = input_sortedness.sort_order[0].clone(); // TODO: No clone?
526521
(AggregateStrategy::InplaceSorted, AggregateStrategy::InplaceSorted, Some(order))
527522
}
528523
Some(n) => {
@@ -1695,13 +1690,12 @@ pub fn evaluate_const(expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExp
16951690
/// Return value of input_sortedness_by_group_key. If succeeded, every group key offset appears in
16961691
/// sort_order or unsorted exactly once.
16971692
pub struct SortednessByGroupKey {
1698-
/// Elems are (offset into the sort key, offset into the group key), with sort key offsets
1699-
/// strictly increasing. Each Vec<(usize, usize)> is a clump of adjacent columns, with
1693+
/// Elems are offsets into the group key. Each Vec<usize> is a clump of adjacent columns, with
17001694
/// adjacency considered after ignoring single value columns.
17011695
///
1702-
/// Each column clump sees the input ordering in sawtoothing runs of rows, sawtoothing with different
1703-
/// granularity.
1704-
pub sort_order: Vec<Vec<(usize, usize)>>,
1696+
/// Each column clump sees the input ordering in sawtoothing runs of rows, sawtoothing with
1697+
/// different granularity.
1698+
pub sort_order: Vec<Vec<usize>>,
17051699
/// Indexes into the group key.
17061700
pub unsorted: Vec<usize>,
17071701
/// true if the first clump of sort_order is detached from the prefix of the sort key (ignoring
@@ -1741,10 +1735,7 @@ impl SortednessByGroupKey {
17411735
/// existing compute_aggregate_strategy function.
17421736
pub fn compute_aggregate_strategy(&self) -> (AggregateStrategy, Option<Vec<usize>>) {
17431737
if self.is_sorted_by_group_key() {
1744-
let order = self.sort_order[0]
1745-
.iter()
1746-
.map(|&(_sort_i, group_i)| group_i)
1747-
.collect_vec();
1738+
let order = self.sort_order[0].clone();
17481739
(AggregateStrategy::InplaceSorted, Some(order))
17491740
} else {
17501741
(AggregateStrategy::Hash, None)
@@ -1804,14 +1795,14 @@ pub fn input_sortedness_by_group_key(
18041795
};
18051796
}
18061797

1807-
let mut clumps = Vec::<Vec<(usize, usize)>>::new();
1798+
let mut clumps = Vec::<Vec<usize>>::new();
18081799
// At this point we walk through the sort_key_hit vec.
1809-
let mut clump = Vec::<(usize, usize)>::new();
1800+
let mut clump = Vec::<usize>::new();
18101801
// Are our clumps detached from the sort prefix?
18111802
let mut detached_from_prefix = false;
18121803
for (i, &hit) in sort_key_hit.iter().enumerate() {
18131804
if hit {
1814-
clump.push((i, sort_to_group[i]));
1805+
clump.push(sort_to_group[i]);
18151806
} else if hints.single_value_columns.contains(&sort_key[i]) {
18161807
// Don't end the clump.
18171808
} else {
@@ -1835,6 +1826,85 @@ pub fn input_sortedness_by_group_key(
18351826
}
18361827
}
18371828

1829+
pub fn input_sortedness_by_group_key_using_approximate(
1830+
input: &dyn ExecutionPlan,
1831+
group_key: &[(Arc<dyn PhysicalExpr>, String)],
1832+
) -> SortednessByGroupKey {
1833+
if group_key.is_empty() {
1834+
// The caller has to deal with it (and in fact it wants to).
1835+
return SortednessByGroupKey::failed();
1836+
}
1837+
1838+
let hints = input.output_hints();
1839+
let input_schema = input.schema();
1840+
let mut input_to_group = vec![None; input_schema.fields().len()];
1841+
1842+
for (group_i, (g, _)) in group_key.iter().enumerate() {
1843+
let col = g.as_any().downcast_ref::<Column>();
1844+
if col.is_none() {
1845+
return SortednessByGroupKey::failed();
1846+
}
1847+
let input_col = input_schema.index_of(col.unwrap().name());
1848+
if input_col.is_err() {
1849+
return SortednessByGroupKey::failed();
1850+
}
1851+
let input_col = input_col.unwrap();
1852+
// If we have two group by exprs for the same input column, we might not optimize well in that case.
1853+
input_to_group[input_col] = Some(group_i);
1854+
}
1855+
1856+
let mut group_key_used = vec![false; group_key.len()];
1857+
let mut prefix_maintained = None::<bool>;
1858+
let mut approximate_sort_order = Vec::new();
1859+
for in_segment in hints.approximate_sort_order {
1860+
let mut out_segment = Vec::new();
1861+
for in_col in in_segment {
1862+
if let Some(group_i) = input_to_group[in_col] {
1863+
if prefix_maintained.is_none() {
1864+
prefix_maintained = Some(true);
1865+
}
1866+
out_segment.push(group_i);
1867+
group_key_used[group_i] = true;
1868+
} else if hints.single_value_columns.contains(&in_col) {
1869+
continue;
1870+
} else {
1871+
if !out_segment.is_empty() {
1872+
approximate_sort_order.push(out_segment);
1873+
out_segment = Vec::new();
1874+
}
1875+
if prefix_maintained.is_none() {
1876+
prefix_maintained = Some(false);
1877+
}
1878+
}
1879+
1880+
break;
1881+
}
1882+
if prefix_maintained.is_none() {
1883+
prefix_maintained = Some(false);
1884+
}
1885+
if !out_segment.is_empty() {
1886+
approximate_sort_order.push(out_segment);
1887+
out_segment = Vec::new();
1888+
}
1889+
}
1890+
1891+
let approximate_sort_order_is_strict = hints.approximate_sort_order_is_strict;
1892+
let approximate_sort_order_is_prefix = hints.approximate_sort_order_is_prefix && prefix_maintained == Some(true);
1893+
let mut unsorted = Vec::<usize>::new();
1894+
for (group_i, key_used) in group_key_used.into_iter().enumerate() {
1895+
if !key_used {
1896+
unsorted.push(group_i);
1897+
}
1898+
}
1899+
1900+
SortednessByGroupKey {
1901+
sort_order: approximate_sort_order,
1902+
unsorted,
1903+
detached_from_prefix: approximate_sort_order_is_prefix,
1904+
succeeded: true,
1905+
}
1906+
}
1907+
18381908
fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
18391909
match value {
18401910
(Ok(e), Ok(e1)) => Ok((e, e1)),

datafusion/src/physical_plan/projection.rs

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ impl ExecutionPlan for ProjectionExec {
149149
} else {
150150
continue;
151151
}
152+
// If we project input to two output columns, we just end up picking one (and have incomplete analysis).
152153
input_to_output[column.index()] = Some(out_i);
153154
}
154155

@@ -170,14 +171,54 @@ impl ExecutionPlan for ProjectionExec {
170171
}
171172
};
172173

173-
OptimizerHints::new_sorted(
174-
if sort_order.is_empty() {
175-
None
176-
} else {
177-
Some(sort_order)
178-
},
174+
// Becomes Some(true) if the first column of the first segment is mapped.
175+
let mut prefix_maintained = None::<bool>;
176+
let mut approximate_sort_order = Vec::new();
177+
for in_segment in input_hints.approximate_sort_order {
178+
let mut out_segment = Vec::new();
179+
for in_col in in_segment {
180+
if let Some(out_col) = input_to_output[in_col] {
181+
if prefix_maintained.is_none() {
182+
prefix_maintained = Some(true);
183+
}
184+
out_segment.push(out_col);
185+
} else if input_hints.single_value_columns.contains(&in_col) {
186+
continue;
187+
} else {
188+
// Some column is missing. Note that handling this case right here --
189+
// projections missing columns, and splitting up the sort order into multiple
190+
// segments -- is the main purpose of approximate_sort_order.
191+
if !out_segment.is_empty() {
192+
approximate_sort_order.push(out_segment);
193+
out_segment = Vec::new();
194+
}
195+
if prefix_maintained.is_none() {
196+
prefix_maintained = Some(false);
197+
}
198+
199+
break;
200+
}
201+
}
202+
if prefix_maintained.is_none() {
203+
// The whole first segment was single-value columns and it's gone now.
204+
prefix_maintained = Some(false);
205+
}
206+
207+
if !out_segment.is_empty() {
208+
approximate_sort_order.push(out_segment);
209+
out_segment = Vec::new();
210+
}
211+
}
212+
let approximate_sort_order_is_strict = input_hints.approximate_sort_order_is_strict;
213+
let approximate_sort_order_is_prefix = input_hints.approximate_sort_order_is_prefix && prefix_maintained == Some(true);
214+
215+
OptimizerHints {
216+
sort_order: if sort_order.is_empty() { None } else { Some(sort_order) },
217+
approximate_sort_order,
218+
approximate_sort_order_is_prefix,
219+
approximate_sort_order_is_strict,
179220
single_value_columns,
180-
)
221+
}
181222
}
182223

183224
fn fmt_as(

datafusion/src/physical_plan/sort.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,9 @@ impl ExecutionPlan for SortExec {
190190

191191
fn output_hints(&self) -> OptimizerHints {
192192
let mut order = Vec::with_capacity(self.expr.len());
193-
// let mut sort_order_truncated = false;
194193
for s in &self.expr {
195194
let column = s.expr.as_any().downcast_ref::<Column>();
196195
if column.is_none() {
197-
// sort_order_truncated = true;
198196
break;
199197
}
200198
let column = column.unwrap();
@@ -207,7 +205,6 @@ impl ExecutionPlan for SortExec {
207205
}
208206

209207
let input_hints = self.input.output_hints();
210-
// TODO: If sort_order_truncated is false, we can combine input_hints.sort_order. Do this.
211208

212209
OptimizerHints::new_sorted(
213210
Some(order),

0 commit comments

Comments
 (0)