-
Notifications
You must be signed in to change notification settings - Fork 278
chore: Finish refactoring expression serde out of QueryPlanSerde
#2791
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
6ec8b1f
051cbc1
a88d16f
97c42c2
9adfadd
84d131d
ab0b737
3b8b693
6431ead
0858b89
63e4b49
036dd87
4a711ee
13dcb0c
f81663f
550640a
0e64efc
762f9a8
ccc2b3c
12c9bd6
1fc6cfc
7a47b9d
52c1272
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.comet.serde | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.Attribute | ||
| import org.apache.spark.sql.execution.ScalarSubquery | ||
|
|
||
| import org.apache.comet.CometSparkSessionExtensions.withInfo | ||
| import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataType} | ||
|
|
||
object CometScalarSubquery extends CometExpressionSerde[ScalarSubquery] {

  /**
   * Serializes a Spark `ScalarSubquery` to a Comet protobuf `Subquery` expression, carrying the
   * subquery's expression id and serialized result data type. Falls back (returns None and
   * records the reason via `withInfo`) when the result type is not supported or cannot be
   * serialized.
   */
  override def convert(
      expr: ScalarSubquery,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    if (!supportedDataType(expr.dataType)) {
      withInfo(expr, s"Unsupported data type: ${expr.dataType}")
      None
    } else {
      // The type may be nominally supported yet still fail to serialize, so check both.
      serializeDataType(expr.dataType) match {
        case Some(serializedType) =>
          val subqueryBuilder = ExprOuterClass.Subquery
            .newBuilder()
            .setId(expr.exprId.id)
            .setDatatype(serializedType)
          Some(ExprOuterClass.Expr.newBuilder().setSubquery(subqueryBuilder).build())
        case None =>
          withInfo(expr, s"Scalar subquery returns unsupported datatype ${expr.dataType}")
          None
      }
    }
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.comet.serde | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.{Attribute, BloomFilterMightContain, KnownFloatingPointNormalized, MakeDecimal, UnscaledValue} | ||
| import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero | ||
| import org.apache.spark.sql.types.{DecimalType, LongType} | ||
|
|
||
| import org.apache.comet.CometSparkSessionExtensions.withInfo | ||
| import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProtoWithReturnType, serializeDataType} | ||
|
|
||
object CometUnscaledValue extends CometExpressionSerde[UnscaledValue] {

  /**
   * Serializes Spark's `UnscaledValue` (the unscaled Long representation of a decimal) to a
   * Comet `unscaled_value` scalar function call with a LongType return type.
   */
  override def convert(
      expr: UnscaledValue,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    val childExpr = exprToProtoInternal(expr.child, inputs, binding)
    // Name the boolean argument for readability, consistent with CometMakeDecimal.
    val optExpr = scalarFunctionExprToProtoWithReturnType(
      "unscaled_value",
      LongType,
      failOnError = false,
      childExpr)
    optExprWithInfo(optExpr, expr, expr.child)
  }
}
object CometKnownFloatingPointNormalized
    extends CometExpressionSerde[KnownFloatingPointNormalized] {

  /**
   * Only the `NormalizeNaNAndZero` form of this marker expression is supported; any other
   * child is reported as unsupported.
   */
  override def getSupportLevel(expr: KnownFloatingPointNormalized): SupportLevel = {
    expr.child match {
      case _: NormalizeNaNAndZero => Compatible()
      case _ => Unsupported()
    }
  }

  /**
   * Serializes the expression wrapped by `NormalizeNaNAndZero` into a Comet
   * `NormalizeNaNAndZero` protobuf node carrying the child's serialized data type.
   */
  override def convert(
      expr: KnownFloatingPointNormalized,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    expr.child match {
      case norm: NormalizeNaNAndZero =>
        val wrapped = norm.child
        serializeDataType(wrapped.dataType) match {
          case Some(serializedType) =>
            exprToProtoInternal(wrapped, inputs, binding).map { child =>
              val builder = ExprOuterClass.NormalizeNaNAndZero
                .newBuilder()
                .setChild(child)
                .setDatatype(serializedType)
              ExprOuterClass.Expr.newBuilder().setNormalizeNanAndZero(builder).build()
            }
          case None =>
            withInfo(wrapped, s"Unsupported datatype ${wrapped.dataType}")
            None
        }
      case other =>
        // Defensive: getSupportLevel gates this path, but avoid a ClassCastException if
        // convert is ever called directly with an unexpected child.
        withInfo(expr, s"Unsupported child expression: $other")
        None
    }
  }
}
|
|
||
object CometMakeDecimal extends CometExpressionSerde[MakeDecimal] {

