Status: Closed

Changes from 30 of 46 commits. All commits are by andygrove.
- 6ee6def: filter (Nov 8, 2025)
- 3a80a1f: limit (Nov 8, 2025)
- 51e0aa2: hash join (Nov 8, 2025)
- 8c0338e: sort merge join (Nov 8, 2025)
- e30e776: save (Nov 8, 2025)
- d8934db: window exec (Nov 8, 2025)
- ac30640: agg (Nov 8, 2025)
- b515d8f: docs (Nov 8, 2025)
- fcab57e: refactor (Nov 8, 2025)
- 4cde286: refactor (Nov 8, 2025)
- 8afadf7: fix (Nov 8, 2025)
- ae2254e: scalastyle (Nov 8, 2025)
- 3dbb10d: remove map approach (Nov 8, 2025)
- 30360ab: scalastyle (Nov 8, 2025)
- a6e9023: revert config (Nov 8, 2025)
- c719eeb: revert docs (Nov 8, 2025)
- 812dd7f: revert a change (Nov 8, 2025)
- 0343323: refactor sink serde (Nov 8, 2025)
- 7aeea75: refactor (Nov 8, 2025)
- e811e7a: fix (Nov 8, 2025)
- 7c59a0c: map lookup (Nov 8, 2025)
- 83cc2c3: more refactoring (Nov 8, 2025)
- 5131395: more refactoring (Nov 8, 2025)
- 82da68f: comments (Nov 8, 2025)
- 54e9d17: fix (Nov 8, 2025)
- 266ff4d: fix (Nov 8, 2025)
- ff2b9ad: docs (Nov 8, 2025)
- 4acc0d8: upmerge (Nov 8, 2025)
- ba2ca5a: docs (Nov 8, 2025)
- 9140742: docs (Nov 8, 2025)
- 7216f3c: improve withInfo (Nov 8, 2025)
- 1a4807b: add operator incompat handling (Nov 8, 2025)
- 865274f: update 3.5 golden files (Nov 8, 2025)
- 74ee98e: 3.4 golden files (Nov 8, 2025)
- 468ea56: 4.0 golden files (Nov 8, 2025)
- 26d2480: update expected fallback reason in test (Nov 8, 2025)
- c997b95: hacky fix (Nov 8, 2025)
- 50b62b8: less hacky fix (Nov 8, 2025)
- de2c6fd: refine (Nov 8, 2025)
- 6183663: remove println (Nov 8, 2025)
- 3a59359: fix (Nov 8, 2025)
- 12ff805: scalastyle (Nov 8, 2025)
- fd4385a: update 4.0 golden files (Nov 9, 2025)
- 839f6c7: update 4.0 golden files (Nov 9, 2025)
- 5f1fbf4: update 3.5 golden files (Nov 9, 2025)
- 04899f2: update 3.4 golden files (Nov 9, 2025)

2 changes: 1 addition & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
@@ -248,7 +248,7 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_EXPAND_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("expand", defaultValue = true)
val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("window", defaultValue = true)
createExecEnabledConfig("window", defaultValue = false)
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)

2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/configs.md
@@ -155,7 +155,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.sortMergeJoinWithJoinFilter.enabled` | Experimental support for Sort Merge Join with filter | false |
| `spark.comet.exec.takeOrderedAndProject.enabled` | Whether to enable takeOrderedAndProject by default. | true |
| `spark.comet.exec.union.enabled` | Whether to enable union by default. | true |
-| `spark.comet.exec.window.enabled` | Whether to enable window by default. | true |
+| `spark.comet.exec.window.enabled` | Whether to enable window by default. | false |
<!--END:CONFIG_TABLE-->
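
For illustration, a minimal sketch of how a user could opt back in to native window execution despite the new `false` default (the session setup here is hypothetical; only the two `config` calls are the point):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical opt-in: override the new `false` default for the window
// operator. Assumes the Comet jar is on the classpath.
val spark = SparkSession
  .builder()
  .appName("comet-window-opt-in")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.exec.window.enabled", "true")
  .getOrCreate()
```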

## Enabling or Disabling Individual Scalar Expressions
17 changes: 17 additions & 0 deletions docs/source/user-guide/latest/expressions.md
@@ -202,6 +202,23 @@ incompatible expressions.
| VariancePop | | Yes | |
| VarianceSamp | | Yes | |

## Window Functions

```{warning}
Window support is disabled by default due to known correctness issues. Tracking issue: [#2721](https://github.com/apache/datafusion-comet/issues/2721).
```

Comet supports the following aggregate functions in window expressions that use PARTITION BY and ORDER BY clauses.

| Expression | Spark-Compatible? | Compatibility Notes |
| ---------- | ----------------- | -------------------------------------------- |
| Count | Yes | |
| Max | Yes | |
| Min | Yes | |
| Sum | Yes | |

**Note:** Dedicated window functions such as `rank`, `dense_rank`, `row_number`, `lag`, `lead`, `ntile`, `cume_dist`, `percent_rank`, and `nth_value` are not currently supported and will fall back to Spark.
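
For illustration, a minimal sketch of the supported pattern (the data and column names are made up): a `sum` aggregate over a window with PARTITION BY and ORDER BY is eligible for native execution, while a dedicated window function such as `rank` over the same window would fall back to Spark.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().appName("window-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: compute a running total per department.
val df = Seq(("eng", 100L), ("eng", 200L), ("ops", 150L)).toDF("dept", "salary")

// Sum over PARTITION BY / ORDER BY: a supported aggregate-in-window shape
// (requires spark.comet.exec.window.enabled=true, which is off by default).
val w = Window.partitionBy("dept").orderBy("salary")
df.withColumn("running_total", sum("salary").over(w)).show()
```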

## Array Expressions

| Expression | Spark-Compatible? | Compatibility Notes |
4 changes: 2 additions & 2 deletions docs/source/user-guide/latest/operators.md
@@ -23,7 +23,7 @@ The following Spark operators are currently replaced with native versions. Query
not supported by Comet will fall back to regular Spark execution.

| Operator | Spark-Compatible? | Compatibility Notes |
-| ----------------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------ |
+| ----------------------- |-------------------|--------------------------------------------------------------------------------------------------------------------|
| BatchScanExec | Yes | Supports Parquet files and Apache Iceberg Parquet scans. See the [Comet Compatibility Guide] for more information. |
| BroadcastExchangeExec | Yes | |
| BroadcastHashJoinExec | Yes | |
@@ -40,6 +40,6 @@ not supported by Comet will fall back to regular Spark execution.
| SortExec | Yes | |
| SortMergeJoinExec | Yes | |
| UnionExec | Yes | |
-| WindowExec | Yes | |
+| WindowExec | No | Disabled by default due to known correctness issues. |

[Comet Compatibility Guide]: compatibility.md
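
As a rough way to check whether an operator was replaced or fell back, one can inspect the executed physical plan (a sketch, assuming some DataFrame `df`; exact node names vary by Comet version):

```scala
// Sketch: Comet-replaced operators appear with Comet-prefixed names in the
// plan, while a fallback keeps the vanilla Spark name (e.g. WindowExec).
println(df.queryExecution.executedPlan.toString)
```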
134 changes: 134 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/CometSink.scala
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.serde

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometScanExec, CometSinkPlaceHolder, CometSparkToColumnarExec}
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.{CoalesceExec, CollectLimitExec, SparkPlan, TakeOrderedAndProjectExec, UnionExec}
import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataType}

object CometSink extends CometOperatorSerde[SparkPlan] {

override def enabledConfig: Option[ConfigEntry[Boolean]] = None

override def convert(
op: SparkPlan,
builder: Operator.Builder,
childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
val supportedTypes =
op.output.forall(a => supportedDataType(a.dataType, allowComplex = true))

if (!supportedTypes) {
withInfo(op, "Unsupported data type")
return None
}

// These operators are the source of a Comet native execution chain
val scanBuilder = OperatorOuterClass.Scan.newBuilder()
val source = op.simpleStringWithNodeId()
if (source.isEmpty) {
scanBuilder.setSource(op.getClass.getSimpleName)
} else {
scanBuilder.setSource(source)
}

val ffiSafe = op match {
case _ if isExchangeSink(op) =>
// Source of broadcast exchange batches is ArrowStreamReader
// Source of shuffle exchange batches is NativeBatchDecoderIterator
true
case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_COMET =>
// native_comet scan reuses mutable buffers
false
case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
// native_iceberg_compat scan reuses mutable buffers for constant columns
// https://github.com/apache/datafusion-comet/issues/2152
false
case _ =>
false
}
scanBuilder.setArrowFfiSafe(ffiSafe)

val scanTypes = op.output.flatten { attr =>
serializeDataType(attr.dataType)
}

if (scanTypes.length == op.output.length) {
scanBuilder.addAllFields(scanTypes.asJava)

// Sink operators don't have children
builder.clearChildren()

Some(builder.setScan(scanBuilder).build())
} else {
// There are unsupported scan types
withInfo(
op,
s"unsupported Comet operator: ${op.nodeName}, due to unsupported data types above")
None
}

}

/**
* Whether the input Spark operator `op` can be considered as a Comet sink, i.e., the start of
* native execution. If it is true, we'll wrap `op` with `CometScanWrapper` or
* `CometSinkPlaceHolder` later in `CometSparkSessionExtensions` after `operator2proto` is
* called.
*/
def isCometSink(op: SparkPlan): Boolean = {
if (isExchangeSink(op)) {
return true
}
op match {
case s if isCometScan(s) => true
case _: CometSparkToColumnarExec => true
case _: CometSinkPlaceHolder => true
case _: CoalesceExec => true
case _: CollectLimitExec => true
case _: UnionExec => true
case _: TakeOrderedAndProjectExec => true
// https://github.com/apache/datafusion-comet/issues/2737
// case _: WindowExec => true
Comment on lines +116 to +117 (andygrove, Member, Author):

This is the fix for #2737

case _ => false
}
}

private def isExchangeSink(op: SparkPlan): Boolean = {
op match {
case _: ShuffleExchangeExec => true
case ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => true
case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) => true
case BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => true
case BroadcastQueryStageExec(_, ReusedExchangeExec(_, _: CometBroadcastExchangeExec), _) =>
true
case _: BroadcastExchangeExec => true
case _ => false
}
}
}
179 changes: 177 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/CometWindow.scala
@@ -21,14 +21,17 @@ package org.apache.comet.serde

import scala.jdk.CollectionConverters._

-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, SortOrder, WindowExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CurrentRow, Expression, RangeFrame, RowFrame, SortOrder, SpecifiedWindowFrame, UnboundedFollowing, UnboundedPreceding, WindowExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count, Max, Min, Sum}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DecimalType

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde.{exprToProto, windowExprToProto}
+import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto}

object CometWindow extends CometOperatorSerde[WindowExec] {

@@ -83,6 +86,178 @@ object CometWindow extends CometOperatorSerde[WindowExec] {

}

def windowExprToProto(
windowExpr: WindowExpression,
output: Seq[Attribute],
conf: SQLConf): Option[OperatorOuterClass.WindowExpr] = {

val aggregateExpressions: Array[AggregateExpression] = windowExpr.flatMap { expr =>
expr match {
case agg: AggregateExpression =>
agg.aggregateFunction match {
case _: Count =>
Some(agg)
case min: Min =>
if (AggSerde.minMaxDataTypeSupported(min.dataType)) {
Some(agg)
} else {
withInfo(windowExpr, s"datatype ${min.dataType} is not supported", expr)
None
}
case max: Max =>
if (AggSerde.minMaxDataTypeSupported(max.dataType)) {
Some(agg)
} else {
withInfo(windowExpr, s"datatype ${max.dataType} is not supported", expr)
None
}
case s: Sum =>
if (AggSerde.sumDataTypeSupported(s.dataType) && !s.dataType
.isInstanceOf[DecimalType]) {
Some(agg)
} else {
withInfo(windowExpr, s"datatype ${s.dataType} is not supported", expr)
None
}
case _ =>
withInfo(
windowExpr,
s"aggregate ${agg.aggregateFunction}" +
" is not supported for window function",
expr)
None
}
case _ =>
None
}
}.toArray

val (aggExpr, builtinFunc) = if (aggregateExpressions.nonEmpty) {
val modes = aggregateExpressions.map(_.mode).distinct
assert(modes.size == 1 && modes.head == Complete)
(aggExprToProto(aggregateExpressions.head, output, true, conf), None)
} else {
(None, exprToProto(windowExpr.windowFunction, output))
}

if (aggExpr.isEmpty && builtinFunc.isEmpty) {
return None
}

val f = windowExpr.windowSpec.frameSpecification

val (frameType, lowerBound, upperBound) = f match {
case SpecifiedWindowFrame(frameType, lBound, uBound) =>
val frameProto = frameType match {
case RowFrame => OperatorOuterClass.WindowFrameType.Rows
case RangeFrame => OperatorOuterClass.WindowFrameType.Range
}

val lBoundProto = lBound match {
case UnboundedPreceding =>
OperatorOuterClass.LowerWindowFrameBound
.newBuilder()
.setUnboundedPreceding(OperatorOuterClass.UnboundedPreceding.newBuilder().build())
.build()
case CurrentRow =>
OperatorOuterClass.LowerWindowFrameBound
.newBuilder()
.setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build())
.build()
case e if frameType == RowFrame =>
val offset = e.eval() match {
case i: Integer => i.toLong
case l: Long => l
case _ => return None
}
OperatorOuterClass.LowerWindowFrameBound
.newBuilder()
.setPreceding(
OperatorOuterClass.Preceding
.newBuilder()
.setOffset(offset)
.build())
.build()
case _ =>
// TODO add support for numeric and temporal RANGE BETWEEN expressions
// see https://github.com/apache/datafusion-comet/issues/1246
return None
}

val uBoundProto = uBound match {
case UnboundedFollowing =>
OperatorOuterClass.UpperWindowFrameBound
.newBuilder()
.setUnboundedFollowing(OperatorOuterClass.UnboundedFollowing.newBuilder().build())
.build()
case CurrentRow =>
OperatorOuterClass.UpperWindowFrameBound
.newBuilder()
.setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build())
.build()
case e if frameType == RowFrame =>
val offset = e.eval() match {
case i: Integer => i.toLong
case l: Long => l
case _ => return None
}
OperatorOuterClass.UpperWindowFrameBound
.newBuilder()
.setFollowing(
OperatorOuterClass.Following
.newBuilder()
.setOffset(offset)
.build())
.build()
case _ =>
// TODO add support for numeric and temporal RANGE BETWEEN expressions
// see https://github.com/apache/datafusion-comet/issues/1246
return None
}

(frameProto, lBoundProto, uBoundProto)
case _ =>
(
OperatorOuterClass.WindowFrameType.Rows,
OperatorOuterClass.LowerWindowFrameBound
.newBuilder()
.setUnboundedPreceding(OperatorOuterClass.UnboundedPreceding.newBuilder().build())
.build(),
OperatorOuterClass.UpperWindowFrameBound
.newBuilder()
.setUnboundedFollowing(OperatorOuterClass.UnboundedFollowing.newBuilder().build())
.build())
}

val frame = OperatorOuterClass.WindowFrame
.newBuilder()
.setFrameType(frameType)
.setLowerBound(lowerBound)
.setUpperBound(upperBound)
.build()

val spec =
OperatorOuterClass.WindowSpecDefinition.newBuilder().setFrameSpecification(frame).build()

if (builtinFunc.isDefined) {
Some(
OperatorOuterClass.WindowExpr
.newBuilder()
.setBuiltInWindowFunction(builtinFunc.get)
.setSpec(spec)
.build())
} else if (aggExpr.isDefined) {
Some(
OperatorOuterClass.WindowExpr
.newBuilder()
.setAggFunc(aggExpr.get)
.setSpec(spec)
.build())
} else {
None
}
}

private def validatePartitionAndSortSpecsForWindowFunc(
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],