Skip to content

Commit 6329ebd

Browse files
committed
Implement tree explain for DataSourceExec
1 parent 3dc212c commit 6329ebd

File tree

6 files changed

+245
-44
lines changed

6 files changed

+245
-44
lines changed

datafusion/datasource-csv/src/source.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -617,8 +617,13 @@ impl FileSource for CsvSource {
617617
fn file_type(&self) -> &str {
618618
"csv"
619619
}
620-
fn fmt_extra(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
621-
write!(f, ", has_header={}", self.has_header)
620+
fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
621+
match t {
622+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
623+
write!(f, ", has_header={}", self.has_header)
624+
}
625+
DisplayFormatType::TreeRender => Ok(()),
626+
}
622627
}
623628
}
624629

datafusion/datasource-parquet/src/source.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -554,14 +554,11 @@ impl FileSource for ParquetSource {
554554

555555
fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
556556
match t {
557-
DisplayFormatType::Default
558-
| DisplayFormatType::Verbose
559-
| DisplayFormatType::TreeRender => {
557+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
560558
let predicate_string = self
561559
.predicate()
562560
.map(|p| format!(", predicate={p}"))
563561
.unwrap_or_default();
564-
565562
let pruning_predicate_string = self
566563
.pruning_predicate()
567564
.map(|pre| {
@@ -581,6 +578,12 @@ impl FileSource for ParquetSource {
581578

582579
write!(f, "{}{}", predicate_string, pruning_predicate_string)
583580
}
581+
DisplayFormatType::TreeRender => {
582+
if let Some(predicate) = self.predicate() {
583+
writeln!(f, "predicate={predicate}")?;
584+
}
585+
Ok(())
586+
}
584587
}
585588
}
586589
}

datafusion/datasource/src/file_scan_config.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,10 @@ impl DataSource for FileScanConfig {
218218
self.fmt_file_source(t, f)
219219
}
220220
DisplayFormatType::TreeRender => {
221-
// TODO: collect info
221+
writeln!(f, "format={}", self.file_source.file_type())?;
222+
self.file_source.fmt_extra(t, f)?;
223+
let num_files = self.file_groups.iter().map(Vec::len).sum::<usize>();
224+
writeln!(f, "files={num_files}")?;
222225
Ok(())
223226
}
224227
}

datafusion/datasource/src/memory.rs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -425,25 +425,20 @@ impl DataSource for MemorySourceConfig {
425425
}
426426
}
427427
DisplayFormatType::TreeRender => {
428-
let partition_sizes: Vec<_> =
429-
self.partitions.iter().map(|b| b.len()).collect();
430-
writeln!(f, "partition_sizes={:?}", partition_sizes)?;
431-
432-
if let Some(output_ordering) = self.sort_information.first() {
433-
writeln!(f, "output_ordering={}", output_ordering)?;
434-
}
435-
436-
let eq_properties = self.eq_properties();
437-
let constraints = eq_properties.constraints();
438-
if !constraints.is_empty() {
439-
writeln!(f, "constraints={}", constraints)?;
440-
}
441-
442-
if let Some(limit) = self.fetch {
443-
writeln!(f, "fetch={}", limit)?;
444-
}
445-
446-
write!(f, "partitions={}", partition_sizes.len())
428+
let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
429+
let total_bytes = self
430+
.partitions
431+
.iter()
432+
.map(|b| {
433+
b.iter()
434+
.map(|batch| batch.get_array_memory_size())
435+
.sum::<usize>()
436+
})
437+
.sum::<usize>();
438+
writeln!(f, "format=memory")?;
439+
writeln!(f, "rows={total_rows}")?;
440+
writeln!(f, "bytes={total_bytes}")?;
441+
Ok(())
447442
}
448443
}
449444
}

datafusion/datasource/src/source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub trait DataSource: Send + Sync + Debug {
5252
context: Arc<TaskContext>,
5353
) -> datafusion_common::Result<SendableRecordBatchStream>;
5454
fn as_any(&self) -> &dyn Any;
55+
/// Format this source for display in explain plans
5556
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
5657

