Skip to content

Commit b6b5b4c

Browse files
authored
chore: Refactor remaining predicate expression serde (#2265)
1 parent 62b3c91 commit b6b5b4c

File tree

2 files changed

+102
-76
lines changed

2 files changed

+102
-76
lines changed

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 5 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ object QueryPlanSerde extends Logging with CometExprShim {
142142
classOf[MapValues] -> CometMapValues,
143143
classOf[MapFromArrays] -> CometMapFromArrays,
144144
classOf[GetMapValue] -> CometMapExtract,
145+
classOf[EqualTo] -> CometEqualTo,
146+
classOf[EqualNullSafe] -> CometEqualNullSafe,
147+
classOf[Not] -> CometNot,
148+
classOf[And] -> CometAnd,
149+
classOf[Or] -> CometOr,
145150
classOf[GreaterThan] -> CometGreaterThan,
146151
classOf[GreaterThanOrEqual] -> CometGreaterThanOrEqual,
147152
classOf[LessThan] -> CometLessThan,
@@ -716,42 +721,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
716721
case c @ Cast(child, dt, timeZoneId, _) =>
717722
handleCast(expr, child, inputs, binding, dt, timeZoneId, evalMode(c))
718723

719-
case EqualTo(left, right) =>
720-
createBinaryExpr(
721-
expr,
722-
left,
723-
right,
724-
inputs,
725-
binding,
726-
(builder, binaryExpr) => builder.setEq(binaryExpr))
727-
728-
case Not(EqualTo(left, right)) =>
729-
createBinaryExpr(
730-
expr,
731-
left,
732-
right,
733-
inputs,
734-
binding,
735-
(builder, binaryExpr) => builder.setNeq(binaryExpr))
736-
737-
case EqualNullSafe(left, right) =>
738-
createBinaryExpr(
739-
expr,
740-
left,
741-
right,
742-
inputs,
743-
binding,
744-
(builder, binaryExpr) => builder.setEqNullSafe(binaryExpr))
745-
746-
case Not(EqualNullSafe(left, right)) =>
747-
createBinaryExpr(
748-
expr,
749-
left,
750-
right,
751-
inputs,
752-
binding,
753-
(builder, binaryExpr) => builder.setNeqNullSafe(binaryExpr))
754-
755724
case Literal(value, dataType)
756725
if supportedDataType(
757726
dataType,
@@ -956,24 +925,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
956925
None
957926
}
958927

959-
case And(left, right) =>
960-
createBinaryExpr(
961-
expr,
962-
left,
963-
right,
964-
inputs,
965-
binding,
966-
(builder, binaryExpr) => builder.setAnd(binaryExpr))
967-
968-
case Or(left, right) =>
969-
createBinaryExpr(
970-
expr,
971-
left,
972-
right,
973-
inputs,
974-
binding,
975-
(builder, binaryExpr) => builder.setOr(binaryExpr))
976-
977928
case UnaryExpression(child) if expr.prettyName == "promote_precision" =>
978929
// `UnaryExpression` includes `PromotePrecision` for Spark 3.3
979930
// `PromotePrecision` is just a wrapper, don't need to serialize it.
@@ -1163,17 +1114,6 @@ object QueryPlanSerde extends Logging with CometExprShim {
11631114
None
11641115
}
11651116

1166-
case n @ Not(In(_, _)) =>
1167-
CometNotIn.convert(n, inputs, binding)
1168-
1169-
case Not(child) =>
1170-
createUnaryExpr(
1171-
expr,
1172-
child,
1173-
inputs,
1174-
binding,
1175-
(builder, unaryExpr) => builder.setNot(unaryExpr))
1176-
11771117
case UnaryMinus(child, failOnError) =>
11781118
val childExpr = exprToProtoInternal(child, inputs, binding)
11791119
if (childExpr.isDefined) {

spark/src/main/scala/org/apache/comet/serde/comparisons.scala renamed to spark/src/main/scala/org/apache/comet/serde/predicates.scala

Lines changed: 97 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,109 @@ package org.apache.comet.serde
2121

2222
import scala.collection.JavaConverters._
2323

24-
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GreaterThan, GreaterThanOrEqual, In, InSet, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not}
24+
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, InSet, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or}
2525
import org.apache.spark.sql.types.BooleanType
2626

2727
import org.apache.comet.CometSparkSessionExtensions.withInfo
2828
import org.apache.comet.serde.ExprOuterClass.Expr
2929
import org.apache.comet.serde.QueryPlanSerde._
3030

31+
object CometNot extends CometExpressionSerde[Not] {
32+
override def convert(
33+
expr: Not,
34+
inputs: Seq[Attribute],
35+
binding: Boolean): Option[ExprOuterClass.Expr] = {
36+
37+
expr.child match {
38+
case expr: EqualTo =>
39+
createBinaryExpr(
40+
expr,
41+
expr.left,
42+
expr.right,
43+
inputs,
44+
binding,
45+
(builder, binaryExpr) => builder.setNeq(binaryExpr))
46+
case expr: EqualNullSafe =>
47+
createBinaryExpr(
48+
expr,
49+
expr.left,
50+
expr.right,
51+
inputs,
52+
binding,
53+
(builder, binaryExpr) => builder.setNeqNullSafe(binaryExpr))
54+
case expr: In =>
55+
ComparisonUtils.in(expr, expr.value, expr.list, inputs, binding, negate = true)
56+
case _ =>
57+
createUnaryExpr(
58+
expr,
59+
expr.child,
60+
inputs,
61+
binding,
62+
(builder, unaryExpr) => builder.setNot(unaryExpr))
63+
}
64+
}
65+
}
66+
67+
object CometAnd extends CometExpressionSerde[And] {
68+
override def convert(
69+
expr: And,
70+
inputs: Seq[Attribute],
71+
binding: Boolean): Option[ExprOuterClass.Expr] = {
72+
createBinaryExpr(
73+
expr,
74+
expr.left,
75+
expr.right,
76+
inputs,
77+
binding,
78+
(builder, binaryExpr) => builder.setAnd(binaryExpr))
79+
}
80+
}
81+
82+
object CometOr extends CometExpressionSerde[Or] {
83+
override def convert(
84+
expr: Or,
85+
inputs: Seq[Attribute],
86+
binding: Boolean): Option[ExprOuterClass.Expr] = {
87+
createBinaryExpr(
88+
expr,
89+
expr.left,
90+
expr.right,
91+
inputs,
92+
binding,
93+
(builder, binaryExpr) => builder.setOr(binaryExpr))
94+
}
95+
}
96+
97+
object CometEqualTo extends CometExpressionSerde[EqualTo] {
98+
override def convert(
99+
expr: EqualTo,
100+
inputs: Seq[Attribute],
101+
binding: Boolean): Option[ExprOuterClass.Expr] = {
102+
createBinaryExpr(
103+
expr,
104+
expr.left,
105+
expr.right,
106+
inputs,
107+
binding,
108+
(builder, binaryExpr) => builder.setEq(binaryExpr))
109+
}
110+
}
111+
112+
object CometEqualNullSafe extends CometExpressionSerde[EqualNullSafe] {
113+
override def convert(
114+
expr: EqualNullSafe,
115+
inputs: Seq[Attribute],
116+
binding: Boolean): Option[ExprOuterClass.Expr] = {
117+
createBinaryExpr(
118+
expr,
119+
expr.left,
120+
expr.right,
121+
inputs,
122+
binding,
123+
(builder, binaryExpr) => builder.setEqNullSafe(binaryExpr))
124+
}
125+
}
126+
31127
object CometGreaterThan extends CometExpressionSerde[GreaterThan] {
32128
override def convert(
33129
expr: GreaterThan,
@@ -137,16 +233,6 @@ object CometIn extends CometExpressionSerde[In] {
137233
}
138234
}
139235

140-
object CometNotIn extends CometExpressionSerde[Not] {
141-
override def convert(
142-
expr: Not,
143-
inputs: Seq[Attribute],
144-
binding: Boolean): Option[ExprOuterClass.Expr] = {
145-
val inExpr = expr.child.asInstanceOf[In]
146-
ComparisonUtils.in(expr, inExpr.value, inExpr.list, inputs, binding, negate = true)
147-
}
148-
}
149-
150236
object CometInSet extends CometExpressionSerde[InSet] {
151237
override def convert(
152238
expr: InSet,

0 commit comments

Comments
 (0)