Skip to content

Commit afd2ae0

Browse files
feat: Add from_unixtime support (#1943)
Added from_unixtime support. Added a unit test.
1 parent 9817ceb commit afd2ae0

File tree

5 files changed

+101
-1
lines changed

5 files changed

+101
-1
lines changed

docs/source/user-guide/expressions.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,3 +222,4 @@ The following Spark expressions are currently available. Any known compatibility
222222
| Coalesce | |
223223
| NormalizeNaNAndZero | |
224224
| ToPrettyString | |
225+
| FromUnixTime | Does not support the optional format argument; only seconds values in the range -8334601211038 <= sec <= 8210266876799 are supported |

docs/spark_expressions_support.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@
175175
- [ ] dayofweek
176176
- [ ] dayofyear
177177
- [x] extract
178-
- [ ] from_unixtime
178+
- [x] from_unixtime
179179
- [ ] from_utc_timestamp
180180
- [ ] hour
181181
- [ ] last_day

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
8484
classOf[Chr] -> CometChr,
8585
classOf[InitCap] -> CometInitCap,
8686
classOf[BitLength] -> CometBitLength,
87+
classOf[FromUnixTime] -> CometFromUnixTime,
8788
classOf[Length] -> CometLength,
8889
classOf[StringInstr] -> CometStringInstr,
8990
classOf[StringRepeat] -> CometStringRepeat,
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.serde
21+
22+
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, FromUnixTime, Literal}
23+
import org.apache.spark.sql.catalyst.util.TimestampFormatter
24+
25+
import org.apache.comet.CometSparkSessionExtensions.withInfo
26+
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto}
27+
28+
// TODO: DataFusion supports only -8334601211038 <= sec <= 8210266876799
29+
// https://github.com/apache/datafusion/issues/16594
30+
// TODO: DataFusion supports only -8334601211038 <= sec <= 8210266876799
// https://github.com/apache/datafusion/issues/16594
object CometFromUnixTime extends CometExpressionSerde with IncompatExpr {

  /**
   * Serializes a Spark `FromUnixTime` expression to protobuf by composing DataFusion's
   * `from_unixtime` (seconds -> timestamp) with `to_char` (timestamp -> string).
   *
   * Falls back to Spark (returns `None` and tags the plan via `withInfo`) when the
   * expression uses anything other than Spark's default timestamp pattern, or when a
   * child expression cannot itself be serialized.
   */
  override def convert(
      expr: Expression,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    expr match {
      case FromUnixTime(sec, format, timeZoneId) =>
        val secondsProto = exprToProtoInternal(sec, inputs, binding)
        // TODO: DataFusion's to_char does not understand Spark datetime pattern syntax
        // https://github.com/apache/datafusion/issues/16577
        // https://github.com/apache/datafusion/issues/14536
        // Once those are fixed, serialize the user-supplied `format` instead of this
        // hard-coded strftime equivalent of Spark's default pattern.
        val patternProto = exprToProtoInternal(Literal("%Y-%m-%d %H:%M:%S"), inputs, binding)
        val zoneProto = exprToProtoInternal(Literal(timeZoneId.orNull), inputs, binding)

        if (format != Literal(TimestampFormatter.defaultPattern)) {
          // Only Spark's default pattern is supported for now; anything else falls back.
          withInfo(expr, "Datetime pattern format is unsupported")
          None
        } else {
          (secondsProto, patternProto) match {
            case (Some(_), Some(_)) =>
              val timestampProto =
                scalarFunctionExprToProto("from_unixtime", secondsProto, zoneProto)
              val rendered =
                scalarFunctionExprToProto("to_char", timestampProto, patternProto)
              optExprWithInfo(rendered, expr, sec, format)
            case _ =>
              // A child failed to serialize; record the reason and fall back to Spark.
              withInfo(expr, sec, format)
              None
          }
        }
    }
  }
}

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1605,6 +1605,45 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
16051605
}
16061606
}
16071607

1608+
test("from_unixtime") {
  // Exercise both dictionary-encoded and plain parquet encodings.
  Seq(false, true).foreach { dictEnabled =>
    withSQLConf(
      "parquet.enable.dictionary" -> dictEnabled.toString,
      CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
      val tbl = "test"
      withTempDir { dir =>
        val path = new Path(dir.toURI.toString, "test.parquet")
        makeParquetFileAllPrimitiveTypes(
          path,
          dictionaryEnabled = dictEnabled,
          -128,
          128,
          randomSize = 100)
        withParquetTable(path.toString, tbl) {
          // TODO: DataFusion supports only -8334601211038 <= sec <= 8210266876799
          // https://github.com/apache/datafusion/issues/16594
          // After fixing this issue, remove the where clause below
          val where = "where _5 BETWEEN -8334601211038 AND 8210266876799"
          // Runs the full set of from_unixtime checks against the current session config.
          def check(): Unit = {
            checkSparkAnswerAndOperator(s"SELECT from_unixtime(_5) FROM $tbl $where")
            checkSparkAnswerAndOperator(s"SELECT from_unixtime(_8) FROM $tbl $where")
            // TODO: DataFusion to_char does not support Spark datetime pattern format
            // https://github.com/apache/datafusion/issues/16577
            // https://github.com/apache/datafusion/issues/14536
            // After fixing these issues, change checkSparkAnswer to
            // checkSparkAnswerAndOperator
            checkSparkAnswer(s"SELECT from_unixtime(_5, 'yyyy') FROM $tbl $where")
            checkSparkAnswer(s"SELECT from_unixtime(_8, 'yyyy') FROM $tbl $where")
          }
          check()
          // Repeat under a non-default session time zone to cover zone handling.
          withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") {
            check()
          }
        }
      }
    }
  }
}
1646+
16081647
test("Decimal binary ops multiply is aligned to Spark") {
16091648
Seq(true, false).foreach { allowPrecisionLoss =>
16101649
withSQLConf(

0 commit comments

Comments
 (0)