
Commit b08e8ba

[UniForm] Convert stats for TIMESTAMP data type in converting iceberg metadata to delta (delta-io#4339)
## Description

While converting Iceberg metadata to Delta metadata, also convert stats for the TIMESTAMP data type in the fast path.

## How was this patch tested?

UTs.
1 parent 203f869 commit b08e8ba
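The conversion exercised here runs when an Iceberg table's metadata is turned into Delta metadata, for example when cloning an Iceberg table into Delta as the CloneIcebergSuite tests below do. A hedged usage sketch follows; the table name, path, and column name are placeholders and not part of this commit:

```scala
// Hypothetical sketch: clone an Iceberg table into Delta. During the clone,
// Iceberg per-file min/max metrics (now including TIMESTAMP columns) are
// converted into Delta stats, so predicates on timestamp columns can skip files.
import org.apache.spark.sql.SparkSession

object CloneIcebergSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("clone-iceberg-sketch").getOrCreate()

    // Placeholder target table and Iceberg table path.
    spark.sql(
      """CREATE TABLE delta_clone
        |SHALLOW CLONE iceberg.`/tmp/iceberg/events`""".stripMargin)

    // A timestamp predicate that can now be pruned using the converted stats.
    spark.sql("SELECT * FROM delta_clone WHERE event_ts > TIMESTAMP'2024-01-01 00:00:00'").show()
  }
}
```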

File tree

3 files changed: +192 −16 lines changed

- iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergStatsUtils.scala
- iceberg/src/test/scala/org/apache/spark/sql/delta/CloneIcebergSuite.scala
- iceberg/src/test/scala/org/apache/spark/sql/delta/commands/convert/IcebergStatsUtilsSuite.scala

iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergStatsUtils.scala

Lines changed: 17 additions & 2 deletions
```diff
@@ -35,7 +35,8 @@ import org.apache.iceberg.types.Types.{
   MapType => IcebergMapType,
   NestedField,
   StringType => IcebergStringType,
-  StructType => IcebergStructType
+  StructType => IcebergStructType,
+  TimestampType => IcebergTimestampType
 }
 import org.apache.iceberg.util.DateTimeUtil

@@ -56,7 +57,7 @@ object IcebergStatsUtils extends DeltaLogging {
     TypeID.DOUBLE,
     TypeID.DATE,
     // TypeID.TIME,
-    // TypeID.TIMESTAMP,
+    TypeID.TIMESTAMP,
     // TypeID.TIMESTAMP_NANO,
     TypeID.STRING,
     // TypeID.UUID,

@@ -182,6 +183,9 @@ object IcebergStatsUtils extends DeltaLogging {
       case (_: IcebergDateType, bb: ByteBuffer) =>
         val daysFromEpoch = Conversions.fromByteBuffer(ftype, bb).asInstanceOf[Int]
         DateTimeUtil.dateFromDays(daysFromEpoch).toString
+      case (tsType: IcebergTimestampType, bb: ByteBuffer) =>
+        val microts = Conversions.fromByteBuffer(tsType, bb).asInstanceOf[JLong]
+        microTimestampToString(microts, tsType)
       case (_, bb: ByteBuffer) =>
         Conversions.fromByteBuffer(ftype, bb)
       case _ => throw new IllegalArgumentException("unable to deserialize unknown values")

@@ -221,4 +225,15 @@ object IcebergStatsUtils extends DeltaLogging {
       )
     )
   }
+
+  private def microTimestampToString(
+      microTS: JLong, tsType: IcebergTimestampType): String = {
+    // iceberg timestamptz will have shouldAdjustToUTC() as true
+    if (tsType.shouldAdjustToUTC()) {
+      DateTimeUtil.microsToIsoTimestamptz(microTS)
+    } else {
+      // iceberg timestamp doesn't need to adjust to UTC
+      DateTimeUtil.microsToIsoTimestamp(microTS)
+    }
+  }
 }
```
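The new case delegates rendering to Iceberg's DateTimeUtil. Below is a self-contained sketch, using only java.time, of the behavior the branch is going for: microseconds since epoch become an ISO-8601 string, with an explicit UTC offset only for timestamptz (shouldAdjustToUTC == true). It is an illustration, not the committed code, and DateTimeUtil's exact offset/fraction formatting may differ slightly:

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object MicroTimestampSketch {
  // Formatter that always prints a numeric offset (e.g. +00:00) rather than "Z".
  private val offsetFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSxxx")

  def microsToIsoString(micros: Long, adjustToUTC: Boolean): String = {
    val instant = Instant.EPOCH.plusNanos(micros * 1000L)
    if (adjustToUTC) {
      // timestamptz: keep the UTC offset in the rendered stat value.
      instant.atOffset(ZoneOffset.UTC).format(offsetFmt)
    } else {
      // timestamp without zone: render the wall-clock value, no offset suffix.
      LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toString
    }
  }

  def main(args: Array[String]): Unit = {
    val micros = 1734394979000000L // some instant in microseconds since epoch
    println(microsToIsoString(micros, adjustToUTC = true))  // ends in +00:00
    println(microsToIsoString(micros, adjustToUTC = false)) // no offset suffix
  }
}
```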

iceberg/src/test/scala/org/apache/spark/sql/delta/CloneIcebergSuite.scala

Lines changed: 163 additions & 10 deletions
```diff
@@ -18,8 +18,11 @@ package org.apache.spark.sql.delta

 // scalastyle:off import.ordering.noEmptyLine
 import java.sql.Date
-import java.time.LocalDate
+import java.sql.Timestamp
+import java.time.LocalDateTime
 import java.time.LocalTime
+import java.time.format.DateTimeFormatter
+import java.util.TimeZone

 import scala.collection.JavaConverters._
 import scala.util.Try

@@ -38,8 +41,9 @@ import org.apache.iceberg.types.Types.NestedField
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{stringToDate, toJavaDate}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, microsToLocalDateTime, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.functions.{col, expr, from_json, lit, struct, substring}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{Decimal, DecimalType, LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 // scalastyle:on import.ordering.noEmptyLine

@@ -554,12 +558,12 @@ trait CloneIcebergSuiteBase extends QueryTest
     val filesRead =
       getFilesRead(spark, deltaLog, predicate, checkEmptyUnusedFilters = false)
     try {
-      assert(filesRead.size == expectedFilesReadNum)
-      assert(filesRead.map(_.partitionValues.head._2).toSet ==
-        expectedFilesReadIndices.map(_.toString))
       checkAnswer(
         spark.sql(s"select * from $cloneTable where $predicate"), df.where(predicate)
       )
+      assert(filesRead.size == expectedFilesReadNum)
+      assert(filesRead.map(_.partitionValues.head._2).toSet ==
+        expectedFilesReadIndices.map(_.toString))
     } catch {
       case e: Throwable =>
         throw new RuntimeException(

@@ -597,7 +601,7 @@ trait CloneIcebergSuiteBase extends QueryTest
          expectedFilesReadIndices = Set(2)
        )
      ),
-     mode
+     mode = mode
    )
  }

@@ -620,7 +624,7 @@ trait CloneIcebergSuiteBase extends QueryTest
          expectedFilesReadIndices = Set(1)
        )
      ),
-     mode
+     mode = mode
    )
  }

@@ -643,7 +647,7 @@ trait CloneIcebergSuiteBase extends QueryTest
          expectedFilesReadIndices = Set(1)
        )
      ),
-     mode
+     mode = mode
    )
  }

@@ -680,7 +684,7 @@ trait CloneIcebergSuiteBase extends QueryTest
          expectedFilesReadIndices = Set()
        )
      ),
-     mode
+     mode = mode
    )
  }

@@ -725,8 +729,157 @@ trait CloneIcebergSuiteBase extends QueryTest
          expectedFilesReadIndices = Set(4, 5)
        )
      ),
-     mode
+     mode = mode
+   )
+ }
+
+ // Exactly on minutes
+ testClone("Convert Iceberg timestamptz type - 1") { mode =>
+   testStatsConversionAndDataSkipping(
+     icebergDataType = "timestamp", // spark timestamp => iceberg timestamptz
+     tableData = Seq(
+       toTimestamp("1908-03-15 10:1:17")
+     ),
+     extractFunc = row => {
+       timestamptzExtracter(row, pattern = "yyyy-MM-dd'T'HH:mm:ssXXX")
+     },
+     expectedStats = Seq("1908-03-15T10:01:17+00:00"),
+     dataSkippingTestParams = Seq(
+       DataSkippingTestParam(
+         predicate = "col2 > TIMESTAMP'1908-03-15T10:01:18+00:00'",
+         expectedFilesReadNum = 0,
+         expectedFilesReadIndices = Set()
+       ),
+       DataSkippingTestParam(
+         predicate = "col2 <= TIMESTAMP'1908-03-15T10:01:17+00:00'",
+         expectedFilesReadNum = 1,
+         expectedFilesReadIndices = Set(1)
+       )
+     ),
+     mode = mode
+   )
+ }
+
+ // Fractional time
+ testClone("Convert Iceberg timestamptz type - 2") { mode =>
+   testStatsConversionAndDataSkipping(
+     icebergDataType = "timestamp", // spark timestamp => iceberg timestamptz
+     tableData = Seq(
+       toTimestamp("1997-12-11 5:40:19.23349")
+     ),
+     extractFunc = row => {
+       timestamptzExtracter(row, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSSSXXX")
+     },
+     expectedStats = Seq("1997-12-11T05:40:19.23349+00:00"),
+     dataSkippingTestParams = Seq(
+       DataSkippingTestParam(
+         predicate = "col2 > TIMESTAMP'1997-12-11T05:40:19.233+00:00'",
+         expectedFilesReadNum = 1,
+         expectedFilesReadIndices = Set(1)
+       ),
+       DataSkippingTestParam(
+         predicate = "col2 <= TIMESTAMP'1997-12-11T05:40:19.10+00:00'",
+         expectedFilesReadNum = 0,
+         expectedFilesReadIndices = Set()
+       )
+     ),
+     mode = mode
+   )
+ }
+
+ // Customized timezone
+ testClone("Convert Iceberg timestamptz type - 3") { mode =>
+   testStatsConversionAndDataSkipping(
+     icebergDataType = "timestamp", // spark timestamp => iceberg timestamptz
+     tableData = Seq(
+       toTimestamp("2077-11-11 3:23:11.23456+02:15")
+     ),
+     extractFunc = row => {
+       timestamptzExtracter(row, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSSSXXX")
+     },
+     expectedStats = Seq("2077-11-11T01:08:11.23456+00:00"),
+     dataSkippingTestParams = Seq(
+       DataSkippingTestParam(
+         predicate = "col2 > TIMESTAMP'2077-11-11T03:23:11.23456+02:16'",
+         expectedFilesReadNum = 1,
+         expectedFilesReadIndices = Set(1)
+       ),
+       DataSkippingTestParam(
+         predicate = "col2 < TIMESTAMP'2077-11-11T03:23:11.23456+02:16'",
+         expectedFilesReadNum = 0,
+         expectedFilesReadIndices = Set()
+       )
+     ),
+     mode = mode
+   )
+ }
+
+ // Exactly on minutes
+ testClone("Convert Iceberg timestamp type - 1") { mode =>
+   testStatsConversionAndDataSkipping(
+     icebergDataType = "timestamp_ntz", // spark timestamp_ntz => iceberg timestamp
+     tableData = Seq(
+       toTimestampNTZ("2024-01-02T02:04:05.123456")
+     ),
+     extractFunc = row => {
+       row.get(0).asInstanceOf[LocalDateTime].toString
+     },
+     expectedStats = Seq("2024-01-02T02:04:05.123456"),
+     dataSkippingTestParams = Seq(
+       DataSkippingTestParam(
+         predicate = "col2 > TIMESTAMP'2024-01-02T02:04:04.123456'",
+         expectedFilesReadNum = 1,
+         expectedFilesReadIndices = Set(1)
+       )
+     ),
+     mode = mode
+   )
+ }
+
+ // Fractional time
+ testClone("Convert Iceberg timestamp type - 2") { mode =>
+   testStatsConversionAndDataSkipping(
+     icebergDataType = "timestamp_ntz", // spark timestamp_ntz => iceberg timestamp
+     tableData = Seq(
+       toTimestampNTZ("1712-4-29T06:23:49.12")
+     ),
+     extractFunc = row => {
+       row.get(0).asInstanceOf[LocalDateTime].toString
+         .replaceAll("0+$", "") // remove trailing zeros
+     },
+     expectedStats = Seq("1712-04-29T06:23:49.12"),
+     dataSkippingTestParams = Seq(
+       DataSkippingTestParam(
+         predicate = "col2 > TIMESTAMP'1712-04-29T06:23:49.11'",
+         expectedFilesReadNum = 1,
+         expectedFilesReadIndices = Set(1)
+       )
+     ),
+     mode = mode
+   )
+ }
+
+ private def toTimestamp(timestamp: String): Timestamp = {
+   toJavaTimestamp(stringToTimestamp(UTF8String.fromString(timestamp),
+     getZoneId(SQLConf.get.sessionLocalTimeZone)).get)
+ }
+
+ private def toTimestampNTZ(timestampNTZ: String): LocalDateTime = {
+   microsToLocalDateTime(
+     stringToTimestampWithoutTimeZone(
+       UTF8String.fromString(timestampNTZ)
+     ).get
+   )
+ }
+
+ private def timestamptzExtracter(row: Row, pattern: String): String = {
+   val ts = row.getTimestamp(0).toLocalDateTime.atZone(
+     getZoneId(TimeZone.getDefault.getID)
    )
+   ts.withZoneSameInstant(getZoneId(SQLConf.get.sessionLocalTimeZone))
+     .format(DateTimeFormatter.ofPattern(pattern))
+     .replace("UTC", "+00:00")
+     .replace("Z", "+00:00")
  }
 }
```
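The new private helpers build Spark timestamps from strings and turn a row's Timestamp back into an ISO string with an explicit offset so it can be compared against the converted stats. A standalone, Spark-free sketch of what timestamptzExtracter is doing; the zone and sample value below are illustrative assumptions, not taken from the suite:

```scala
import java.sql.Timestamp
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.TimeZone

object TimestamptzExtractSketch {
  // Interpret a java.sql.Timestamp in the JVM default zone, shift it to a target
  // "session" zone, and render it with an explicit offset (normalizing "Z" to "+00:00").
  def extract(ts: Timestamp, sessionZone: String, pattern: String): String = {
    ts.toLocalDateTime
      .atZone(TimeZone.getDefault.toZoneId)
      .withZoneSameInstant(ZoneId.of(sessionZone))
      .format(DateTimeFormatter.ofPattern(pattern))
      .replace("Z", "+00:00")
  }

  def main(args: Array[String]): Unit = {
    // Sample wall-clock value interpreted in the JVM default zone.
    val ts = Timestamp.valueOf("1908-03-15 10:01:17")
    println(extract(ts, "UTC", "yyyy-MM-dd'T'HH:mm:ssXXX"))
  }
}
```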

iceberg/src/test/scala/org/apache/spark/sql/delta/commands/convert/IcebergStatsUtilsSuite.scala

Lines changed: 12 additions & 4 deletions
```diff
@@ -117,7 +117,7 @@ class IcebergStatsUtilsSuite extends SparkFunSuite {
     assertResult(expectedStatsObj)(actualStatsObj)
   }

-  test("stats conversion from timestamp 64 is disabled") {
+  test("stats conversion for decimal and timestamp") {
     val icebergSchema = new Schema(10, Seq[NestedField](
       NestedField.required(1, "col_ts", TimestampType.withZone),
       NestedField.required(2, "col_tsnz", TimestampType.withoutZone),

@@ -152,9 +152,17 @@ class IcebergStatsUtilsSuite extends SparkFunSuite {
     assertResult(
       JsonUtils.fromJson[StatsObject](
         """{"numRecords":1251,
-          |"maxValues":{"col_decimal":9.99999},
-          |"minValues":{"col_decimal":3.44141},
-          |"nullCount":{"col_decimal":31}}""".stripMargin))(
+          |"maxValues":{
+          | "col_ts":"2024-12-17T00:22:59+00:00",
+          | "col_tsnz":"2024-12-17T00:22:59",
+          | "col_decimal":9.99999
+          | },
+          |"minValues":{
+          | "col_ts":"2024-12-16T23:32:59+00:00",
+          | "col_tsnz":"2024-12-16T23:32:59",
+          | "col_decimal":3.44141
+          | },
+          |"nullCount":{"col_ts":20,"col_tsnz":10,"col_decimal":31}}""".stripMargin))(
       JsonUtils.fromJson[StatsObject](deltaStats))
   }
```
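As the updated expectation shows, converted timestamptz stats (col_ts) carry an explicit +00:00 offset while zone-less timestamp stats (col_tsnz) do not, and nullCount is tracked per column. A minimal sketch that just captures that expected JSON shape, copied from the assertion above:

```scala
object ExpectedTimestampStatsShape {
  // Per-file Delta stats expected after converting the Iceberg metrics in this test:
  // col_ts is timestamptz (offset kept), col_tsnz is timestamp without zone (no offset).
  val expectedStats: String =
    """{"numRecords":1251,
      |"maxValues":{"col_ts":"2024-12-17T00:22:59+00:00","col_tsnz":"2024-12-17T00:22:59","col_decimal":9.99999},
      |"minValues":{"col_ts":"2024-12-16T23:32:59+00:00","col_tsnz":"2024-12-16T23:32:59","col_decimal":3.44141},
      |"nullCount":{"col_ts":20,"col_tsnz":10,"col_decimal":31}}""".stripMargin

  def main(args: Array[String]): Unit = println(expectedStats)
}
```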
