Skip to content

Commit c4b9995

Browse files
authored
fix: add order_requirement & dist_requirement to OutputRequirementExec display (#16726)
* add: `order_requirement` & `dist_requirement` to `OutputRequirementExec` plan display * chore: cargo clippy * add: get `order_by` and `dist_cols` * fix: fmt order_by as (col, direction) * fix: fmt * fix: expr fmt * chore: cargo clippy * test: fix test for output requirement * test: fix sqllogictest * fix: fmt * refactor: use first() to borrow ordering requirements
1 parent a614716 commit c4b9995

File tree

4 files changed

+34
-9
lines changed

4 files changed

+34
-9
lines changed

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -983,7 +983,7 @@ async fn test_soft_hard_requirements_with_multiple_soft_requirements_and_output_
983983
));
984984

985985
let expected_input = [
986-
"OutputRequirementExec",
986+
"OutputRequirementExec: order_by=[(non_nullable_col@1, asc)], dist_by=SinglePartition",
987987
" BoundedWindowAggExec: wdw=[count: Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]",
988988
" BoundedWindowAggExec: wdw=[count: Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]",
989989
" SortExec: expr=[nullable_col@0 DESC NULLS LAST], preserve_partitioning=[false]",
@@ -998,7 +998,7 @@ async fn test_soft_hard_requirements_with_multiple_soft_requirements_and_output_
998998
// " DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet",
999999
// ];
10001000
let expected_optimized = [
1001-
"OutputRequirementExec",
1001+
"OutputRequirementExec: order_by=[(non_nullable_col@1, asc)], dist_by=SinglePartition",
10021002
" BoundedWindowAggExec: wdw=[count: Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]",
10031003
" SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]",
10041004
" BoundedWindowAggExec: wdw=[count: Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]",

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ fn test_output_req_after_projection() -> Result<()> {
729729
actual,
730730
@r"
731731
ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
732-
OutputRequirementExec
732+
OutputRequirementExec: order_by=[(b@1, asc), (c@2 + a@0, asc)], dist_by=HashPartitioned[[a@0, b@1]])
733733
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
734734
"
735735
);
@@ -745,7 +745,7 @@ fn test_output_req_after_projection() -> Result<()> {
745745
assert_snapshot!(
746746
actual,
747747
@r"
748-
OutputRequirementExec
748+
OutputRequirementExec: order_by=[(b@2, asc), (c@0 + new_a@1, asc)], dist_by=HashPartitioned[[new_a@1, b@2]])
749749
ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
750750
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
751751
"

datafusion/physical-optimizer/src/output_requirements.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,35 @@ impl DisplayAs for OutputRequirementExec {
138138
) -> std::fmt::Result {
139139
match t {
140140
DisplayFormatType::Default | DisplayFormatType::Verbose => {
141-
write!(f, "OutputRequirementExec")
141+
let order_cols = self
142+
.order_requirement
143+
.as_ref()
144+
.map(|reqs| reqs.first())
145+
.map(|lex| {
146+
let pairs: Vec<String> = lex
147+
.iter()
148+
.map(|req| {
149+
let direction = req
150+
.options
151+
.as_ref()
152+
.map(
153+
|opt| if opt.descending { "desc" } else { "asc" },
154+
)
155+
.unwrap_or("unspecified");
156+
format!("({}, {direction})", req.expr)
157+
})
158+
.collect();
159+
format!("[{}]", pairs.join(", "))
160+
})
161+
.unwrap_or_else(|| "[]".to_string());
162+
163+
write!(
164+
f,
165+
"OutputRequirementExec: order_by={}, dist_by={}",
166+
order_cols, self.dist_requirement
167+
)
142168
}
143169
DisplayFormatType::TreeRender => {
144-
// TODO: collect info
145170
write!(f, "")
146171
}
147172
}

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ initial_physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/dat
226226
initial_physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]
227227
initial_physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true, schema=[a:Int32;N, b:Int32;N, c:Int32;N]
228228
physical_plan after OutputRequirements
229-
01)OutputRequirementExec
229+
01)OutputRequirementExec: order_by=[], dist_by=Unspecified
230230
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
231231
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
232232
physical_plan after join_selection SAME TEXT AS ABOVE
@@ -303,7 +303,7 @@ initial_physical_plan_with_schema
303303
01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
304304
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
305305
physical_plan after OutputRequirements
306-
01)OutputRequirementExec, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
306+
01)OutputRequirementExec: order_by=[], dist_by=Unspecified, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
307307
02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
308308
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
309309
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
@@ -347,7 +347,7 @@ initial_physical_plan_with_schema
347347
01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
348348
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
349349
physical_plan after OutputRequirements
350-
01)OutputRequirementExec
350+
01)OutputRequirementExec: order_by=[], dist_by=Unspecified
351351
02)--GlobalLimitExec: skip=0, fetch=10
352352
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
353353
physical_plan after aggregate_statistics SAME TEXT AS ABOVE

0 commit comments

Comments
 (0)