Skip to content

Commit 94ca968

Browse files
authored
feat: Implement ToPrettyString (#1921)
1 parent 4b8782a commit 94ca968

File tree

6 files changed

+162
-2
lines changed

6 files changed

+162
-2
lines changed

docs/source/user-guide/expressions.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,10 @@ The following Spark expressions are currently available. Any known compatibility
208208
## Other
209209

210210
| Expression | Notes |
211-
| ----------------------- | ------------------------------------------------------------------------------- |
211+
|-------------------------| ------------------------------------------------------------------------------- |
212212
| Cast | See compatibility guide for list of supported cast expressions and known issues |
213213
| BloomFilterMightContain | |
214214
| ScalarSubquery | |
215215
| Coalesce | |
216216
| NormalizeNaNAndZero | |
217+
| ToPrettyString | |

native/core/src/execution/planner.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,25 @@ impl PhysicalPlanner {
739739
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
740740
Ok(Arc::new(ToJson::new(child, &expr.timezone)))
741741
}
742+
ExprStruct::ToPrettyString(expr) => {
743+
let mut spark_cast_options =
744+
SparkCastOptions::new(EvalMode::Try, &expr.timezone, true);
745+
let null_string = "NULL";
746+
spark_cast_options.null_string = null_string.to_string();
747+
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
748+
let cast = Arc::new(Cast::new(
749+
Arc::clone(&child),
750+
DataType::Utf8,
751+
spark_cast_options,
752+
));
753+
Ok(Arc::new(IfExpr::new(
754+
Arc::new(IsNullExpr::new(child)),
755+
Arc::new(Literal::new(ScalarValue::Utf8(Some(
756+
null_string.to_string(),
757+
)))),
758+
cast,
759+
)))
760+
}
742761
ExprStruct::ListExtract(expr) => {
743762
let child =
744763
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;

native/proto/src/proto/expr.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ message Expr {
8282
GetArrayStructFields get_array_struct_fields = 57;
8383
ArrayInsert array_insert = 58;
8484
MathExpr integral_divide = 59;
85+
ToPrettyString to_pretty_string = 60;
8586
}
8687
}
8788

@@ -282,6 +283,11 @@ message ToJson {
282283
bool ignore_null_fields = 6;
283284
}
284285

286+
// Spark's ToPrettyString expression (new in Spark 3.5): renders `child`
// as a human-readable string. On the native side this is implemented as a
// cast of `child` to UTF8, with null inputs rendered as the literal "NULL".
message ToPrettyString {
  // The expression whose value is rendered as a string.
  Expr child = 1;
  // Session-local timezone id, used when rendering timestamp values
  // during the cast to string.
  string timezone = 2;
}
290+
285291
message Hour {
286292
Expr child = 1;
287293
string timezone = 2;

native/spark-expr/src/conversion_funcs/cast.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,8 @@ pub struct SparkCastOptions {
812812
/// We also use the cast logic for adapting Parquet schemas, so this flag is used
813813
/// for that use case
814814
pub is_adapting_schema: bool,
815+
/// String to use to represent null values
816+
pub null_string: String,
815817
}
816818

817819
impl SparkCastOptions {
@@ -822,6 +824,7 @@ impl SparkCastOptions {
822824
allow_incompat,
823825
allow_cast_unsigned_ints: false,
824826
is_adapting_schema: false,
827+
null_string: "null".to_string(),
825828
}
826829
}
827830

@@ -832,6 +835,7 @@ impl SparkCastOptions {
832835
allow_incompat,
833836
allow_cast_unsigned_ints: false,
834837
is_adapting_schema: false,
838+
null_string: "null".to_string(),
835839
}
836840
}
837841
}
@@ -1141,7 +1145,7 @@ fn casts_struct_to_string(
11411145
str.push_str(", ");
11421146
}
11431147
if field.is_null(row_index) {
1144-
str.push_str("null");
1148+
str.push_str(&spark_cast_options.null_string);
11451149
} else {
11461150
str.push_str(field.value(row_index));
11471151
}

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,6 +850,42 @@ object QueryPlanSerde extends Logging with CometExprShim {
850850
None
851851
}
852852

