feat: Support reverse function with ArrayType input #2481
Conversation
        withInfo(l, "Length on BinaryType is not supported")
        None
      case r @ Reverse(child) if child.dataType.isInstanceOf[ArrayType] =>
        convert(r, CometArrayReverse)
Can we just use convert(expr, CometScalarFunction("array_reverse"))?
Ignore this comment; I'm not sure which way we lean. cc @andygrove
Codecov Report

❌ Patch coverage is …

Additional details and impacted files

    @@             Coverage Diff              @@
    ##               main    #2481      +/-   ##
    ============================================
    + Coverage     56.12%   58.42%    +2.29%
    - Complexity      976     1436     +460
    ============================================
      Files           119      146      +27
      Lines         11743    13519    +1776
      Branches       2251     2352     +101
    ============================================
    + Hits           6591     7898    +1307
    - Misses         4012     4392     +380
    - Partials       1140     1229      +89
    for (fieldName <- fieldNames) {
      sql(s"SELECT $fieldName as a FROM t1")
        .createOrReplaceTempView("t2")
      checkSparkAnswer(sql("SELECT reverse(a) FROM t2"))
Would the test check strings as well, like in https://spark.apache.org/docs/latest/api/sql/#reverse?
The CometStringExpressionSuite file contains a unit test for the reverse function with a string input.
datafusion-comet/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
Lines 166 to 175 in 8295024
| test("length, reverse, instr, replace, translate") { | |
| val table = "test" | |
| withTable(table) { | |
| sql(s"create table $table(col string) using parquet") | |
| sql( | |
| s"insert into $table values('Spark SQL '), (NULL), (''), ('苹果手机'), ('Spark SQL '), (NULL), (''), ('苹果手机')") | |
| checkSparkAnswerAndOperator("select length(col), reverse(col), instr(col, 'SQL'), instr(col, '手机'), replace(col, 'SQL', '123')," + | |
| s" replace(col, 'SQL'), replace(col, '手机', '平板'), translate(col, 'SL苹', '123') from $table") | |
| } | |
| } |
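For context on the question above, Spark's reverse handles both string and array input. A minimal standalone sketch (plain Spark; the two queries mirror the examples on the linked docs page, and all names here are illustrative):

    import org.apache.spark.sql.SparkSession

    object ReverseExamples {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("reverse-examples").getOrCreate()

        // String input: reverses the character sequence, e.g. 'Spark SQL' -> 'LQS krapS'
        spark.sql("SELECT reverse('Spark SQL')").show()

        // Array input: reverses element order, e.g. array(2, 1, 4, 3) -> [3, 4, 1, 2]
        // (this is the case the PR adds native support for)
        spark.sql("SELECT reverse(array(2, 1, 4, 3))").show()

        spark.stop()
      }
    }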
I think it is an unrelated test failure, restarting jobs.
| withTempView("1") { | ||
| val path = new Path(dir.toURI.toString, "test.parquet") | ||
| makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) | ||
| spark.read.parquet(path.toString).createOrReplaceTempView("t1"); |
    spark.read.parquet(path.toString).createOrReplaceTempView("t1");
Hmm, why do we need this change?
Good point, why do we need withTempView in this test if the view would be created with spark.read.parquet(path.toString).createOrReplaceTempView("t1")?
Sorry for the late reply - I was on holiday for Chinese National Day.
This happens because we need to isolate views between tests within the same Suite, similar to how we scope config changes with withSQLConf(...).
- A view created by the createOrReplaceTempView method has the same lifetime as the Spark session that created the Dataset. From the doc comment of the createOrReplaceTempView method:
      /**
       * Creates a local temporary view using the given name. The lifetime of this
       * temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
       */
- Tests within the same Suite share the same Spark session, so a view created in one test remains visible to later tests (a sketch of the scoping that withTempView provides follows the output below).
You can verify this by following these steps:
- Add the following code to the file CometArrayExpressionSuite on the main branch:

      test("show temp view") {
        spark.sql("show tables").show(truncate = false)
        spark.sql("select * from t1").show
      }
- Run the suite:

      ./mvnw test -DargLine="-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Djava.library.path=/path/to/arrow-datafusion-comet/native/target/release" -Dtest=none -Dsuites="org.apache.comet.CometArrayExpressionSuite"

  Output:
Run starting. Expected test count is: 28
CometArrayExpressionSuite:
25/10/02 23:00:22 INFO core/src/lib.rs: Comet native library version 0.11.0 initialized
- array_remove - integer (4 seconds, 229 milliseconds)
- array_remove - test all types (native Parquet reader) (2 seconds, 871 milliseconds)
- array_remove - test all types (convert from Parquet) (4 seconds, 397 milliseconds)
- array_remove - fallback for unsupported type struct (196 milliseconds)
- array_append (2 seconds, 592 milliseconds)
- array_prepend (2 seconds, 318 milliseconds)
- ArrayInsert (1 second, 794 milliseconds)
- ArrayInsertUnsupportedArgs (267 milliseconds)
- array_contains - int values (395 milliseconds)
- array_contains - test all types (native Parquet reader) (5 seconds, 378 milliseconds)
- array_contains - array literals (1 second, 635 milliseconds)
- array_contains - test all types (convert from Parquet) (2 seconds, 886 milliseconds)
- array_distinct (1 second, 619 milliseconds)
- array_union (1 second, 639 milliseconds)
- array_max (2 seconds, 140 milliseconds)
- array_min (2 seconds, 92 milliseconds)
- array_intersect (1 second, 161 milliseconds)
- array_join (1 second, 218 milliseconds)
- arrays_overlap (1 second, 20 milliseconds)
- array_compact (1 second, 34 milliseconds)
- array_except - basic test (only integer values) (1 second, 204 milliseconds)
- array_except - test all types (native Parquet reader) (1 second, 762 milliseconds)
- array_except - test all types (convert from Parquet) (2 seconds, 682 milliseconds)
- array_repeat (1 second, 526 milliseconds)
- flatten - test all types (native Parquet reader) (1 second, 809 milliseconds)
- flatten - test all types (convert from Parquet) (2 seconds, 858 milliseconds)
- array literals (396 milliseconds)
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| |t1 |true |
| |t2 |true |
| |t3 |true |
+---------+---------+-----------+
- temp view *** FAILED *** (139 milliseconds)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1122.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1122.0 (TID 3714) (172.31.25.39 executor driver): org.apache.spark.SparkFileNotFoundException: File file:/Users/fchen/Project/arrow-datafusion-comet/spark/target/tmp/spark-aa0feecf-5344-4e11-a30c-e9defa6e093d/test.parquet does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:781)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:222)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
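For reference, a minimal sketch of the scoping that withTempView provides, assuming a helper modeled on Spark's SQLTestUtils.withTempView (names and structure here are illustrative, not the Comet implementation):

    import org.apache.spark.sql.SparkSession

    object TempViewScopingSketch {
      // Drop the named temp views after the body runs, so they do not leak into
      // later tests that share the same SparkSession.
      def withTempView(spark: SparkSession)(viewNames: String*)(body: => Unit): Unit = {
        try body
        finally viewNames.foreach(name => spark.catalog.dropTempView(name))
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("temp-view-scoping").getOrCreate()
        import spark.implicits._

        withTempView(spark)("t1") {
          Seq(Seq(1, 2, 3)).toDF("a").createOrReplaceTempView("t1")
          spark.sql("SELECT reverse(a) FROM t1").show()
        }
        // "t1" has been dropped here, so a later test that re-creates it (or reads a
        // different Parquet file into the same view name) starts from a clean slate.
        spark.stop()
      }
    }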
comphead left a comment:
Sounds good, thanks @cfmcgrady
I re-triggered the failed job and am waiting for CI; other than that, I think the PR is good to go.
* Support reverse function with ArrayType input
* nit
* refactor ut
* assert
* fix ci
* Update spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
  Co-authored-by: Oleks V <[email protected]>
* Update spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
  Co-authored-by: Oleks V <[email protected]>

---------

Co-authored-by: Oleks V <[email protected]>
Which issue does this PR close?
Closes #2478.
Rationale for this change
What changes are included in this PR?
Support reverse function with ArrayType input.

How are these changes tested?
Added UT
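For reference, a standalone sketch of the kind of check the added UT performs (plain Spark only; the actual test reads a Parquet file of all primitive types and uses Comet's checkSparkAnswer helpers, so the names below are illustrative):

    import org.apache.spark.sql.SparkSession

    object ReverseArraySketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("reverse-array-sketch").getOrCreate()
        import spark.implicits._

        // Build a small array-typed column, including an empty array, and reverse it.
        Seq(Seq(1, 2, 3), Seq.empty[Int], Seq(7)).toDF("a").createOrReplaceTempView("t1")
        spark.sql("SELECT a, reverse(a) FROM t1").show(truncate = false)

        spark.stop()
      }
    }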