Skip to content

Commit 6d5bdbe

Browse files
authored
chore: Refactor serde for ArrayCompact and ArrayFilter (#2536)
1 parent cade655 commit 6d5bdbe

File tree

3 files changed

+24
-7
lines changed

3 files changed

+24
-7
lines changed

docs/source/user-guide/latest/expressions.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ incompatible expressions.
6161
## String Functions
6262

6363
| Expression | Spark-Compatible? | Compatibility Notes |
64-
|-----------------| ----------------- | ---------------------------------------------------------------------------------------------------------- |
64+
| --------------- | ----------------- | ---------------------------------------------------------------------------------------------------------- |
6565
| Ascii | Yes | |
6666
| BitLength | Yes | |
6767
| Chr | Yes | |
@@ -93,7 +93,7 @@ incompatible expressions.
9393
## Date/Time Functions
9494

9595
| Expression | SQL | Spark-Compatible? | Compatibility Notes |
96-
|----------------|------------------------------| ----------------- |----------------------------------------------------------------------------------------------------------------------|
96+
| -------------- | ---------------------------- | ----------------- | -------------------------------------------------------------------------------------------------------------------- |
9797
| DateAdd | `date_add` | Yes | |
9898
| DateSub | `date_sub` | Yes | |
9999
| DatePart | `date_part(field, source)` | Yes | Supported values of `field`: `year`/`month`/`week`/`day`/`dayofweek`/`dayofweek_iso`/`doy`/`quarter`/`hour`/`minute` |
@@ -116,7 +116,7 @@ incompatible expressions.
116116
## Math Expressions
117117

118118
| Expression | SQL | Spark-Compatible? | Compatibility Notes |
119-
|----------------|-----------|-------------------|-----------------------------------|
119+
| -------------- | --------- | ----------------- | --------------------------------- |
120120
| Acos | `acos` | Yes | |
121121
| Add | `+` | Yes | |
122122
| Asin | `asin` | Yes | |
@@ -208,6 +208,7 @@ incompatible expressions.
208208
| ArrayContains | Yes | |
209209
| ArrayDistinct | No | Behaves differently than Spark. Comet first sorts then removes duplicates while Spark preserves the original order. |
210210
| ArrayExcept | No | |
211+
| ArrayFilter | Yes | Only supports the case where the function is `IsNotNull` |
211212
| ArrayInsert | No | |
212213
| ArrayIntersect | No | |
213214
| ArrayJoin | No | |

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,11 @@ object QueryPlanSerde extends Logging with CometExprShim {
6969

7070
private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
7171
classOf[ArrayAppend] -> CometArrayAppend,
72-
// TODO ArrayCompact
72+
classOf[ArrayCompact] -> CometArrayCompact,
7373
classOf[ArrayContains] -> CometArrayContains,
7474
classOf[ArrayDistinct] -> CometArrayDistinct,
7575
classOf[ArrayExcept] -> CometArrayExcept,
76+
classOf[ArrayFilter] -> CometArrayFilter,
7677
classOf[ArrayInsert] -> CometArrayInsert,
7778
classOf[ArrayIntersect] -> CometArrayIntersect,
7879
classOf[ArrayJoin] -> CometArrayJoin,
@@ -911,8 +912,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
911912
withInfo(expr, bloomFilter, value)
912913
None
913914
}
914-
case af @ ArrayFilter(_, func) if func.children.head.isInstanceOf[IsNotNull] =>
915-
convert(af, CometArrayCompact)
916915
case l @ Length(child) if child.dataType == BinaryType =>
917916
withInfo(l, "Length on BinaryType is not supported")
918917
None

spark/src/main/scala/org/apache/comet/serde/arrays.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package org.apache.comet.serde
2121

2222
import scala.annotation.tailrec
2323

24-
import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, Literal, Reverse}
24+
import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse}
2525
import org.apache.spark.sql.internal.SQLConf
2626
import org.apache.spark.sql.types._
2727

@@ -505,6 +505,23 @@ object CometFlatten extends CometExpressionSerde[Flatten] with ArraysBase {
505505
}
506506
}
507507

508+
object CometArrayFilter extends CometExpressionSerde[ArrayFilter] {
509+
510+
override def getSupportLevel(expr: ArrayFilter): SupportLevel = {
511+
expr.function.children.headOption match {
512+
case Some(_: IsNotNull) => Compatible()
513+
case _ => Unsupported()
514+
}
515+
}
516+
517+
override def convert(
518+
expr: ArrayFilter,
519+
inputs: Seq[Attribute],
520+
binding: Boolean): Option[ExprOuterClass.Expr] = {
521+
CometArrayCompact.convert(expr, inputs, binding)
522+
}
523+
}
524+
508525
trait ArraysBase {
509526

510527
def isTypeSupported(dt: DataType): Boolean = {

0 commit comments

Comments
 (0)