-
Notifications
You must be signed in to change notification settings - Fork 278
fix: Fallback to Spark for lpad/rpad for unsupported arguments & fix negative length handling #2630
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
e365ac3
10c960e
0ebd143
3ffa0df
fccb7e6
aeca95b
cb536a8
432f68b
3bda19c
7adfff4
fb3cdeb
d8ac26a
e1f7849
9951885
ed3648e
d0efb26
ca151c2
40130f4
6a7f4eb
1ce35d0
af0bb81
27cc67e
170b700
a265595
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -162,6 +162,16 @@ object CometRLike extends CometExpressionSerde[RLike] { | |
|
|
||
| object CometStringRPad extends CometExpressionSerde[StringRPad] { | ||
|
|
||
/**
 * Determine whether Comet can evaluate this rpad expression natively.
 *
 * The native implementation only supports a (column, literal) argument
 * combination: it falls back to Spark when the str argument is a scalar
 * literal, or when the pad argument is anything other than a scalar literal.
 *
 * @param expr
 *   The Spark StringRPad expression to check.
 * @return
 *   Compatible() when natively supported, otherwise Unsupported with the
 *   fallback reason.
 */
override def getSupportLevel(expr: StringRPad): SupportLevel = {
  // Expression-oriented form: a single if / else-if chain instead of
  // early `return`s, which are discouraged in Scala.
  if (expr.str.isInstanceOf[Literal]) {
    Unsupported(Some("Scalar values are not supported for the str argument"))
  } else if (!expr.pad.isInstanceOf[Literal]) {
    Unsupported(Some("Only scalar values are supported for the pad argument"))
  } else {
    Compatible()
  }
}
|
|
||
| override def convert( | ||
| expr: StringRPad, | ||
| inputs: Seq[Attribute], | ||
|
|
@@ -177,21 +187,16 @@ object CometStringRPad extends CometExpressionSerde[StringRPad] { | |
|
|
||
| object CometStringLPad extends CometExpressionSerde[StringLPad] { | ||
|
|
||
| /** | ||
| * Convert a Spark expression into a protocol buffer representation that can be passed into | ||
| * native code. | ||
| * | ||
| * @param expr | ||
| * The Spark expression. | ||
| * @param inputs | ||
| * The input attributes. | ||
| * @param binding | ||
| * Whether the attributes are bound (this is only relevant in aggregate expressions). | ||
| * @return | ||
| * Protocol buffer representation, or None if the expression could not be converted. In this | ||
| * case it is expected that the input expression will have been tagged with reasons why it | ||
| * could not be converted. | ||
| */ | ||
| override def getSupportLevel(expr: StringLPad): SupportLevel = { | ||
| if (expr.str.isInstanceOf[Literal]) { | ||
| return Unsupported(Some("Scalar values are not supported for the str argument")) | ||
| } | ||
| if (!expr.pad.isInstanceOf[Literal]) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know if we'll ever hit this. As far as I can see (in
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Spark doesn't require pad to be a literal:
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose this is a good example of the benefit of fuzz testing (which is how this issue was discovered). The fuzzer will generate test cases that most developers would not consider. It does seem unlikely that anyone would want to use a column for the pad value, but I suppose it is possible that someone may have that requirement. |
||
| return Unsupported(Some("Only scalar values are supported for the pad argument")) | ||
| } | ||
| Compatible() | ||
| } | ||
|
|
||
| override def convert( | ||
| expr: StringLPad, | ||
| inputs: Seq[Attribute], | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,12 +19,81 @@ | |
|
|
||
| package org.apache.comet | ||
|
|
||
| import scala.util.Random | ||
|
|
||
| import org.apache.parquet.hadoop.ParquetOutputFormat | ||
| import org.apache.spark.sql.{CometTestBase, DataFrame} | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types.{DataTypes, StructField, StructType} | ||
|
|
||
| import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} | ||
|
|
||
| class CometStringExpressionSuite extends CometTestBase { | ||
|
|
||
| test("lpad") { | ||
| testPadding("lpad") | ||
| } | ||
|
|
||
| test("rpad") { | ||
| testPadding("rpad") | ||
| } | ||
|
|
||
| private def testPadding(expr: String): Unit = { | ||
| val r = new Random(42) | ||
| val schema = StructType( | ||
| Seq( | ||
| StructField("str", DataTypes.StringType, nullable = true), | ||
| StructField("len", DataTypes.IntegerType, nullable = true), | ||
| StructField("pad", DataTypes.StringType, nullable = true))) | ||
| // scalastyle:off | ||
| val edgeCases = Seq( | ||
| "é", // unicode 'e\\u{301}' | ||
| "é", // unicode '\\u{e9}' | ||
| "తెలుగు") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Out of curiosity, what makes this an edge case?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The first two were added in #772 to make sure Comet was consistent with Spark even though Rust and Java have different ways of representing unicode and graphemes. |
||
| // scalastyle:on | ||
| val df = FuzzDataGenerator.generateDataFrame( | ||
| r, | ||
| spark, | ||
| schema, | ||
| 1000, | ||
| DataGenOptions(maxStringLength = 6, customStrings = edgeCases)) | ||
| df.createOrReplaceTempView("t1") | ||
|
|
||
| // test all combinations of scalar and array arguments | ||
| for (str <- Seq("'hello'", "str")) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Spark doc says it also supports binary string input: e.g
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, thanks. That opens up another set of issues! 😭
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added separate tests for binary inputs |
||
| for (len <- Seq("6", "-6", "0", "len % 10")) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| for (pad <- Seq(Some("'x'"), Some("'zzz'"), Some("pad"), None)) { | ||
| val sql = pad match { | ||
| case Some(p) => | ||
| // 3 args | ||
| s"SELECT $str, $len, $expr($str, $len, $p) FROM t1 ORDER BY str, len, pad" | ||
| case _ => | ||
| // 2 args (default pad of ' ') | ||
| s"SELECT $str, $len, $expr($str, $len) FROM t1 ORDER BY str, len, pad" | ||
| } | ||
| val isLiteralStr = str == "'hello'" | ||
| val isLiteralLen = !len.contains("len") | ||
| val isLiteralPad = !pad.contains("pad") | ||
| if (isLiteralStr && isLiteralLen && isLiteralPad) { | ||
| // all arguments are literal, so Spark constant folding will kick in | ||
| // and pad function will not be evaluated by Comet | ||
| checkSparkAnswer(sql) | ||
| } else if (isLiteralStr) { | ||
| checkSparkAnswerAndFallbackReason( | ||
| sql, | ||
| "Scalar values are not supported for the str argument") | ||
| } else if (!isLiteralPad) { | ||
| checkSparkAnswerAndFallbackReason( | ||
| sql, | ||
| "Only scalar values are supported for the pad argument") | ||
| } else { | ||
| checkSparkAnswerAndOperator(sql) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| test("Various String scalar functions") { | ||
| val table = "names" | ||
| withTable(table) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to put the happy path first in the `if` statement for compute-intensive parts, so the CPU won't eagerly execute instructions and then have to fall back.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I have updated this.