5758
/// Return a copy of this DataSource with a new partitioning scheme
@@ -103,7 +104,7 @@ impl DisplayAs for DataSourceExec {
103104
DisplayFormatType::Default | DisplayFormatType::Verbose => {
104105
write!(f, "DataSourceExec: ")?;
105106
}
106-
DisplayFormatType::TreeRender => write!(f, "")?,
107+
DisplayFormatType::TreeRender => {}
107108
}
108109
self.data_source.fmt_as(t, f)
109110
}

datafusion/sqllogictest/test_files/explain_tree.slt

Lines changed: 211 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,36 @@ STORED AS PARQUET
5454
LOCATION 'test_files/scratch/explain_tree/table2.parquet';
5555

5656

57-
# table3: Memoru
57+
# table3: Memory
5858
statement ok
5959
CREATE TABLE table3 as select * from table1;
6060

61+
# table4: JSON
62+
query I
63+
COPY (SELECT * from table1)
64+
TO 'test_files/scratch/explain_tree/table4.json'
65+
----
66+
3
67+
68+
statement ok
69+
CREATE EXTERNAL TABLE table4
70+
STORED AS JSON
71+
LOCATION 'test_files/scratch/explain_tree/table4.json';
72+
73+
# table5: ARROW
74+
query I
75+
COPY (SELECT * from table1)
76+
TO 'test_files/scratch/explain_tree/table5.arrow'
77+
----
78+
3
79+
80+
statement ok
81+
CREATE EXTERNAL TABLE table5
82+
STORED AS ARROW
83+
LOCATION 'test_files/scratch/explain_tree/table5.arrow';
84+
85+
86+
6187
######## Begin Queries ########
6288

6389
# Filter
@@ -83,7 +109,10 @@ physical_plan
83109
12)└─────────────┬─────────────┘
84110
13)┌─────────────┴─────────────┐
85111
14)│ DataSourceExec │
86-
15)└───────────────────────────┘
112+
15)│ -------------------- │
113+
16)│ files: 1 │
114+
17)│ format: csv │
115+
18)└───────────────────────────┘
87116

88117
# Aggregate
89118
query TT
@@ -110,7 +139,10 @@ physical_plan
110139
15)└─────────────┬─────────────┘
111140
16)┌─────────────┴─────────────┐
112141
17)│ DataSourceExec │
113-
18)└───────────────────────────┘
142+
18)│ -------------------- │
143+
19)│ files: 1 │
144+
20)│ format: csv │
145+
21)└───────────────────────────┘
114146

115147
# 2 Joins
116148
query TT
@@ -139,7 +171,10 @@ physical_plan
139171
15)└─────────────┬─────────────┘└─────────────┬─────────────┘
140172
16)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
141173
17)│ DataSourceExec ││ DataSourceExec │
142-
18)└───────────────────────────┘└───────────────────────────┘
174+
18)│ -------------------- ││ -------------------- │
175+
19)│ files: 1 ││ files: 1 │
176+
20)│ format: csv ││ format: parquet │
177+
21)└───────────────────────────┘└───────────────────────────┘
143178

