Skip to content

Commit e4653e0

Browse files
committed
Implemented input_sortedness_by_group_key
1 parent b3acc9f commit e4653e0

File tree

1 file changed

+136
-0
lines changed

1 file changed

+136
-0
lines changed

datafusion/src/physical_plan/planner.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1746,6 +1746,131 @@ fn input_sorted_by_group_key(
17461746
true
17471747
}
17481748

1749+
#[derive(Debug, Clone)]
1750+
/// Return value of input_sortedness_by_group_key. If succeeded, every group key offset appears in
1751+
/// sort_order or unsorted exactly once.
1752+
pub struct SortednessByGroupKey {
1753+
/// Elems are (offset into the sort key, offset into the group key), with sort key offsets
1754+
/// strictly increasing. Each Vec<(usize, usize)> is a clump of adjacent columns, with
1755+
/// adjacency considered after ignoring single value columns.
1756+
///
1757+
/// Each column clump sees the input ordering in sawtoothing runs of rows, sawtoothing with different
1758+
/// granularity.
1759+
pub sort_order: Vec<Vec<(usize, usize)>>,
1760+
/// Indexes into the group key.
1761+
pub unsorted: Vec<usize>,
1762+
/// true if the first clump of sort_order is detached from the prefix of the sort key (ignoring
1763+
/// single value columns). Used by is_sorted_by_group_key().
1764+
pub detached_from_prefix: bool,
1765+
/// If false, back out and use hash aggregation. Might fail early to avoid pointlessly calculating.
1766+
pub succeeded: bool,
1767+
}
1768+
1769+
impl SortednessByGroupKey {
1770+
/// Constructs the succeeded == false case.
1771+
pub fn failed() -> Self {
1772+
Self {
1773+
sort_order: Vec::new(),
1774+
unsorted: Vec::new(),
1775+
detached_from_prefix: false,
1776+
succeeded: false,
1777+
}
1778+
}
1779+
/// Returns true if the input is sorted by group key.
1780+
pub fn is_sorted_by_group_key(&self) -> bool {
1781+
self.sawtooth_levels() == Some(0)
1782+
}
1783+
1784+
/// Returns the number of "sawtooth levels" the group key may experience, with 0 being perfectly
1785+
/// sorted, 1 meaning the group key is missing one clump of index columns, etc. Returns None if
1786+
/// there are any unsorted group keys or the analysis simply failed.
1787+
pub fn sawtooth_levels(&self) -> Option<usize> {
1788+
if self.succeeded && self.unsorted.is_empty() {
1789+
Some((self.sort_order.len() - 1) + (self.detached_from_prefix as usize))
1790+
} else {
1791+
None
1792+
}
1793+
}
1794+
}
1795+
1796+
/// Checks the degree to which input is sortable by a group key. If it succeeds, returns clumps of
1797+
/// effectively adjacent sort key columns. For example, if the input's sort key is (A, B, S, C, D,
1798+
/// E, F, G, H, I, J), and S is a single value column, and the group keys are for Column values C,
1799+
/// E, F, I, B, and K, then this function will return {sort_order: [[(1, B), (3, C)], [(5, E), (6,
1800+
/// F)], [(9, I)]], unsorted: [K], succeeded: true}.
1801+
pub fn input_sortedness_by_group_key(
1802+
input: &dyn ExecutionPlan,
1803+
group_key: &[(Arc<dyn PhysicalExpr>, String)],
1804+
) -> SortednessByGroupKey {
1805+
assert!(!group_key.is_empty());
1806+
1807+
let hints = input.output_hints();
1808+
// We check the group key is a prefix of the sort key.
1809+
let sort_key = hints.sort_order;
1810+
if sort_key.is_none() {
1811+
// I guess we're using hash aggregation.
1812+
return SortednessByGroupKey::failed();
1813+
}
1814+
let sort_key = sort_key.unwrap();
1815+
// Tracks which elements of sort key are used in the group key or have a single value.
1816+
let mut sort_key_hit = vec![false; sort_key.len()];
1817+
let mut sort_to_group = vec![usize::MAX; sort_key.len()];
1818+
let mut unsorted_group_keys = Vec::<usize>::with_capacity(group_key.len());
1819+
for (group_i, (g, _)) in group_key.iter().enumerate() {
1820+
let col = g.as_any().downcast_ref::<Column>();
1821+
if col.is_none() {
1822+
return SortednessByGroupKey::failed();
1823+
}
1824+
let input_col = input.schema().index_of(col.unwrap().name());
1825+
if input_col.is_err() {
1826+
return SortednessByGroupKey::failed();
1827+
}
1828+
let input_col = input_col.unwrap();
1829+
match sort_key.iter().find_position(|i| **i == input_col) {
1830+
None => {
1831+
unsorted_group_keys.push(group_i);
1832+
}
1833+
Some((sort_key_pos, _)) => {
1834+
sort_key_hit[sort_key_pos] = true;
1835+
if sort_to_group[sort_key_pos] != usize::MAX {
1836+
return SortednessByGroupKey::failed(); // Bail out to simplify code a bit. This should not happen in practice.
1837+
}
1838+
sort_to_group[sort_key_pos] = group_i;
1839+
}
1840+
};
1841+
}
1842+
1843+
let mut clumps = Vec::<Vec<(usize, usize)>>::new();
1844+
// At this point we walk through the sort_key_hit vec.
1845+
let mut clump = Vec::<(usize, usize)>::new();
1846+
// Are our clumps detached from the sort prefix?
1847+
let mut detached_from_prefix = false;
1848+
for (i, &hit) in sort_key_hit.iter().enumerate() {
1849+
if hit {
1850+
clump.push((i, sort_to_group[i]));
1851+
} else if hints.single_value_columns.contains(&sort_key[i]) {
1852+
// Don't end the clump.
1853+
} else {
1854+
if clump.is_empty() {
1855+
detached_from_prefix |= clumps.is_empty();
1856+
} else {
1857+
clumps.push(clump);
1858+
clump = Vec::new();
1859+
}
1860+
}
1861+
}
1862+
if !clump.is_empty() {
1863+
clumps.push(clump);
1864+
}
1865+
1866+
SortednessByGroupKey {
1867+
sort_order: clumps,
1868+
unsorted: unsorted_group_keys,
1869+
detached_from_prefix,
1870+
succeeded: true,
1871+
}
1872+
}
1873+
17491874
fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
17501875
match value {
17511876
(Ok(e), Ok(e1)) => Ok((e, e1)),
@@ -2105,6 +2230,17 @@ mod tests {
21052230
assert!(is_sorted);
21062231
assert_eq!(sort_order, vec![0, 1, 2, 3, 4]);
21072232

2233+
let sortedness =
2234+
input_sortedness_by_group_key(execution_plan.as_ref(), &physical_group_key);
2235+
assert!(sortedness.succeeded);
2236+
assert_eq!(
2237+
sortedness.sort_order,
2238+
vec![vec![(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]]
2239+
);
2240+
assert_eq!(sortedness.unsorted, vec![] as Vec<usize>);
2241+
assert_eq!(sortedness.detached_from_prefix, false);
2242+
assert!(sortedness.is_sorted_by_group_key());
2243+
21082244
Ok(())
21092245
}
21102246

0 commit comments

Comments
 (0)