Skip to content

Commit 68f127b

Browse files
authored
feat: add partial support for date_format expression (#3201)
1 parent 1009e98 commit 68f127b

File tree

4 files changed

+223
-3
lines changed

4 files changed

+223
-3
lines changed

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ These settings can be used to determine which parts of the plan are accelerated
234234
| `spark.comet.expression.CreateArray.enabled` | Enable Comet acceleration for `CreateArray` | true |
235235
| `spark.comet.expression.CreateNamedStruct.enabled` | Enable Comet acceleration for `CreateNamedStruct` | true |
236236
| `spark.comet.expression.DateAdd.enabled` | Enable Comet acceleration for `DateAdd` | true |
237+
| `spark.comet.expression.DateFormatClass.enabled` | Enable Comet acceleration for `DateFormatClass` | true |
237238
| `spark.comet.expression.DateSub.enabled` | Enable Comet acceleration for `DateSub` | true |
238239
| `spark.comet.expression.DayOfMonth.enabled` | Enable Comet acceleration for `DayOfMonth` | true |
239240
| `spark.comet.expression.DayOfWeek.enabled` | Enable Comet acceleration for `DayOfWeek` | true |

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
185185

186186
private val temporalExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
187187
classOf[DateAdd] -> CometDateAdd,
188+
classOf[DateFormatClass] -> CometDateFormat,
188189
classOf[DateSub] -> CometDateSub,
189190
classOf[UnixDate] -> CometUnixDate,
190191
classOf[FromUnixTime] -> CometFromUnixTime,

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ package org.apache.comet.serde
2121

2222
import java.util.Locale
2323

24-
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, WeekDay, WeekOfYear, Year}
25-
import org.apache.spark.sql.types.{DateType, IntegerType}
24+
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, WeekDay, WeekOfYear, Year}
25+
import org.apache.spark.sql.types.{DateType, IntegerType, StringType}
2626
import org.apache.spark.unsafe.types.UTF8String
2727

2828
import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -381,3 +381,103 @@ object CometTruncTimestamp extends CometExpressionSerde[TruncTimestamp] {
381381
}
382382
}
383383
}
384+
385+
/**
386+
* Converts Spark DateFormatClass expression to DataFusion's to_char function.
387+
*
388+
* Spark uses Java SimpleDateFormat patterns while DataFusion uses strftime patterns. This
389+
* implementation supports a whitelist of common format strings that can be reliably mapped
390+
* between the two systems.
391+
*/
392+
object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
393+
394+
/**
395+
* Mapping from Spark SimpleDateFormat patterns to strftime patterns. Only formats in this map
396+
* are supported.
397+
*/
398+
val supportedFormats: Map[String, String] = Map(
399+
// Full date formats
400+
"yyyy-MM-dd" -> "%Y-%m-%d",
401+
"yyyy/MM/dd" -> "%Y/%m/%d",
402+
"yyyy-MM-dd HH:mm:ss" -> "%Y-%m-%d %H:%M:%S",
403+
"yyyy/MM/dd HH:mm:ss" -> "%Y/%m/%d %H:%M:%S",
404+
// Date components
405+
"yyyy" -> "%Y",
406+
"yy" -> "%y",
407+
"MM" -> "%m",
408+
"dd" -> "%d",
409+
// Time formats
410+
"HH:mm:ss" -> "%H:%M:%S",
411+
"HH:mm" -> "%H:%M",
412+
"HH" -> "%H",
413+
"mm" -> "%M",
414+
"ss" -> "%S",
415+
// Combined formats
416+
"yyyyMMdd" -> "%Y%m%d",
417+
"yyyyMM" -> "%Y%m",
418+
// Month and day names
419+
"EEEE" -> "%A",
420+
"EEE" -> "%a",
421+
"MMMM" -> "%B",
422+
"MMM" -> "%b",
423+
// 12-hour time
424+
"hh:mm:ss a" -> "%I:%M:%S %p",
425+
"hh:mm a" -> "%I:%M %p",
426+
"h:mm a" -> "%-I:%M %p",
427+
// ISO formats
428+
"yyyy-MM-dd'T'HH:mm:ss" -> "%Y-%m-%dT%H:%M:%S")
429+
430+
override def getSupportLevel(expr: DateFormatClass): SupportLevel = {
431+
// Check the session timezone: only UTC is treated as fully compatible; any other zone is reported as Incompatible
432+
val timezone = expr.timeZoneId.getOrElse("UTC")
433+
val isUtc = timezone == "UTC" || timezone == "Etc/UTC"
434+
435+
expr.right match {
436+
case Literal(fmt: UTF8String, _) =>
437+
val format = fmt.toString
438+
if (supportedFormats.contains(format)) {
439+
if (isUtc) {
440+
Compatible()
441+
} else {
442+
Incompatible(Some(s"Non-UTC timezone '$timezone' may produce different results"))
443+
}
444+
} else {
445+
Unsupported(
446+
Some(
447+
s"Format '$format' is not supported. Supported formats: " +
448+
supportedFormats.keys.mkString(", ")))
449+
}
450+
case _ =>
451+
Unsupported(Some("Only literal format strings are supported"))
452+
}
453+
}
454+
455+
override def convert(
456+
expr: DateFormatClass,
457+
inputs: Seq[Attribute],
458+
binding: Boolean): Option[ExprOuterClass.Expr] = {
459+
// Get the format string - must be a literal for us to map it
460+
val strftimeFormat = expr.right match {
461+
case Literal(fmt: UTF8String, _) =>
462+
supportedFormats.get(fmt.toString)
463+
case _ => None
464+
}
465+
466+
strftimeFormat match {
467+
case Some(format) =>
468+
val childExpr = exprToProtoInternal(expr.left, inputs, binding)
469+
val formatExpr = exprToProtoInternal(Literal(format), inputs, binding)
470+
471+
val optExpr = scalarFunctionExprToProtoWithReturnType(
472+
"to_char",
473+
StringType,
474+
false,
475+
childExpr,
476+
formatExpr)
477+
optExprWithInfo(optExpr, expr, expr.left, expr.right)
478+
case None =>
479+
withInfo(expr, expr.left, expr.right)
480+
None
481+
}
482+
}
483+
}

spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2626
import org.apache.spark.sql.internal.SQLConf
2727
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
2828

29-
import org.apache.comet.serde.{CometTruncDate, CometTruncTimestamp}
29+
import org.apache.comet.serde.{CometDateFormat, CometTruncDate, CometTruncTimestamp}
3030
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
3131

3232
class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
@@ -123,6 +123,124 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
123123
FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions())
124124
}
125125

126+
test("date_format with timestamp column") {
127+
// Filter out formats with embedded quotes that need special handling
128+
val supportedFormats = CometDateFormat.supportedFormats.keys.toSeq
129+
.filterNot(_.contains("'"))
130+
131+
createTimestampTestData.createOrReplaceTempView("tbl")
132+
133+
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
134+
for (format <- supportedFormats) {
135+
checkSparkAnswerAndOperator(s"SELECT c0, date_format(c0, '$format') from tbl order by c0")
136+
}
137+
// Test the ISO format with embedded quotes separately, using a double-quoted string
138+
checkSparkAnswerAndOperator(
139+
"SELECT c0, date_format(c0, \"yyyy-MM-dd'T'HH:mm:ss\") from tbl order by c0")
140+
}
141+
}
142+
143+
test("date_format with specific format strings") {
144+
// Test specific format strings with explicit timestamp data
145+
createTimestampTestData.createOrReplaceTempView("tbl")
146+
147+
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
148+
// Date formats
149+
checkSparkAnswerAndOperator("SELECT c0, date_format(c0, 'yyyy-MM-dd') from tbl order by c0")
150+
checkSparkAnswerAndOperator("SELECT c0, date_format(c0, 'yyyy/MM/dd') from tbl order by c0")
151+
152+
// Time formats
153+
checkSparkAnswerAndOperator("SELECT c0, date_format(c0, 'HH:mm:ss') from tbl order by c0")
154+
checkSparkAnswerAndOperator("SELECT c0, date_format(c0, 'HH:mm') from tbl order by c0")
155+
156+
// Combined formats
157+
checkSparkAnswerAndOperator(
158+
"SELECT c0, date_format(c0, 'yyyy-MM-dd HH:mm:ss') from tbl order by c0")
159+
160+
// Day/month names
161+
checkSparkAnswerAndOperator("SELECT c0, date_format(c0, 'EEEE') from tbl order by c0")
162+
checkSparkAnswerAndOperator("SELECT c0, date_format(c0, 'MMMM') from tbl order by c0")
163+
164+
// 12-hour time
165+
checkSparkAnswerAndOperator("SELECT c0, date_format(c0, 'hh:mm:ss a') from tbl order by c0")
166+
167+
// ISO format (use a double-quoted Scala string so the SimpleDateFormat literal quotes around 'T' are preserved)
168+
checkSparkAnswerAndOperator(
169+
"SELECT c0, date_format(c0, \"yyyy-MM-dd'T'HH:mm:ss\") from tbl order by c0")
170+
}
171+
}
172+
173+
test("date_format with literal timestamp") {
174+
// Test specific literal timestamp formats
175+
// Disable constant folding to ensure Comet actually executes the expression
176+
withSQLConf(
177+
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
178+
SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
179+
"org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
180+
checkSparkAnswerAndOperator(
181+
"SELECT date_format(TIMESTAMP '2024-03-15 14:30:45', 'yyyy-MM-dd')")
182+
checkSparkAnswerAndOperator(
183+
"SELECT date_format(TIMESTAMP '2024-03-15 14:30:45', 'yyyy-MM-dd HH:mm:ss')")
184+
checkSparkAnswerAndOperator(
185+
"SELECT date_format(TIMESTAMP '2024-03-15 14:30:45', 'HH:mm:ss')")
186+
checkSparkAnswerAndOperator("SELECT date_format(TIMESTAMP '2024-03-15 14:30:45', 'EEEE')")
187+
checkSparkAnswerAndOperator(
188+
"SELECT date_format(TIMESTAMP '2024-03-15 14:30:45', 'hh:mm:ss a')")
189+
}
190+
}
191+
192+
test("date_format with null") {
193+
withSQLConf(
194+
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
195+
SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
196+
"org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
197+
checkSparkAnswerAndOperator("SELECT date_format(CAST(NULL AS TIMESTAMP), 'yyyy-MM-dd')")
198+
}
199+
}
200+
201+
test("date_format unsupported format falls back to Spark") {
202+
createTimestampTestData.createOrReplaceTempView("tbl")
203+
204+
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
205+
// Unsupported format string
206+
checkSparkAnswerAndFallbackReason(
207+
"SELECT c0, date_format(c0, 'yyyy-MM-dd EEEE') from tbl order by c0",
208+
"Format 'yyyy-MM-dd EEEE' is not supported")
209+
}
210+
}
211+
212+
test("date_format with non-UTC timezone falls back to Spark") {
213+
createTimestampTestData.createOrReplaceTempView("tbl")
214+
215+
val nonUtcTimezones =
216+
Seq("America/New_York", "America/Los_Angeles", "Europe/London", "Asia/Tokyo")
217+
218+
for (tz <- nonUtcTimezones) {
219+
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) {
220+
// Non-UTC timezones should fall back to Spark as Incompatible
221+
checkSparkAnswerAndFallbackReason(
222+
"SELECT c0, date_format(c0, 'yyyy-MM-dd HH:mm:ss') from tbl order by c0",
223+
s"Non-UTC timezone '$tz' may produce different results")
224+
}
225+
}
226+
}
227+
228+
test("date_format with non-UTC timezone works when allowIncompatible is enabled") {
229+
createTimestampTestData.createOrReplaceTempView("tbl")
230+
231+
val nonUtcTimezones = Seq("America/New_York", "Europe/London", "Asia/Tokyo")
232+
233+
for (tz <- nonUtcTimezones) {
234+
withSQLConf(
235+
SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz,
236+
"spark.comet.expr.DateFormatClass.allowIncompatible" -> "true") {
237+
// With allowIncompatible enabled, Comet will execute the expression
238+
// Results may differ from Spark but should not throw errors
239+
checkSparkAnswer("SELECT c0, date_format(c0, 'yyyy-MM-dd') from tbl order by c0")
240+
}
241+
}
242+
}
243+
126244
test("unix_date") {
127245
val r = new Random(42)
128246
val schema = StructType(Seq(StructField("c0", DataTypes.DateType, true)))

0 commit comments

Comments
 (0)