From 46a4c456e85d05ad7b346f0b3c08e7914dec5ebb Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Sat, 1 Nov 2025 07:56:56 -0400 Subject: [PATCH 1/2] Don't serialize unfiltered data filters for native_datafusion. --- .../apache/comet/serde/QueryPlanSerde.scala | 2 +- .../apache/spark/sql/comet/CometScanExec.scala | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 570c07cb09..a0b89ef95d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -959,7 +959,7 @@ object QueryPlanSerde extends Logging with CometExprShim { CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(conf)) { val dataFilters = new ListBuffer[Expr]() - for (filter <- scan.dataFilters) { + for (filter <- scan.supportedDataFilters) { exprToProto(filter, scan.output) match { case Some(proto) => dataFilters += proto case _ => diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index 4dd889d231..ba73c2a24d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -155,16 +155,22 @@ case class CometScanExec( } } - @transient - private lazy val pushedDownFilters = { - val supportedFilters = if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION) { - // `native_datafusion` scan does not support subquery pushdown filters, - // see: https://github.com/apache/datafusion-comet/issues/2424 + /** + * Returns the data filters that are supported for this scan implementation. For + * native_datafusion scans, this excludes dynamic pruning filters (subqueries) since they are + * not supported by DataFusion. See: https://github.com/apache/datafusion-comet/issues/2424 + */ + lazy val supportedDataFilters: Seq[Expression] = { + if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION) { dataFilters.filterNot(isDynamicPruningFilter) } else { dataFilters } - getPushedDownFilters(relation, supportedFilters) + } + + @transient + private lazy val pushedDownFilters = { + getPushedDownFilters(relation, supportedDataFilters) } override lazy val metadata: Map[String, String] = From bbf6091d7880c81364f1b26669afd7b45c826b3e Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Sat, 1 Nov 2025 08:02:24 -0400 Subject: [PATCH 2/2] Simplify comment. --- .../main/scala/org/apache/spark/sql/comet/CometScanExec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index ba73c2a24d..efc54ab712 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -157,8 +157,7 @@ case class CometScanExec( /** * Returns the data filters that are supported for this scan implementation. For - * native_datafusion scans, this excludes dynamic pruning filters (subqueries) since they are - * not supported by DataFusion. See: https://github.com/apache/datafusion-comet/issues/2424 + * native_datafusion scans, this excludes dynamic pruning filters (subqueries) */ lazy val supportedDataFilters: Seq[Expression] = { if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION) {