  /**
   * Serializes Spark's `MakeDecimal` to a Comet `make_decimal` scalar function call whose
   * return type carries the target precision and scale.
   */
  override def convert(
      expr: MakeDecimal,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    val childProto = exprToProtoInternal(expr.child, inputs, binding)
    val resultType = DecimalType(expr.precision, expr.scale)
    // nullOnOverflow = false means overflow must raise an error rather than produce null.
    val converted = scalarFunctionExprToProtoWithReturnType(
      "make_decimal",
      resultType,
      failOnError = !expr.nullOnOverflow,
      childProto)
    optExprWithInfo(converted, expr, expr.child)
  }
}
|
|
||
object CometBloomFilterMightContain extends CometExpressionSerde[BloomFilterMightContain] {

  /**
   * Serializes `BloomFilterMightContain` by converting both children: the bloom filter (left)
   * and the value being probed (right). Both must convert successfully; otherwise the failure
   * is recorded via `withInfo` and None is returned.
   */
  override def convert(
      expr: BloomFilterMightContain,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    val filterProto = exprToProtoInternal(expr.left, inputs, binding)
    val valueProto = exprToProtoInternal(expr.right, inputs, binding)
    (filterProto, valueProto) match {
      case (Some(filter), Some(value)) =>
        val builder = ExprOuterClass.BloomFilterMightContain
          .newBuilder()
          .setBloomFilter(filter)
          .setValue(value)
        Some(
          ExprOuterClass.Expr
            .newBuilder()
            .setBloomFilterMightContain(builder)
            .build())
      case _ =>
        // Report both children so the fallback reason names whichever failed to convert.
        withInfo(expr, expr.left, expr.right)
        None
    }
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,15 +21,34 @@ package org.apache.comet.serde | |
|
|
||
| import java.util.Locale | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, Expression, InitCap, Length, Like, Literal, Lower, RegExpReplace, RLike, StringLPad, StringRepeat, StringRPad, Substring, Upper} | ||
| import org.apache.spark.sql.types.{BinaryType, DataTypes, LongType, StringType} | ||
| import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, Expression, InitCap, Length, Like, Literal, Lower, RegExpReplace, Reverse, RLike, StringLPad, StringRepeat, StringRPad, Substring, Upper} | ||
| import org.apache.spark.sql.types.{ArrayType, BinaryType, DataTypes, LongType, StringType} | ||
|
|
||
| import org.apache.comet.CometConf | ||
| import org.apache.comet.CometSparkSessionExtensions.withInfo | ||
| import org.apache.comet.expressions.{CometCast, CometEvalMode, RegExp} | ||
| import org.apache.comet.serde.ExprOuterClass.Expr | ||
| import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} | ||
|
|
||
object CometReverse extends CometScalarFunction[Reverse]("reverse") {

  /**
   * Spark's `Reverse` handles both strings and arrays. Array inputs are delegated to
   * `CometArrayReverse`; everything else uses the scalar `reverse` function from the parent
   * class.
   */
  override def getSupportLevel(expr: Reverse): SupportLevel =
    expr.child.dataType match {
      case _: ArrayType => CometArrayReverse.getSupportLevel(expr)
      case _ => Compatible()
    }

  override def convert(expr: Reverse, inputs: Seq[Attribute], binding: Boolean): Option[Expr] =
    expr.child.dataType match {
      case _: ArrayType => CometArrayReverse.convert(expr, inputs, binding)
      case _ => super.convert(expr, inputs, binding)
    }
}
|
|
||
| object CometStringRepeat extends CometExpressionSerde[StringRepeat] { | ||
|
|
||
| override def convert( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.