
Commit e08ecdc

perf: Respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config (apache#1619)
1 parent de32fc8 commit e08ecdc

3 files changed: 31 additions & 18 deletions

docs/source/user-guide/tuning.md

Lines changed: 26 additions & 14 deletions
@@ -21,13 +21,25 @@ under the License.
 
 Comet provides some tuning options to help you get the best performance from your queries.
 
+## Parquet Scans
+
+Comet currently has three distinct implementations of the Parquet scan operator. The configuration property
+`spark.comet.scan.impl` is used to select an implementation. These scans are described in the
+[Compatibility Guide].
+
+[Compatibility Guide]: compatibility.md
+
+When using `native_datafusion` or `native_iceberg_compat`, there are known performance issues when pushing filters
+down to Parquet scans. Until this issue is resolved, performance can be improved by setting
+`spark.sql.parquet.filterPushdown=false`.
+
 ## Configuring Tokio Runtime
 
-Comet uses a global tokio runtime per executor process using tokio's defaults of one worker thread per core and a
+Comet uses a global tokio runtime per executor process using tokio's defaults of one worker thread per core and a
 maximum of 512 blocking threads. These values can be overridden using the environment variables `COMET_WORKER_THREADS`
 and `COMET_MAX_BLOCKING_THREADS`.
 
-DataFusion currently has a known issue when merging spill files in sort operators where the process can deadlock if
+DataFusion currently has a known issue when merging spill files in sort operators where the process can deadlock if
 there are more spill files than `COMET_MAX_BLOCKING_THREADS` ([tracking issue](https://github.com/apache/datafusion/issues/15323)).
 
 ## Memory Tuning
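For reference, the new Parquet scan guidance above boils down to a couple of session settings. Below is a minimal sketch (not part of the commit) of applying them when building a Spark session; the application name, the chosen scan implementation, and the thread counts are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: values are illustrative, not recommendations from the commit.
val spark = SparkSession
  .builder()
  .appName("comet-parquet-scan-tuning") // hypothetical app name
  // Select one of Comet's three Parquet scan implementations.
  .config("spark.comet.scan.impl", "native_iceberg_compat")
  // Workaround from the tuning guide: avoid the known filter pushdown slowdown
  // for the native_datafusion / native_iceberg_compat scans.
  .config("spark.sql.parquet.filterPushdown", "false")
  // Optional: override the tokio runtime defaults described above.
  .config("spark.executorEnv.COMET_WORKER_THREADS", "8")
  .config("spark.executorEnv.COMET_MAX_BLOCKING_THREADS", "512")
  .getOrCreate()
```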
@@ -54,14 +66,14 @@ Spark documentation: https://spark.apache.org/docs/latest/configuration.html.
 
 When running in on-heap mode, Comet memory can be allocated by setting `spark.comet.memoryOverhead`. If this setting
 is not provided, it will be calculated by multiplying the current Spark executor memory by
-`spark.comet.memory.overhead.factor` (default value is `0.2`) which may or may not result in enough memory for
+`spark.comet.memory.overhead.factor` (default value is `0.2`) which may or may not result in enough memory for
 Comet to operate. It is not recommended to rely on this behavior. It is better to specify `spark.comet.memoryOverhead`
 explicitly.
 
-Comet supports native shuffle and columnar shuffle (these terms are explained in the [shuffle] section below).
-In on-heap mode, columnar shuffle memory must be separately allocated using `spark.comet.columnar.shuffle.memorySize`.
+Comet supports native shuffle and columnar shuffle (these terms are explained in the [shuffle] section below).
+In on-heap mode, columnar shuffle memory must be separately allocated using `spark.comet.columnar.shuffle.memorySize`.
 If this setting is not provided, it will be calculated by multiplying `spark.comet.memoryOverhead` by
-`spark.comet.columnar.shuffle.memory.factor` (default value is `1.0`). If a shuffle exceeds this amount of memory
+`spark.comet.columnar.shuffle.memory.factor` (default value is `1.0`). If a shuffle exceeds this amount of memory
 then the query will fail.
 
 [shuffle]: #shuffle
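To make the fallback arithmetic above concrete, here is a small sketch assuming an 8 GB executor and the default factors quoted in the hunk; the explicit settings in the trailing comment reflect the documented recommendation, with illustrative sizes.

```scala
// Sketch of the documented fallback calculation when the explicit settings are omitted.
val executorMemoryMb = 8 * 1024 // assumes spark.executor.memory=8g
val overheadFactor   = 0.2      // spark.comet.memory.overhead.factor default
val shuffleFactor    = 1.0      // spark.comet.columnar.shuffle.memory.factor default

val cometOverheadMb   = (executorMemoryMb * overheadFactor).toInt // ~1638 MB
val columnarShuffleMb = (cometOverheadMb * shuffleFactor).toInt   // ~1638 MB

// The guide recommends setting these explicitly rather than relying on the fallback, e.g.:
//   spark.comet.memoryOverhead=2g
//   spark.comet.columnar.shuffle.memorySize=2g
println(s"Comet overhead: $cometOverheadMb MB, columnar shuffle memory: $columnarShuffleMb MB")
```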
@@ -73,35 +85,35 @@ amount of time spent spilling to disk, especially for aggregate, join, and shuff
 memory can result in out-of-memory errors. This is no different from allocating memory in Spark and the amount of
 memory will vary for different workloads, so some experimentation will be required.
 
-Here is a real-world example, based on running benchmarks derived from TPC-H, running on a single executor against
+Here is a real-world example, based on running benchmarks derived from TPC-H, running on a single executor against
 local Parquet files using the 100 GB data set.
 
-Baseline Spark Performance
+Baseline Spark Performance
 
 - Spark completes the benchmark in 632 seconds with 8 cores and 8 GB RAM
 - With less than 8 GB RAM, performance degrades due to spilling
 - Spark can complete the benchmark with as little as 3 GB of RAM, but with worse performance (744 seconds)
 
 Comet Performance
 
-- Comet requires at least 5 GB of RAM in off-heap mode and 6 GB RAM in on-heap mode, but performance at this level
+- Comet requires at least 5 GB of RAM in off-heap mode and 6 GB RAM in on-heap mode, but performance at this level
   is around 340 seconds, which is significantly faster than Spark with any amount of RAM
 - Comet running in off-heap with 8 cores completes the benchmark in 295 seconds, more than 2x faster than Spark
-- It is worth noting that running Comet with only 4 cores and 4 GB RAM completes the benchmark in 520 seconds,
+- It is worth noting that running Comet with only 4 cores and 4 GB RAM completes the benchmark in 520 seconds,
   providing better performance than Spark for half the resource
 
 It may be possible to reduce Comet's memory overhead by reducing batch sizes or increasing number of partitions.
 
 ### SortExec
 
-Comet's SortExec implementation spills to disk when under memory pressure, but there are some known issues in the
+Comet's SortExec implementation spills to disk when under memory pressure, but there are some known issues in the
 underlying DataFusion SortExec implementation that could cause out-of-memory errors during spilling. See
 https://github.com/apache/datafusion/issues/14692 for more information.
 
 Workarounds for this problem include:
 
 - Allocating more off-heap memory
-- Disabling native sort by setting `spark.comet.exec.sort.enabled=false`
+- Disabling native sort by setting `spark.comet.exec.sort.enabled=false`
 
 ## Advanced Memory Tuning
 
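For the SortExec workarounds listed in the hunk above, a hedged configuration sketch follows; the config keys are standard Spark/Comet settings quoted or referenced in the guide, and the off-heap size is an illustrative value.

```scala
import org.apache.spark.SparkConf

// Sketch of the two documented workarounds for the SortExec spilling issue.
val conf = new SparkConf()
  // Workaround 1: allocate more off-heap memory (illustrative size).
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "8g")
  // Workaround 2: fall back to Spark's sort instead of Comet's native sort.
  .set("spark.comet.exec.sort.enabled", "false")
```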
@@ -122,11 +134,11 @@ The valid pool types for off-heap mode are:
 - `unified` (default when `spark.memory.offHeap.enabled=true` is set)
 - `fair_unified`
 
-Both of these pools share off-heap memory between Spark and Comet. This approach is referred to as
+Both of these pools share off-heap memory between Spark and Comet. This approach is referred to as
 unified memory management. The size of the pool is specified by `spark.memory.offHeap.size`.
 
 The `unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not
-need to spill or have a single spillable operator.
+need to spill or have a single spillable operator.
 
 The `fair_unified` pool type prevents operators from using more than an even fraction of the available memory
 (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand
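As a worked example of the `fair_unified` sizing rule above, a small sketch follows; the pool size and reservation count are illustrative assumptions.

```scala
// fair_unified caps each reservation at an even share of the off-heap pool:
//   per_reservation_limit = pool_size / num_reservations
val poolSizeBytes   = 8L * 1024 * 1024 * 1024 // assumes spark.memory.offHeap.size=8g
val numReservations = 4                       // e.g. four spillable operators registered
val perReservation  = poolSizeBytes / numReservations
println(s"Each reservation may use up to ${perReservation / (1024 * 1024)} MB") // 2048 MB
```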

native/core/src/parquet/parquet_exec.rs

Lines changed: 0 additions & 1 deletion
@@ -104,7 +104,6 @@ pub(crate) fn init_datasource_exec(
 
 fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOptions) {
     let mut table_parquet_options = TableParquetOptions::new();
-    // TODO: Maybe these are configs?
     table_parquet_options.global.pushdown_filters = true;
     table_parquet_options.global.reorder_filters = true;
     let mut spark_parquet_options =

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 5 additions & 3 deletions
@@ -2241,9 +2241,11 @@ object QueryPlanSerde extends Logging with CometExprShim {
         // Sink operators don't have children
         result.clearChildren()
 
-        // TODO remove flatMap and add error handling for unsupported data filters
-        val dataFilters = scan.dataFilters.flatMap(exprToProto(_, scan.output))
-        nativeScanBuilder.addAllDataFilters(dataFilters.asJava)
+        if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED)) {
+          // TODO remove flatMap and add error handling for unsupported data filters
+          val dataFilters = scan.dataFilters.flatMap(exprToProto(_, scan.output))
+          nativeScanBuilder.addAllDataFilters(dataFilters.asJava)
+        }
 
         // TODO: modify CometNativeScan to generate the file partitions without instantiating RDD.
         scan.inputRDD match {
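For illustration only (not part of the commit), here is a sketch of exercising the new guard from a Spark session: when `spark.sql.parquet.filterPushdown` is disabled, the serde code above skips serializing data filters for the native scan. The file path, column, and filter are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pushdown-check").getOrCreate() // hypothetical app

// Spark's default is true; disabling it now also disables Comet's data filter serialization.
spark.conf.set("spark.sql.parquet.filterPushdown", "false")

val df = spark.read.parquet("/tmp/lineitem") // hypothetical path
// Inspect the physical plan to see whether the scan carries pushed-down data filters.
df.filter("l_quantity > 40").explain()
```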

0 commit comments