Skip to content

Commit 2f58fac

Browse files
authored
feat: Supports array_union (#1945)
1 parent 06ed88b commit 2f58fac

File tree

5 files changed

+64
-29
lines changed

5 files changed

+64
-29
lines changed

docs/source/user-guide/expressions.md

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -186,22 +186,23 @@ The following Spark expressions are currently available. Any known compatibility
186186

187187
## Arrays
188188

189-
| Expression | Notes |
190-
|----------------|----------------------------------------------------------------------------------------------------------------------------------------|
191-
| ArrayAppend | Experimental |
192-
| ArrayCompact | Experimental |
193-
| ArrayContains | Experimental |
194-
| ArrayDistinct | Experimental: behaves differently than spark. Datafusion first sorts then removes duplicates while spark preserves the original order. |
195-
| ArrayExcept | Experimental |
196-
| ArrayInsert | Experimental |
197-
| ArrayIntersect | Experimental |
198-
| ArrayJoin | Experimental |
199-
| ArrayMax | Experimental |
200-
| ArrayRemove | |
201-
| ArrayRepeat | Experimental |
202-
| ArraysOverlap | Experimental |
203-
| ElementAt | Arrays only |
204-
| GetArrayItem | |
189+
| Expression | Notes |
190+
|----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
191+
| ArrayAppend | Experimental |
192+
| ArrayCompact | Experimental |
193+
| ArrayContains | Experimental |
194+
| ArrayDistinct | Experimental: behaves differently than spark. Datafusion first sorts then removes duplicates while spark preserves the original order. |
195+
| ArrayExcept | Experimental |
196+
| ArrayInsert | Experimental |
197+
| ArrayIntersect | Experimental |
198+
| ArrayJoin | Experimental |
199+
| ArrayMax | Experimental |
200+
| ArrayRemove | |
201+
| ArrayRepeat | Experimental |
202+
| ArraysOverlap | Experimental |
203+
| ArrayUnion | Experimental: behaves differently than spark. Datafusion sorts the input arrays before performing the union, while spark preserves the order of the first array and appends unique elements from the second. |
204+
| ElementAt | Arrays only |
205+
| GetArrayItem | |
205206

206207
## Structs
207208

docs/spark_expressions_support.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,21 +81,21 @@
8181

8282
### array_funcs
8383
- [x] array
84-
- [ ] array_append
85-
- [ ] array_compact
86-
- [ ] array_contains
87-
- [ ] array_distinct
88-
- [ ] array_except
89-
- [ ] array_insert
90-
- [ ] array_intersect
91-
- [ ] array_join
92-
- [ ] array_max
84+
- [x] array_append
85+
- [x] array_compact
86+
- [x] array_contains
87+
- [x] array_distinct
88+
- [x] array_except
89+
- [x] array_insert
90+
- [x] array_intersect
91+
- [x] array_join
92+
- [x] array_max
9393
- [ ] array_min
9494
- [ ] array_position
95-
- [ ] array_remove
96-
- [ ] array_repeat
97-
- [ ] array_union
98-
- [ ] arrays_overlap
95+
- [x] array_remove
96+
- [x] array_repeat
97+
- [x] array_union
98+
- [x] arrays_overlap
9999
- [ ] arrays_zip
100100
- [x] element_at
101101
- [ ] flatten

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
7878
classOf[ArrayRemove] -> CometArrayRemove,
7979
classOf[ArrayRepeat] -> CometArrayRepeat,
8080
classOf[ArraysOverlap] -> CometArraysOverlap,
81+
classOf[ArrayUnion] -> CometArrayUnion,
8182
classOf[Ascii] -> CometAscii,
8283
classOf[ConcatWs] -> CometConcatWs,
8384
classOf[Chr] -> CometChr,

spark/src/main/scala/org/apache/comet/serde/arrays.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,3 +347,17 @@ object CometArrayInsert extends CometExpressionSerde with IncompatExpr {
347347
}
348348
}
349349
}
350+
351+
object CometArrayUnion extends CometExpressionSerde with IncompatExpr {
352+
override def convert(
353+
expr: Expression,
354+
inputs: Seq[Attribute],
355+
binding: Boolean): Option[ExprOuterClass.Expr] = {
356+
val leftArrayExprProto = exprToProto(expr.children.head, inputs, binding)
357+
val rightArrayExprProto = exprToProto(expr.children(1), inputs, binding)
358+
359+
val arraysUnionScalarExpr =
360+
scalarFunctionExprToProto("array_union", leftArrayExprProto, rightArrayExprProto)
361+
optExprWithInfo(arraysUnionScalarExpr, expr, expr.children: _*)
362+
}
363+
}

spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,25 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
259259
}
260260
}
261261

262+
test("array_union") {
263+
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
264+
Seq(true, false).foreach { dictionaryEnabled =>
265+
withTempDir { dir =>
266+
val path = new Path(dir.toURI.toString, "test.parquet")
267+
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, n = 10000)
268+
spark.read.parquet(path.toString).createOrReplaceTempView("t1")
269+
checkSparkAnswerAndOperator(
270+
spark.sql("SELECT array_union(array(_2, _3, _4), array(_3, _4)) FROM t1"))
271+
checkSparkAnswerAndOperator(sql("SELECT array_union(array(_18), array(_19)) from t1"))
272+
checkSparkAnswerAndOperator(spark.sql(
273+
"SELECT array_union(array(CAST(NULL AS INT), _2, _3, _4), array(CAST(NULL AS INT), _2, _3)) FROM t1"))
274+
checkSparkAnswerAndOperator(spark.sql(
275+
"SELECT array_union(array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _3, _4), array(CAST(NULL AS INT), CAST(NULL AS INT), _2, _3)) FROM t1"))
276+
}
277+
}
278+
}
279+
}
280+
262281
test("array_max") {
263282
Seq(true, false).foreach { dictionaryEnabled =>
264283
withTempDir { dir =>

0 commit comments

Comments
 (0)