Skip to content

Commit 029829c

Browse files
[Delta] Upgrade to User-Facing Error with Proper Error Class in Time Travel (delta-io#4500)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> Upgraded TemporallyUnstableInputException to user-facing error with proper error class. Reused the existing error class `DELTA_TIMESTAMP_GREATER_THAN_COMMIT`, which is currently being used by streaming reads. Make `DeltaErrors.timestampGreaterThanLatestCommit` also return `TemporallyUnstableInputException`. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Updated existing UTs ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No.
1 parent 5ccb599 commit 029829c

File tree

6 files changed

+74
-38
lines changed

6 files changed

+74
-38
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,8 +430,8 @@ class DeltaAnalysis(session: SparkSession)
430430
)
431431
case tUnstable: TemporallyUnstableInputException =>
432432
throw DeltaErrors.restoreTimestampGreaterThanLatestException(
433-
tUnstable.userTimestamp.toString,
434-
tUnstable.commitTs.toString
433+
tUnstable.userTs.toString,
434+
tUnstable.lastCommitTs.toString
435435
)
436436
}
437437
// TODO: Fetch the table version from deltaLog.update().version to guarantee freshness.

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1497,13 +1497,10 @@ trait DeltaErrorsBase
14971497
""".stripMargin)
14981498

14991499
def timestampGreaterThanLatestCommit(
1500-
userTimestamp: java.sql.Timestamp,
1501-
commitTs: java.sql.Timestamp,
1502-
timestampString: String): Throwable = {
1503-
new DeltaAnalysisException(
1504-
errorClass = "DELTA_TIMESTAMP_GREATER_THAN_COMMIT",
1505-
messageParameters = Array(s"$userTimestamp", s"$commitTs", timestampString)
1506-
)
1500+
userTs: java.sql.Timestamp,
1501+
lastCommitTs: java.sql.Timestamp,
1502+
maximumTsStr: String): Throwable = {
1503+
TemporallyUnstableInputException(userTs, lastCommitTs, maximumTsStr)
15071504
}
15081505

15091506
def timestampInvalid(expr: Expression): Throwable = {
@@ -1521,15 +1518,12 @@ trait DeltaErrorsBase
15211518
}
15221519

15231520
case class TemporallyUnstableInputException(
1524-
userTimestamp: java.sql.Timestamp,
1525-
commitTs: java.sql.Timestamp,
1526-
timestampString: String,
1527-
commitVersion: Long) extends AnalysisException(
1528-
s"""The provided timestamp: $userTimestamp is after the latest commit timestamp of
1529-
|$commitTs. If you wish to query this version of the table, please either provide
1530-
|the version with "VERSION AS OF $commitVersion" or use the exact timestamp
1531-
|of the last commit: "TIMESTAMP AS OF '$timestampString'".
1532-
""".stripMargin)
1521+
userTs: java.sql.Timestamp,
1522+
lastCommitTs: java.sql.Timestamp,
1523+
maximumTsStr: String)
1524+
extends DeltaAnalysisException(
1525+
errorClass = "DELTA_TIMESTAMP_GREATER_THAN_COMMIT",
1526+
messageParameters = Array(s"$userTs", s"$lastCommitTs", maximumTsStr))
15331527

15341528
def restoreVersionNotExistException(
15351529
userVersion: Long,

spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,7 @@ class DeltaHistoryManager(
327327
throw DeltaErrors.TimestampEarlierThanCommitRetentionException(timestamp, commitTs, tsString)
328328
} else if (commit.version == latestVersion && !canReturnLastCommit) {
329329
if (commit.timestamp < time) {
330-
throw DeltaErrors.TemporallyUnstableInputException(
331-
timestamp, commitTs, tsString, commit.version)
330+
throw DeltaErrors.timestampGreaterThanLatestCommit(timestamp, commitTs, tsString)
332331
}
333332
}
334333
commit

spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -368,17 +368,31 @@ trait DeltaTimeTravelTests extends QueryTest
368368
.select($"ts".cast("string")).as[String].collect()
369369
.map(i => s"'$i'")
370370

371-
val e1 = intercept[AnalysisException] {
371+
val e1 = intercept[DeltaErrors.TemporallyUnstableInputException] {
372372
sql(s"select count(*) from ${timestampAsOf(tblName, ts(0))}").collect()
373373
}
374-
assert(e1.getMessage.contains("VERSION AS OF 0"))
375-
assert(e1.getMessage.contains("TIMESTAMP AS OF '2018-10-24 14:14:18'"))
374+
checkError(
375+
e1,
376+
"DELTA_TIMESTAMP_GREATER_THAN_COMMIT",
377+
sqlState = "42816",
378+
parameters = Map(
379+
"providedTimestamp" -> "2018-10-24 14:24:18.0",
380+
"tableName" -> "2018-10-24 14:14:18.0",
381+
"maximumTimestamp" -> "2018-10-24 14:14:18")
382+
)
376383

377-
val e2 = intercept[AnalysisException] {
384+
val e2 = intercept[DeltaErrors.TemporallyUnstableInputException] {
378385
sql(s"select count(*) from ${timestampAsOf(tblName, start + 10.minutes)}").collect()
379386
}
380-
assert(e2.getMessage.contains("VERSION AS OF 0"))
381-
assert(e2.getMessage.contains("TIMESTAMP AS OF '2018-10-24 14:14:18'"))
387+
checkError(
388+
e2,
389+
"DELTA_TIMESTAMP_GREATER_THAN_COMMIT",
390+
sqlState = "42816",
391+
parameters = Map(
392+
"providedTimestamp" -> "2018-10-24 14:24:18.0",
393+
"tableName" -> "2018-10-24 14:14:18.0",
394+
"maximumTimestamp" -> "2018-10-24 14:14:18")
395+
)
382396

383397
checkAnswer(
384398
sql(s"select count(*) from ${timestampAsOf(tblName, "'2018-10-24 14:14:18'")}"),

spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -209,11 +209,19 @@ class DeltaTimeTravelSuite extends QueryTest
209209
assert(history.getActiveCommitAtTime(start + (i * 20 + 10).minutes, true).version === i)
210210
}
211211

212-
val e = intercept[AnalysisException] {
212+
val e = intercept[DeltaErrors.TemporallyUnstableInputException] {
213213
// This is 20 minutes after the last commit
214214
history.getActiveCommitAtTime(start + 200.minutes, false)
215215
}
216-
assert(e.getMessage.contains("after the latest commit timestamp"))
216+
checkError(
217+
e,
218+
"DELTA_TIMESTAMP_GREATER_THAN_COMMIT",
219+
sqlState = "42816",
220+
parameters = Map(
221+
"providedTimestamp" -> "2018-10-24 17:34:18.0",
222+
"tableName" -> "2018-10-24 17:14:18.0",
223+
"maximumTimestamp" -> "2018-10-24 17:14:18")
224+
)
217225
assert(history.getActiveCommitAtTime(start + 180.minutes, true).version === 9)
218226

219227
val e2 = intercept[AnalysisException] {
@@ -522,18 +530,32 @@ class DeltaTimeTravelSuite extends QueryTest
522530
// Simulate getting the timestamp directly from Spark SQL
523531
val ts = getSparkFormattedTimestamps(start + 10.minutes)
524532

525-
val e1 = intercept[AnalysisException] {
533+
val e1 = intercept[DeltaErrors.TemporallyUnstableInputException] {
526534
spark.read.format("delta").option("timestampAsOf", ts.head).load(tblLoc).collect()
527535
}
528-
assert(e1.getMessage.contains("VERSION AS OF 0"))
529-
assert(e1.getMessage.contains("TIMESTAMP AS OF '2018-10-24 14:14:18'"))
536+
checkError(
537+
e1,
538+
"DELTA_TIMESTAMP_GREATER_THAN_COMMIT",
539+
sqlState = "42816",
540+
parameters = Map(
541+
"providedTimestamp" -> "2018-10-24 14:24:18.0",
542+
"tableName" -> "2018-10-24 14:14:18.0",
543+
"maximumTimestamp" -> "2018-10-24 14:14:18")
544+
)
530545

531-
val e2 = intercept[AnalysisException] {
546+
val e2 = intercept[DeltaErrors.TemporallyUnstableInputException] {
532547
spark.read.format("delta").load(identifierWithTimestamp(tblLoc, start + 10.minutes))
533548
.collect()
534549
}
535-
assert(e2.getMessage.contains("VERSION AS OF 0"))
536-
assert(e2.getMessage.contains("TIMESTAMP AS OF '2018-10-24 14:14:18'"))
550+
checkError(
551+
e2,
552+
"DELTA_TIMESTAMP_GREATER_THAN_COMMIT",
553+
sqlState = "42816",
554+
parameters = Map(
555+
"providedTimestamp" -> "2018-10-24 14:24:18.0",
556+
"tableName" -> "2018-10-24 14:14:18.0",
557+
"maximumTimestamp" -> "2018-10-24 14:14:18")
558+
)
537559

538560
checkAnswer(
539561
spark.read.format("delta").option("timestampAsOf", "2018-10-24 14:14:18")
@@ -771,9 +793,16 @@ class DeltaTimeTravelSuite extends QueryTest
771793
val ex2 = intercept[DeltaErrors.TemporallyUnstableInputException] {
772794
spark.sql(s"SELECT * from $tableName FOR TIMESTAMP AS OF '2018-10-24 20:14:18'")
773795
}
774-
assert(ex2.getMessage contains
775-
"The provided timestamp: 2018-10-24 20:14:18.0 is after the " +
776-
"latest commit timestamp of\n2018-10-24 14:34:18.0")
796+
797+
checkError(
798+
ex2,
799+
"DELTA_TIMESTAMP_GREATER_THAN_COMMIT",
800+
sqlState = "42816",
801+
parameters = Map(
802+
"providedTimestamp" -> "2018-10-24 20:14:18.0",
803+
"tableName" -> "2018-10-24 14:34:18.0",
804+
"maximumTimestamp" -> "2018-10-24 14:34:18")
805+
)
777806
}
778807
}
779808
}

spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,7 @@ class InCommitTimestampSuite
654654
catalogTableOpt = None,
655655
canReturnLastCommit = false)
656656
}
657-
assert(e.getMessage.contains("The provided timestamp:") && e.getMessage.contains("is after"))
657+
assert(e.getMessage.contains("The provided timestamp") && e.getMessage.contains("is after"))
658658
}
659659
}
660660

0 commit comments

Comments
 (0)