Skip to content

Commit 57d9bd1

Browse files
authored
[Spark] Use JVM timezone for parsing in normalized partition values (delta-io#6047)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Use the JVM timezone for parsing the partition values in the new function `normalizedAddFiles` to stay consistent with the WRITE path. Follow-up of PR delta-io#5987. The previous PR introduced a behavior change since it parses timestamps without a timezone identifier using the session timezone instead of the JVM timezone. The WRITE path always parses timestamps without a timezone identifier using the JVM timezone in [DelayedCommitProtocol](https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/files/DelayedCommitProtocol.scala#L177). By using the JVM timezone on READ now as well we preserve the original behavior. ## How was this patch tested? Unit tests ## Does this PR introduce _any_ user-facing changes? No
1 parent 959deac commit 57d9bd1

File tree

2 files changed

+195
-145
lines changed

2 files changed

+195
-145
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,7 @@ case class AddFile(
904904
val typedPartitionValueLiterals = PartitionUtils.parsePartitionValues(
905905
partitionValues,
906906
partitionSchema,
907-
timeZone,
907+
java.util.TimeZone.getDefault.getID,
908908
validatePartitionColumns = true)
909909

910910
val stringNormalizedPartitionValues = typedPartitionValueLiterals.map {

spark/src/test/scala/org/apache/spark/sql/delta/actions/AddFileSuite.scala

Lines changed: 194 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ import org.apache.spark.sql.types._
3030
class AddFileSuite extends SparkFunSuite with SharedSparkSession with DeltaSQLCommandTest
3131
with QueryErrorsBase {
3232

33+
private def withJvmTimeZone[T](tzId: String)(block: => T): T = {
34+
val originalTz = java.util.TimeZone.getDefault
35+
try {
36+
java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone(tzId))
37+
block
38+
} finally {
39+
java.util.TimeZone.setDefault(originalTz)
40+
}
41+
}
42+
3343
private def createAddFileWithPartitionValue(partitionValues: Map[String, String]): AddFile = {
3444
AddFile(
3545
path = "test.parquet",
@@ -102,31 +112,33 @@ class AddFileSuite extends SparkFunSuite with SharedSparkSession with DeltaSQLCo
102112
for (enableNormalization <- BOOLEAN_DOMAIN) {
103113
test("normalizedPartitionValues for UTC timestamps partitions with different string formats, " +
104114
s"enableNormalization=$enableNormalization") {
105-
withSQLConf(
106-
DeltaSQLConf.DELTA_NORMALIZE_PARTITION_VALUES_ON_READ.key ->
107-
enableNormalization.toString,
108-
"spark.sql.session.timeZone" -> "UTC") {
109-
withTempDir { tempDir =>
110-
// Create empty Delta table with tsCol as partition column
111-
spark.createDataFrame(
112-
spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
113-
StructType(Seq(
114-
StructField("data", StringType),
115-
StructField("tsCol", TimestampType)
116-
))
117-
).write.format("delta").partitionBy("tsCol").save(tempDir.getCanonicalPath)
118-
val deltaTxn = DeltaLog.forTable(spark, tempDir.getCanonicalPath).startTransaction()
119-
120-
val fileNonUtc = createAddFileWithPartitionValue(Map("tsCol" -> "2000-01-01 12:00:00"))
121-
val fileUtc =
122-
createAddFileWithPartitionValue(Map("tsCol" -> "2000-01-01T12:00:00.000000Z"))
123-
val normalizedNonUtc = fileNonUtc.normalizedPartitionValues(spark, deltaTxn)
124-
val normalizedUtc = fileUtc.normalizedPartitionValues(spark, deltaTxn)
125-
126-
if (enableNormalization) {
127-
assert(normalizedNonUtc == normalizedUtc)
128-
} else {
129-
assert(normalizedNonUtc != normalizedUtc)
115+
withJvmTimeZone("UTC") {
116+
withSQLConf(
117+
DeltaSQLConf.DELTA_NORMALIZE_PARTITION_VALUES_ON_READ.key ->
118+
enableNormalization.toString,
119+
"spark.sql.session.timeZone" -> "UTC") {
120+
withTempDir { tempDir =>
121+
// Create empty Delta table with tsCol as partition column
122+
spark.createDataFrame(
123+
spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
124+
StructType(Seq(
125+
StructField("data", StringType),
126+
StructField("tsCol", TimestampType)
127+
))
128+
).write.format("delta").partitionBy("tsCol").save(tempDir.getCanonicalPath)
129+
val deltaTxn = DeltaLog.forTable(spark, tempDir.getCanonicalPath).startTransaction()
130+
131+
val fileNonUtc = createAddFileWithPartitionValue(Map("tsCol" -> "2000-01-01 12:00:00"))
132+
val fileUtc =
133+
createAddFileWithPartitionValue(Map("tsCol" -> "2000-01-01T12:00:00.000000Z"))
134+
val normalizedNonUtc = fileNonUtc.normalizedPartitionValues(spark, deltaTxn)
135+
val normalizedUtc = fileUtc.normalizedPartitionValues(spark, deltaTxn)
136+
137+
if (enableNormalization) {
138+
assert(normalizedNonUtc == normalizedUtc)
139+
} else {
140+
assert(normalizedNonUtc != normalizedUtc)
141+
}
130142
}
131143
}
132144
}
@@ -189,44 +201,46 @@ class AddFileSuite extends SparkFunSuite with SharedSparkSession with DeltaSQLCo
189201
for (enableNormalization <- BOOLEAN_DOMAIN) {
190202
test("normalizedPartitionValues with mixed timestamp and non-timestamp partitions, " +
191203
s"enableNormalization=$enableNormalization") {
192-
withSQLConf(
193-
DeltaSQLConf.DELTA_NORMALIZE_PARTITION_VALUES_ON_READ.key ->
194-
enableNormalization.toString,
195-
"spark.sql.session.timeZone" -> "UTC"
196-
) {
197-
withTempDir { tempDir =>
198-
spark.createDataFrame(
199-
spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
200-
StructType(Seq(
201-
StructField("data", StringType),
202-
StructField("tsCol", TimestampType),
203-
StructField("strCol", StringType),
204-
StructField("intCol", IntegerType)
205-
))
206-
).write.format("delta")
207-
.partitionBy("tsCol", "strCol", "intCol")
208-
.save(tempDir.getCanonicalPath)
209-
val deltaTxn = DeltaLog.forTable(spark, tempDir.getCanonicalPath).startTransaction()
210-
211-
val file1 = createAddFileWithPartitionValue(
212-
Map("tsCol" -> "2000-01-01 12:00:00", "strCol" -> "value", "intCol" -> "42"))
213-
val file2 = createAddFileWithPartitionValue(
214-
Map("tsCol" -> "2000-01-01T12:00:00.000000Z", "strCol" -> "value", "intCol" -> "42"))
215-
val normalized1 = file1.normalizedPartitionValues(spark, deltaTxn)
216-
val normalized2 = file2.normalizedPartitionValues(spark, deltaTxn)
217-
218-
if (enableNormalization) {
219-
// Timestamp columns should normalize to same value (same microseconds)
220-
assert(normalized1("tsCol") == normalized2("tsCol"))
221-
// Non-timestamp columns should be typed literals
222-
assert(normalized1("strCol") == Literal("value"))
223-
assert(normalized1("intCol") == Literal.create(42, IntegerType))
224-
} else {
225-
// Without normalization the partition values are different string literals
226-
assert(normalized1 != normalized2)
227-
// Normalized partition values should be string literals of original values
228-
assert(normalized1("tsCol") == Literal("2000-01-01 12:00:00"))
229-
assert(normalized2("tsCol") == Literal("2000-01-01T12:00:00.000000Z"))
204+
withJvmTimeZone("UTC") {
205+
withSQLConf(
206+
DeltaSQLConf.DELTA_NORMALIZE_PARTITION_VALUES_ON_READ.key ->
207+
enableNormalization.toString,
208+
"spark.sql.session.timeZone" -> "UTC"
209+
) {
210+
withTempDir { tempDir =>
211+
spark.createDataFrame(
212+
spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
213+
StructType(Seq(
214+
StructField("data", StringType),
215+
StructField("tsCol", TimestampType),
216+
StructField("strCol", StringType),
217+
StructField("intCol", IntegerType)
218+
))
219+
).write.format("delta")
220+
.partitionBy("tsCol", "strCol", "intCol")
221+
.save(tempDir.getCanonicalPath)
222+
val deltaTxn = DeltaLog.forTable(spark, tempDir.getCanonicalPath).startTransaction()
223+
224+
val file1 = createAddFileWithPartitionValue(
225+
Map("tsCol" -> "2000-01-01 12:00:00", "strCol" -> "value", "intCol" -> "42"))
226+
val file2 = createAddFileWithPartitionValue(
227+
Map("tsCol" -> "2000-01-01T12:00:00.000000Z", "strCol" -> "value", "intCol" -> "42"))
228+
val normalized1 = file1.normalizedPartitionValues(spark, deltaTxn)
229+
val normalized2 = file2.normalizedPartitionValues(spark, deltaTxn)
230+
231+
if (enableNormalization) {
232+
// Timestamp columns should normalize to same value (same microseconds)
233+
assert(normalized1("tsCol") == normalized2("tsCol"))
234+
// Non-timestamp columns should be typed literals
235+
assert(normalized1("strCol") == Literal("value"))
236+
assert(normalized1("intCol") == Literal.create(42, IntegerType))
237+
} else {
238+
// Without normalization the partition values are different string literals
239+
assert(normalized1 != normalized2)
240+
// Normalized partition values should be string literals of original values
241+
assert(normalized1("tsCol") == Literal("2000-01-01 12:00:00"))
242+
assert(normalized2("tsCol") == Literal("2000-01-01T12:00:00.000000Z"))
243+
}
230244
}
231245
}
232246
}
@@ -412,27 +426,29 @@ class AddFileSuite extends SparkFunSuite with SharedSparkSession with DeltaSQLCo
412426
}
413427

414428
test("normalizedPartitionValues with a non UTC session time zone gets converted to UTC") {
415-
withSQLConf(
416-
DeltaSQLConf.DELTA_NORMALIZE_PARTITION_VALUES_ON_READ.key -> "true",
417-
"spark.sql.session.timeZone" -> "Europe/Berlin" // UTC + 1 in winter time
418-
) {
419-
withTempDir { tempDir =>
420-
spark.createDataFrame(
421-
spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
422-
StructType(Seq(
423-
StructField("data", StringType),
424-
StructField("tsCol", TimestampType)
425-
))
426-
).write.format("delta").partitionBy("tsCol").save(tempDir.getCanonicalPath)
427-
val deltaTxn = DeltaLog.forTable(spark, tempDir.getCanonicalPath).startTransaction()
429+
withJvmTimeZone("Europe/Berlin") {
430+
withSQLConf(
431+
DeltaSQLConf.DELTA_NORMALIZE_PARTITION_VALUES_ON_READ.key -> "true",
432+
"spark.sql.session.timeZone" -> "Europe/Berlin" // UTC + 1 in winter time
433+
) {
434+
withTempDir { tempDir =>
435+
spark.createDataFrame(
436+
spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
437+
StructType(Seq(
438+
StructField("data", StringType),
439+
StructField("tsCol", TimestampType)
440+
))
441+
).write.format("delta").partitionBy("tsCol").save(tempDir.getCanonicalPath)
442+
val deltaTxn = DeltaLog.forTable(spark, tempDir.getCanonicalPath).startTransaction()
428443

429-
val file = createAddFileWithPartitionValue(Map("tsCol" -> "2000-01-01 12:00:00"))
430-
// The normalized timestamp should be 11:00 UTC
431-
// Parsed in Europe/Berlin (UTC+1), so 12:00 Berlin = 11:00 UTC
432-
val normalizedTimestamp = file.normalizedPartitionValues(spark, deltaTxn)("tsCol")
444+
val file = createAddFileWithPartitionValue(Map("tsCol" -> "2000-01-01 12:00:00"))
445+
// The normalized timestamp should be 11:00 UTC
446+
// Parsed in Europe/Berlin (UTC+1), so 12:00 Berlin = 11:00 UTC
447+
val normalizedTimestamp = file.normalizedPartitionValues(spark, deltaTxn)("tsCol")
433448

434-
assert(normalizedTimestamp == timestampLiteral("2000-01-01 12:00:00", "Europe/Berlin"))
435-
assert(normalizedTimestamp == timestampLiteral("2000-01-01 11:00:00", "UTC"))
449+
assert(normalizedTimestamp == timestampLiteral("2000-01-01 12:00:00", "Europe/Berlin"))
450+
assert(normalizedTimestamp == timestampLiteral("2000-01-01 11:00:00", "UTC"))
451+
}
436452
}
437453
}
438454
}
@@ -474,75 +490,109 @@ class AddFileSuite extends SparkFunSuite with SharedSparkSession with DeltaSQLCo
474490
}
475491

476492
test("normalizedPartitionValues with missing leading zeroes in timestamp are accepted") {
477-
withSQLConf(
478-
DeltaSQLConf.DELTA_NORMALIZE_PARTITION_VALUES_ON_READ.key -> "true",
479-
"spark.sql.session.timeZone" -> "UTC"
480-
) {
481-
withTempDir { tempDir =>
482-
spark.createDataFrame(
483-
spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
484-
StructType(Seq(
485-
StructField("data", StringType),
486-
StructField("tsCol", TimestampType)
487-
))
488-
).write.format("delta").partitionBy("tsCol").save(tempDir.getCanonicalPath)
489-
val deltaTxn = DeltaLog.forTable(spark, tempDir.getCanonicalPath).startTransaction()
493+
withJvmTimeZone("UTC") {
494+
withSQLConf(
495+
DeltaSQLConf.DELTA_NORMALIZE_PARTITION_VALUES_ON_READ.key -> "true",
496+
"spark.sql.session.timeZone" -> "UTC"
497+
) {
498+
withTempDir { tempDir =>
499+
spark.createDataFrame(
500+
spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
501+
StructType(Seq(
502+
StructField("data", StringType),
503+
StructField("tsCol", TimestampType)
504+
))
505+
).write.format("delta").partitionBy("tsCol").save(tempDir.getCanonicalPath)
506+
val deltaTxn = DeltaLog.forTable(spark, tempDir.getCanonicalPath).startTransaction()
490507

491-
def getNormalizedTimestamp(tsValue: String): Literal =
492-
createAddFileWithPartitionValue(Map("tsCol" -> tsValue))
493-
.normalizedPartitionValues(spark, deltaTxn)("tsCol")
494-
495-
// Missing leading zero in hours: "1:00:00" vs "01:00:00"
496-
val hoursWithout = getNormalizedTimestamp("2000-01-01 1:00:00")
497-
val hoursWith = getNormalizedTimestamp("2000-01-01 01:00:00")
498-
val expectedHours = timestampLiteral("2000-01-01 01:00:00", "UTC")
499-
assert(hoursWithout == hoursWith)
500-
assert(hoursWith == expectedHours)
501-
502-
// Missing leading zero in minutes: "01:2:00" vs "01:02:00"
503-
val minutesWithout = getNormalizedTimestamp("2000-01-01 01:2:00")
504-
val minutesWith = getNormalizedTimestamp("2000-01-01 01:02:00")
505-
val expectedMinutes = timestampLiteral("2000-01-01 01:02:00", "UTC")
506-
assert(minutesWithout == minutesWith)
507-
assert(minutesWith == expectedMinutes)
508-
509-
// Missing leading zero in seconds: "01:02:3" vs "01:02:03"
510-
val secondsWithout = getNormalizedTimestamp("2000-01-01 01:02:3")
511-
val secondsWith = getNormalizedTimestamp("2000-01-01 01:02:03")
512-
val expectedSeconds = timestampLiteral("2000-01-01 01:02:03", "UTC")
513-
assert(secondsWithout == secondsWith)
514-
assert(secondsWith == expectedSeconds)
515-
516-
// All missing leading zeroes: "1:2:3" vs "01:02:03"
517-
val allWithout = getNormalizedTimestamp("2000-01-01 1:2:3")
518-
val allWith = getNormalizedTimestamp("2000-01-01 01:02:03")
519-
val expectedAll = timestampLiteral("2000-01-01 01:02:03", "UTC")
520-
assert(allWithout == allWith)
521-
assert(allWith == expectedAll)
508+
def getNormalizedTimestamp(tsValue: String): Literal =
509+
createAddFileWithPartitionValue(Map("tsCol" -> tsValue))
510+
.normalizedPartitionValues(spark, deltaTxn)("tsCol")
511+
512+
// Missing leading zero in hours: "1:00:00" vs "01:00:00"
513+
val hoursWithout = getNormalizedTimestamp("2000-01-01 1:00:00")
514+
val hoursWith = getNormalizedTimestamp("2000-01-01 01:00:00")
515+
val expectedHours = timestampLiteral("2000-01-01 01:00:00", "UTC")
516+
assert(hoursWithout == hoursWith)
517+
assert(hoursWith == expectedHours)
518+
519+
// Missing leading zero in minutes: "01:2:00" vs "01:02:00"
520+
val minutesWithout = getNormalizedTimestamp("2000-01-01 01:2:00")
521+
val minutesWith = getNormalizedTimestamp("2000-01-01 01:02:00")
522+
val expectedMinutes = timestampLiteral("2000-01-01 01:02:00", "UTC")
523+
assert(minutesWithout == minutesWith)
524+
assert(minutesWith == expectedMinutes)
525+
526+
// Missing leading zero in seconds: "01:02:3" vs "01:02:03"
527+
val secondsWithout = getNormalizedTimestamp("2000-01-01 01:02:3")
528+
val secondsWith = getNormalizedTimestamp("2000-01-01 01:02:03")
529+
val expectedSeconds = timestampLiteral("2000-01-01 01:02:03", "UTC")
530+
assert(secondsWithout == secondsWith)
531+
assert(secondsWith == expectedSeconds)
532+
533+
// All missing leading zeroes: "1:2:3" vs "01:02:03"
534+
val allWithout = getNormalizedTimestamp("2000-01-01 1:2:3")
535+
val allWith = getNormalizedTimestamp("2000-01-01 01:02:03")
536+
val expectedAll = timestampLiteral("2000-01-01 01:02:03", "UTC")
537+
assert(allWithout == allWith)
538+
assert(allWith == expectedAll)
539+
}
522540
}
523541
}
524542
}
525543

526544
test("normalizedPartitionValues with ISO 8601 format with T separator but no time zone") {
527-
withSQLConf(
528-
DeltaSQLConf.DELTA_NORMALIZE_PARTITION_VALUES_ON_READ.key -> "true",
529-
"spark.sql.session.timeZone" -> "Europe/Berlin" // UTC + 1 in winter time
530-
) {
531-
withTempDir { tempDir =>
532-
spark.createDataFrame(
533-
spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
534-
StructType(Seq(
535-
StructField("data", StringType),
536-
StructField("tsCol", TimestampType)
537-
))
538-
).write.format("delta").partitionBy("tsCol").save(tempDir.getCanonicalPath)
539-
val deltaTxn = DeltaLog.forTable(spark, tempDir.getCanonicalPath).startTransaction()
545+
withJvmTimeZone("Europe/Berlin") {
546+
withSQLConf(
547+
DeltaSQLConf.DELTA_NORMALIZE_PARTITION_VALUES_ON_READ.key -> "true",
548+
"spark.sql.session.timeZone" -> "Europe/Berlin" // UTC + 1 in winter time
549+
) {
550+
withTempDir { tempDir =>
551+
spark.createDataFrame(
552+
spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
553+
StructType(Seq(
554+
StructField("data", StringType),
555+
StructField("tsCol", TimestampType)
556+
))
557+
).write.format("delta").partitionBy("tsCol").save(tempDir.getCanonicalPath)
558+
val deltaTxn = DeltaLog.forTable(spark, tempDir.getCanonicalPath).startTransaction()
540559

541-
// ISO 8601 format with 'T' separator but no time zone should use the session time zone
542-
val file = createAddFileWithPartitionValue(Map("tsCol" -> "2000-01-01T12:00:00"))
543-
// The normalized timestamp should be 11:00 UTC (12:00 Berlin = 11:00 UTC)
544-
val normalized = file.normalizedPartitionValues(spark, deltaTxn)
545-
assert(normalized("tsCol") == timestampLiteral("2000-01-01 12:00:00", "Europe/Berlin"))
560+
// ISO 8601 format with 'T' separator but no time zone should use the JVM time zone.
561+
val file = createAddFileWithPartitionValue(Map("tsCol" -> "2000-01-01T12:00:00"))
562+
// The normalized timestamp should be 11:00 UTC (12:00 Berlin = 11:00 UTC)
563+
val normalized = file.normalizedPartitionValues(spark, deltaTxn)
564+
assert(normalized("tsCol") == timestampLiteral("2000-01-01 12:00:00", "Europe/Berlin"))
565+
}
566+
}
567+
}
568+
}
569+
570+
test("normalizedPartitionValues should also use the JVM timezone on read") {
571+
withJvmTimeZone("America/Los_Angeles") {
572+
withSQLConf(
573+
DeltaSQLConf.DELTA_NORMALIZE_PARTITION_VALUES_ON_READ.key -> "true",
574+
"spark.sql.session.timeZone" -> "UTC"
575+
) {
576+
withTempDir { tempDir =>
577+
spark.createDataFrame(
578+
spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
579+
StructType(Seq(
580+
StructField("data", StringType),
581+
StructField("tsCol", TimestampType)
582+
))
583+
).write.format("delta").partitionBy("tsCol").save(tempDir.getCanonicalPath)
584+
val deltaTxn = DeltaLog.forTable(spark, tempDir.getCanonicalPath).startTransaction()
585+
586+
// ON WRITE we use the JVM timezone, parsing this as an America/Los_Angeles timestamp.
587+
val file = createAddFileWithPartitionValue(Map("tsCol" -> "2000-01-01 12:00:00"))
588+
val normalized = file.normalizedPartitionValues(spark, deltaTxn)
589+
590+
// ON READ we also need to use the JVM timezone again, reading it again as an
591+
// America/Los_Angeles timestamp.
592+
assert(
593+
normalized("tsCol") == timestampLiteral("2000-01-01 12:00:00", "America/Los_Angeles"))
594+
assert(normalized("tsCol") != timestampLiteral("2000-01-01 12:00:00", "UTC"))
595+
}
546596
}
547597
}
548598
}

0 commit comments

Comments
 (0)