Skip to content

Commit 4353f3a

Browse files
committed
[WIP] support PartialMerge
1 parent 8bfb110 commit 4353f3a

File tree

2 files changed

+48
-1
lines changed

2 files changed

+48
-1
lines changed

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.apache.spark.broadcast.Broadcast
3131
import org.apache.spark.rdd.RDD
3232
import org.apache.spark.sql.catalyst.InternalRow
3333
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, Generator, NamedExpression, SortOrder}
34-
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateMode, Complete, Final, Partial, PartialMerge}
34+
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateMode, Final, Partial, PartialMerge}
3535
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
3636
import org.apache.spark.sql.catalyst.plans._
3737
import org.apache.spark.sql.catalyst.plans.physical._

spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,53 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
129129
}
130130
}
131131

132+
test("partialMerge - cnt distinct + sum") {
133+
withTempDir(dir => {
134+
withSQLConf("spark.comet.enabled" -> "false") {
135+
sql("""
136+
CREATE OR REPLACE TEMP VIEW t (v, v1, i) AS
137+
VALUES
138+
('c', 'a', 1),
139+
('c1', 'a1', 1),
140+
('c2', 'a2', 2),
141+
('c3', 'a3', 2),
142+
('c4', 'a4', 2),
143+
('c', 'a', 1),
144+
('c1', 'a1', 1),
145+
('c2', 'a2', 2),
146+
('c3', 'a3', 2),
147+
('c4', 'a4', 2),
148+
('c', 'a', 1),
149+
('c1', 'a1', 1),
150+
('c2', 'a2', 2),
151+
('c3', 'a3', 2),
152+
('c4', 'a4', 2)
153+
""")
154+
sql("select * from t").repartition(3).write.mode("overwrite").parquet(dir.getAbsolutePath)
155+
}
156+
157+
withSQLConf(
158+
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
159+
"spark.comet.exec.shuffle.fallbackToColumnar" -> "false",
160+
"spark.comet.cast.allowIncompatible" -> "true",
161+
"spark.sql.adaptive.enabled" -> "false",
162+
"spark.comet.explain.native.enabled" -> "true",
163+
"spark.comet.enabled" -> "true",
164+
"spark.comet.expression.Cast.allowIncompatible" -> "true",
165+
"spark.comet.exec.shuffle.enableFastEncoding" -> "true",
166+
"spark.comet.exec.shuffle.enabled" -> "true",
167+
"spark.comet.explainFallback.enabled" -> "true",
168+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_iceberg_compat",
169+
"spark.shuffle.manager" -> "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
170+
"spark.comet.logFallbackReasons.enabled" -> "true") {
171+
spark.read.parquet(dir.getAbsolutePath).createOrReplaceTempView("t2")
172+
// sql("SELECT count(distinct v, v) FROM t2").explain("formatted")
173+
// sql("SELECT i, sum(v1), count(distinct v) FROM t2 group by i").explain()
174+
checkSparkAnswerAndOperator("SELECT i, sum(v1), count(distinct v) FROM t2 group by i")
175+
}
176+
})
177+
}
178+
132179
test("multiple column distinct count") {
133180
withSQLConf(
134181
CometConf.COMET_ENABLED.key -> "true",

0 commit comments

Comments
 (0)