Skip to content

Commit cb36530

Browse files
LukasRupprecht (Lukas Rupprecht) and allisonport-db
authored
[Spark] Fixes options-based time travel with timestamps (delta-io#4706)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR fixes a bug that occurs when trying to time travel on a Delta table using Spark's dataframe API and timestamps, e.g. through `spark.read.option("timestampAsOf", "some-timestamp").table("some-table")`. For some input timestamps, this can lead to the time travel specification being defined twice (once through the DeltaTableV2 and once through the options), which is not allowed. To work around this problem, there is a check that allows two time travel specifications when they are equal. However, when the timestamp in the options is specified with microsecond precision or without any milliseconds, the equality check fails: the time travel specification in the DeltaTableV2 adds `.0` milliseconds by default but drops the microseconds, so the timestamp stored as part of DeltaTableV2 differs from the one defined in the options. The fix in this PR normalizes the comparison to ensure that timestamps from both specifications are compared correctly.

## How was this patch tested?

Added unit tests.

## Does this PR introduce _any_ user-facing changes?

No

---------

Co-authored-by: Lukas Rupprecht <[email protected]>
Co-authored-by: Allison Portis <[email protected]>
1 parent 449f07e commit cb36530

File tree

5 files changed

+44
-1
lines changed

5 files changed

+44
-1
lines changed

spark/src/main/scala-spark-3.5/shims/DeltaTimeTravelSpecShims.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.apache.spark.sql.delta
1818

19+
import org.apache.spark.sql.SparkSession
20+
1921
object DeltaTimeTravelSpecShims {
2022

2123
/**
@@ -36,6 +38,7 @@ object DeltaTimeTravelSpecShims {
3638
* @param newSpecOpt: The new [[DeltaTimeTravelSpec]] to be applied to the table
3739
*/
3840
def validateTimeTravelSpec(
41+
spark: SparkSession,
3942
currSpecOpt: Option[DeltaTimeTravelSpec],
4043
newSpecOpt: Option[DeltaTimeTravelSpec]): Unit = {
4144
if (currSpecOpt.nonEmpty && newSpecOpt.nonEmpty) {

spark/src/main/scala-spark-master/shims/DeltaTimeTravelSpecShims.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.apache.spark.sql.delta
1818

19+
import org.apache.spark.sql.SparkSession
20+
1921
object DeltaTimeTravelSpecShims {
2022

2123
/**
@@ -36,10 +38,13 @@ object DeltaTimeTravelSpecShims {
3638
* @param newSpecOpt: The new [[DeltaTimeTravelSpec]] to be applied to the table
3739
*/
3840
def validateTimeTravelSpec(
41+
spark: SparkSession,
3942
currSpecOpt: Option[DeltaTimeTravelSpec],
4043
newSpecOpt: Option[DeltaTimeTravelSpec]): Unit = (currSpecOpt, newSpecOpt) match {
4144
case (Some(currSpec), Some(newSpec))
42-
if currSpec.version != newSpec.version || currSpec.timestamp != newSpec.timestamp =>
45+
if currSpec.version != newSpec.version ||
46+
currSpec.getTimestampOpt(spark.sessionState.conf).map(_.getTime) !=
47+
newSpec.getTimestampOpt(spark.sessionState.conf).map(_.getTime) =>
4348
throw DeltaErrors.multipleTimeTravelSyntaxUsed
4449
case _ =>
4550
}

spark/src/main/scala/org/apache/spark/sql/delta/DeltaTimeTravelSpec.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ case class DeltaTimeTravelSpec(
7070
}
7171
DateTimeUtils.toJavaTimestamp(castResult.asInstanceOf[java.lang.Long])
7272
}
73+
74+
/**
75+
* Compute the timestamp to use for time travelling the relation from the given expression for
76+
* the given time zone if this spec has a timestamp defined.
*
* Delegates to `getTimestamp(conf)` only when `timestamp` is non-empty, which lets callers
* compare the resolved timestamps of two specs as `Option` values.
*
* @param conf: The SQL configuration forwarded to `getTimestamp`
* @return `Some` of the resolved timestamp when `timestamp` is defined, `None` otherwise
77+
*/
78+
def getTimestampOpt(conf: SQLConf): Option[Timestamp] = {
79+
// The mapped value itself is ignored: only the presence of `timestamp` gates the call.
timestamp.map(_ => getTimestamp(conf))
80+
}
7381
}
7482

7583
object DeltaTimeTravelSpec {

spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ class DeltaTableV2 private[delta](
332332

333333
// Spark 4.0 and 3.5 handle time travel options differently.
334334
DeltaTimeTravelSpecShims.validateTimeTravelSpec(
335+
spark,
335336
currSpecOpt = timeTravelOpt,
336337
newSpecOpt = ttSpec)
337338

spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,32 @@ class DeltaTimeTravelSuite extends QueryTest
822822
Row(1) :: Row(1) :: Row(2) :: Nil)
823823
}
824824
}
825+
826+
// Reads the same table three times through the `timestampAsOf` reader option, each time with a
// timestamp string of a different precision (microseconds, milliseconds, seconds). All three
// timestamps are captured before the INSERT below, and every read is expected to return no rows.
test("Dataframe-based time travel works with different timestamp precisions") {
827+
val tblName = "test_tab"
828+
withTable(tblName) {
829+
sql(s"CREATE TABLE spark_catalog.default.$tblName (a int) USING DELTA")
830+
// Ensure that the current timestamp is different from the one in the table.
// NOTE(review): presumably a full second is needed because the second-precision timestamp
// captured below carries no sub-second part -- confirm.
831+
Thread.sleep(1000)
832+
// Microsecond precision timestamp.
// Produced by Spark itself and cast to a string.
833+
val current_time_micros = spark.sql("SELECT current_timestamp() as ts")
834+
.select($"ts".cast("string"))
835+
.head().getString(0)
836+
// Millisecond precision timestamp.
// Passed to the reader below via `toString`.
837+
val current_time_millis = new Timestamp(System.currentTimeMillis())
838+
// Second precision timestamp.
// The format pattern has no fractional-second field.
839+
val sdf = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
840+
val current_time_seconds = sdf.format(new java.sql.Timestamp(System.currentTimeMillis()))
841+
842+
// The row is inserted only after all three timestamps were captured, so each read below is
// expected to see the table without this row.
sql(s"INSERT INTO spark_catalog.default.$tblName VALUES (1)")
843+
checkAnswer(spark.read.option("timestampAsOf", current_time_micros)
844+
.table(s"spark_catalog.default.$tblName"), Seq.empty)
845+
checkAnswer(spark.read.option("timestampAsOf", current_time_millis.toString)
846+
.table(s"spark_catalog.default.$tblName"), Seq.empty)
847+
checkAnswer(spark.read.option("timestampAsOf", current_time_seconds)
848+
.table(s"spark_catalog.default.$tblName"), Seq.empty)
849+
}
850+
}
825851
}
826852

827853
class DeltaTimeTravelWithCatalogOwnedBatch1Suite extends DeltaTimeTravelSuite {

0 commit comments

Comments
 (0)