853+
// ToPrettyString is new in Spark 3.5
854+
case _
855+
if expr.getClass.getSimpleName == "ToPrettyString" && expr
856+
.isInstanceOf[UnaryExpression] && expr.isInstanceOf[TimeZoneAwareExpression] =>
857+
val child = expr.asInstanceOf[UnaryExpression].child
858+
val timezoneId = expr.asInstanceOf[TimeZoneAwareExpression].timeZoneId
859+
860+
handleCast(
861+
expr,
862+
child,
863+
inputs,
864+
binding,
865+
DataTypes.StringType,
866+
timezoneId,
867+
CometEvalMode.TRY) match {
868+
case Some(_) =>
869+
exprToProtoInternal(child, inputs, binding) match {
870+
case Some(p) =>
871+
val toPrettyString = ExprOuterClass.ToPrettyString
872+
.newBuilder()
873+
.setChild(p)
874+
.setTimezone(timezoneId.getOrElse("UTC"))
875+
.build()
876+
Some(
877+
ExprOuterClass.Expr
878+
.newBuilder()
879+
.setToPrettyString(toPrettyString)
880+
.build())
881+
case _ =>
882+
withInfo(expr, child)
883+
None
884+
}
885+
case None =>
886+
None
887+
}
888+
853889
case StructsToJson(options, child, timezoneId) =>
854890
if (options.nonEmpty) {
855891
withInfo(expr, "StructsToJson with options is not supported")
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql
21+
22+
import org.apache.comet.CometConf
23+
import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible}
24+
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
25+
import org.apache.commons.io.FileUtils
26+
import org.apache.spark.sql.catalyst.TableIdentifier
27+
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
28+
import org.apache.spark.sql.catalyst.expressions.{Alias, ToPrettyString}
29+
import org.apache.spark.sql.catalyst.plans.logical.Project
30+
import org.apache.spark.sql.internal.SQLConf
31+
import org.apache.spark.sql.types.DataTypes
32+
33+
import java.io.File
34+
import java.text.SimpleDateFormat
35+
import scala.util.Random
36+
37+
/**
 * Fuzz-style test suite for the ToPrettyString expression: generates a random
 * Parquet file covering many data types, wraps every column in ToPrettyString,
 * and compares Comet's output against Spark's.
 */
class CometToPrettyStringSuite extends CometTestBase {

  // Path of the generated Parquet test data, created in beforeAll.
  private var filename: String = null

  /**
   * We use Asia/Kathmandu because it has a non-zero number of minutes as the
   * offset, so is an interesting edge case. Also, this timezone tends to be
   * different from the default system timezone.
   *
   * Represents UTC+5:45
   */
  private val defaultTimezone = "Asia/Kathmandu"

  override def beforeAll(): Unit = {
    super.beforeAll()
    val tempDir = System.getProperty("java.io.tmpdir")
    filename = s"$tempDir/CometFuzzTestSuite_${System.currentTimeMillis()}.parquet"
    val random = new Random(42)
    // Generate the data with Comet disabled so the fixture comes from vanilla Spark.
    withSQLConf(
      CometConf.COMET_ENABLED.key -> "false",
      SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
      val options =
        DataGenOptions(
          generateArray = true,
          generateStruct = true,
          generateMap = true,
          generateNegativeZero = false,
          // Override base date due to known issues with experimental scans.
          // NOTE: the pattern must be `yyyy-MM-dd HH:mm:ss`; the previous
          // `YYYY-MM-DD hh:mm:ss` used week-year (YYYY), day-of-year (DD) and
          // 12-hour (hh) fields and silently parsed to a different instant
          // than the intended 2024-05-25 12:34:56.
          baseDate =
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2024-05-25 12:34:56").getTime)
      ParquetGenerator.makeParquetFile(random, spark, filename, 1000, options)
    }
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
    // Recursive delete: makeParquetFile writes Parquet output at this path
    // (presumably a directory, as Spark writers produce — deleteDirectory
    // handles that case).
    FileUtils.deleteDirectory(new File(filename))
  }

  test("ToPrettyString") {
    val df = spark.read.parquet(filename)
    df.createOrReplaceTempView("t1")
    val table = spark.sessionState.catalog.lookupRelation(TableIdentifier("t1"))

    for (field <- df.schema.fields) {
      val col = field.name
      // Project each column through ToPrettyString and let the analyzer
      // resolve the attribute reference.
      val prettyExpr = Alias(ToPrettyString(UnresolvedAttribute(col)), s"pretty_$col")()
      val plan = Project(Seq(prettyExpr), table)
      val analyzed = spark.sessionState.analyzer.execute(plan)
      val result: DataFrame = Dataset.ofRows(spark, analyzed)
      // Only require native (Comet) execution when the cast of this type to
      // string is fully compatible; otherwise just check that answers match.
      CometCast.isSupported(
        field.dataType,
        DataTypes.StringType,
        Some(spark.sessionState.conf.sessionLocalTimeZone),
        CometEvalMode.TRY) match {
        case _: Compatible => checkSparkAnswerAndOperator(result)
        case _ => checkSparkAnswer(result)
      }
    }
  }

}

0 commit comments

Comments
 (0)