144179
# 3 Joins
145180
query TT
@@ -175,18 +210,22 @@ physical_plan
175210
13)┌─────────────┴─────────────┐┌─────────────┴─────────────┐┌─────────────┴─────────────┐
176211
14)│ CoalesceBatchesExec ││ CoalesceBatchesExec ││ DataSourceExec │
177212
15)│ ││ ││ -------------------- │
178-
16)│ ││ ││ partition_sizes: [1] │
179-
17)│ ││ ││ partitions: 1 │
180-
18)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘
181-
19)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
182-
20)│ RepartitionExec ││ RepartitionExec │
183-
21)└─────────────┬─────────────┘└─────────────┬─────────────┘
184-
22)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
185-
23)│ RepartitionExec ││ RepartitionExec │
186-
24)└─────────────┬─────────────┘└─────────────┬─────────────┘
187-
25)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
188-
26)│ DataSourceExec ││ DataSourceExec │
189-
27)└───────────────────────────┘└───────────────────────────┘
213+
16)│ ││ ││ bytes: 1560 │
214+
17)│ ││ ││ format: memory │
215+
18)│ ││ ││ rows: 1 │
216+
19)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘
217+
20)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
218+
21)│ RepartitionExec ││ RepartitionExec │
219+
22)└─────────────┬─────────────┘└─────────────┬─────────────┘
220+
23)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
221+
24)│ RepartitionExec ││ RepartitionExec │
222+
25)└─────────────┬─────────────┘└─────────────┬─────────────┘
223+
26)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
224+
27)│ DataSourceExec ││ DataSourceExec │
225+
28)│ -------------------- ││ -------------------- │
226+
29)│ files: 1 ││ files: 1 │
227+
30)│ format: csv ││ format: parquet │
228+
31)└───────────────────────────┘└───────────────────────────┘
190229

191230
# Long Filter (demonstrate what happens with wrapping)
192231
query TT
@@ -213,7 +252,156 @@ physical_plan
213252
12)└─────────────┬─────────────┘
214253
13)┌─────────────┴─────────────┐
215254
14)│ DataSourceExec │
216-
15)└───────────────────────────┘
255+
15)│ -------------------- │
256+
16)│ files: 1 │
257+
17)│ format: csv │
258+
18)└───────────────────────────┘
259+
260+
# Query with filter on csv
261+
query TT
262+
explain SELECT int_col FROM table2 WHERE string_col != 'foo';
263+
----
264+
logical_plan
265+
01)Projection: table2.int_col
266+
02)--Filter: table2.string_col != Utf8View("foo")
267+
03)----TableScan: table2 projection=[int_col, string_col], partial_filters=[table2.string_col != Utf8View("foo")]
268+
physical_plan
269+
01)┌───────────────────────────┐
270+
02)│ CoalesceBatchesExec │
271+
03)└─────────────┬─────────────┘
272+
04)┌─────────────┴─────────────┐
273+
05)│ FilterExec │
274+
06)│ -------------------- │
275+
07)│ predicate: │
276+
08)│ string_col@1 != foo │
277+
09)└─────────────┬─────────────┘
278+
10)┌─────────────┴─────────────┐
279+
11)│ RepartitionExec │
280+
12)└─────────────┬─────────────┘
281+
13)┌─────────────┴─────────────┐
282+
14)│ DataSourceExec │
283+
15)│ -------------------- │
284+
16)│ files: 1 │
285+
17)│ format: parquet │
286+
18)│ │
287+
19)│ predicate: │
288+
20)│ string_col@1 != foo │
289+
21)└───────────────────────────┘
290+
291+
292+
# Query with filter on parquet
293+
query TT
294+
explain SELECT int_col FROM table2 WHERE string_col != 'foo';
295+
----
296+
logical_plan
297+
01)Projection: table2.int_col
298+
02)--Filter: table2.string_col != Utf8View("foo")
299+
03)----TableScan: table2 projection=[int_col, string_col], partial_filters=[table2.string_col != Utf8View("foo")]
300+
physical_plan
301+
01)┌───────────────────────────┐
302+
02)│ CoalesceBatchesExec │
303+
03)└─────────────┬─────────────┘
304+
04)┌─────────────┴─────────────┐
305+
05)│ FilterExec │
306+
06)│ -------------------- │
307+
07)│ predicate: │
308+
08)│ string_col@1 != foo │
309+
09)└─────────────┬─────────────┘
310+
10)┌─────────────┴─────────────┐
311+
11)│ RepartitionExec │
312+
12)└─────────────┬─────────────┘
313+
13)┌─────────────┴─────────────┐
314+
14)│ DataSourceExec │
315+
15)│ -------------------- │
316+
16)│ files: 1 │
317+
17)│ format: parquet │
318+
18)│ │
319+
19)│ predicate: │
320+
20)│ string_col@1 != foo │
321+
21)└───────────────────────────┘
322+
323+
# Query with filter on memory
324+
query TT
325+
explain SELECT int_col FROM table3 WHERE string_col != 'foo';
326+
----
327+
logical_plan
328+
01)Projection: table3.int_col
329+
02)--Filter: table3.string_col != Utf8("foo")
330+
03)----TableScan: table3 projection=[int_col, string_col]
331+
physical_plan
332+
01)┌───────────────────────────┐
333+
02)│ CoalesceBatchesExec │
334+
03)└─────────────┬─────────────┘
335+
04)┌─────────────┴─────────────┐
336+
05)│ FilterExec │
337+
06)│ -------------------- │
338+
07)│ predicate: │
339+
08)│ string_col@1 != foo │
340+
09)└─────────────┬─────────────┘
341+
10)┌─────────────┴─────────────┐
342+
11)│ DataSourceExec │
343+
12)│ -------------------- │
344+
13)│ bytes: 1560 │
345+
14)│ format: memory │
346+
15)│ rows: 1 │
347+
16)└───────────────────────────┘
348+
349+
# Query with filter on json
350+
query TT
351+
explain SELECT int_col FROM table4 WHERE string_col != 'foo';
352+
----
353+
logical_plan
354+
01)Projection: table4.int_col
355+
02)--Filter: table4.string_col != Utf8("foo")
356+
03)----TableScan: table4 projection=[int_col, string_col], partial_filters=[table4.string_col != Utf8("foo")]
357+
physical_plan
358+
01)┌───────────────────────────┐
359+
02)│ CoalesceBatchesExec │
360+
03)└─────────────┬─────────────┘
361+
04)┌─────────────┴─────────────┐
362+
05)│ FilterExec │
363+
06)│ -------------------- │
364+
07)│ predicate: │
365+
08)│ string_col@1 != foo │
366+
09)└─────────────┬─────────────┘
367+
10)┌─────────────┴─────────────┐
368+
11)│ RepartitionExec │
369+
12)└─────────────┬─────────────┘
370+
13)┌─────────────┴─────────────┐
371+
14)│ DataSourceExec │
372+
15)│ -------------------- │
373+
16)│ files: 1 │
374+
17)│ format: json │
375+
18)└───────────────────────────┘
376+
377+
# Query with filter on arrow
378+
query TT
379+
explain SELECT int_col FROM table5 WHERE string_col != 'foo';
380+
----
381+
logical_plan
382+
01)Projection: table5.int_col
383+
02)--Filter: table5.string_col != Utf8("foo")
384+
03)----TableScan: table5 projection=[int_col, string_col], partial_filters=[table5.string_col != Utf8("foo")]
385+
physical_plan
386+
01)┌───────────────────────────┐
387+
02)│ CoalesceBatchesExec │
388+
03)└─────────────┬─────────────┘
389+
04)┌─────────────┴─────────────┐
390+
05)│ FilterExec │
391+
06)│ -------------------- │
392+
07)│ predicate: │
393+
08)│ string_col@1 != foo │
394+
09)└─────────────┬─────────────┘
395+
10)┌─────────────┴─────────────┐
396+
11)│ RepartitionExec │
397+
12)└─────────────┬─────────────┘
398+
13)┌─────────────┴─────────────┐
399+
14)│ DataSourceExec │
400+
15)│ -------------------- │
401+
16)│ files: 1 │
402+
17)│ format: arrow │
403+
18)└───────────────────────────┘
404+
217405

218406

219407
# cleanup
@@ -225,3 +413,9 @@ drop table table2;
225413

226414
statement ok
227415
drop table table3;
416+
417+
statement ok
418+
drop table table4;
419+
420+
statement ok
421+
drop table table5;

0 commit comments

Comments (0)