Commit 9fa3259

Compile with Spark master snapshot and fix compile errors
1 parent: f71fef7

17 files changed: +129 additions, -64 deletions


build.sbt

Lines changed: 11 additions & 5 deletions
@@ -32,10 +32,11 @@ val all_scala_versions = Seq(scala212, scala213)
 // sbt 'set default_scala_version := 2.13.8' [commands]
 // FIXME Why not use scalaVersion?
 val default_scala_version = settingKey[String]("Default Scala version")
-Global / default_scala_version := scala212
+Global / default_scala_version := scala213
+// TODO set scala only to 2.13 for spark but keep 2.12 for other projects?

 // Dependent library versions
-val sparkVersion = "3.5.0"
+val sparkVersion = "4.0.0-SNAPSHOT"
 val flinkVersion = "1.16.1"
 val hadoopVersion = "3.3.4"
 val scalaTestVersion = "3.2.15"
@@ -117,11 +118,11 @@ lazy val spark = (project in file("spark"))
       "org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests",
     ),
     // For adding staged Spark RC versions, Ex:
-    // resolvers += "Apche Spark 3.5.0 (RC1) Staging" at "https://repository.apache.org/content/repositories/orgapachespark-1444/",
+    resolvers += "Spark master staging" at "https://repository.apache.org/content/groups/snapshots/",
     Compile / packageBin / mappings := (Compile / packageBin / mappings).value ++
       listPythonFiles(baseDirectory.value.getParentFile / "python"),

-    Antlr4 / antlr4Version:= "4.9.3",
+    Antlr4 / antlr4Version:= "4.13.1",
     Antlr4 / antlr4PackageName := Some("io.delta.sql.parser"),
     Antlr4 / antlr4GenListener := true,
     Antlr4 / antlr4GenVisitor := true,
@@ -344,6 +345,7 @@ val icebergSparkRuntimeArtifactName = {
   s"iceberg-spark-runtime-$expMaj.$expMin"
 }

+/*
 lazy val testDeltaIcebergJar = (project in file("testDeltaIcebergJar"))
   // delta-iceberg depends on delta-spark! So, we need to include it during our test.
   .dependsOn(spark % "test")
@@ -359,6 +361,7 @@ lazy val testDeltaIcebergJar = (project in file("testDeltaIcebergJar"))
     "org.apache.spark" %% "spark-core" % sparkVersion % "test"
   )
 )
+*/

 val deltaIcebergSparkIncludePrefixes = Seq(
   // We want everything from this package
@@ -371,6 +374,7 @@ val deltaIcebergSparkIncludePrefixes = Seq(
   "org/apache/spark/sql/delta/commands/convert/IcebergTable"
 )

+/*
 // Build using: build/sbt clean icebergShaded/compile iceberg/compile
 // It will fail the first time, just re-run it.
 // scalastyle:off println
@@ -445,6 +449,7 @@ lazy val iceberg = (project in file("iceberg"))
   assemblyPackageScala / assembleArtifact := false
 )
 // scalastyle:on println
+*/

 lazy val generateIcebergJarsTask = TaskKey[Unit]("generateIcebergJars", "Generate Iceberg JARs")

@@ -1114,7 +1119,8 @@ val createTargetClassesDir = taskKey[Unit]("create target classes dir")

 // Don't use these groups for any other projects
 lazy val sparkGroup = project
-  .aggregate(spark, contribs, storage, storageS3DynamoDB, iceberg, testDeltaIcebergJar, sharing)
+  .aggregate(spark, contribs, storage, storageS3DynamoDB, sharing)
+  // .aggregate(spark, contribs, storage, storageS3DynamoDB, iceberg, testDeltaIcebergJar, sharing)
   .settings(
     // crossScalaVersions must be set to Nil on the aggregating project
     crossScalaVersions := Nil,
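Snapshot artifacts under content/groups/snapshots are republished continuously, so pinning sparkVersion to 4.0.0-SNAPSHOT makes builds non-reproducible. A minimal sketch of one way to keep the released Spark as the default and opt into the snapshot per invocation (not part of this commit; the spark.master.version property name is hypothetical):

// Sketch only: select the Spark build with -Dspark.master.version=4.0.0-SNAPSHOT
val sparkVersion = sys.props.getOrElse("spark.master.version", "3.5.0")

// Add the snapshots resolver only when a snapshot build is requested
resolvers ++= {
  if (sparkVersion.endsWith("-SNAPSHOT")) {
    Seq("Apache Snapshots" at "https://repository.apache.org/content/groups/snapshots/")
  } else {
    Seq.empty
  }
}

This keeps day-to-day builds reproducible while still allowing compilation against the Spark master snapshot from the command line.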

spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala

Lines changed: 1 addition & 1 deletion
@@ -522,7 +522,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
   private def createUnresolvedTable(
       tableName: Seq[String],
       commandName: String): UnresolvedTable = {
-    UnresolvedTable(tableName, commandName, relationTypeMismatchHint = None)
+    UnresolvedTable(tableName, commandName)
   }

   // Build the text of the CHECK constraint expression. The user-specified whitespace is in the
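On Spark master, UnresolvedTable appears to no longer take the relationTypeMismatchHint parameter, so call sites that previously passed None now pass only the name parts and the command name. A minimal sketch of the new call shape (the command string is illustrative):

import org.apache.spark.sql.catalyst.analysis.UnresolvedTable

// Spark 3.5:    UnresolvedTable(tableName, commandName, relationTypeMismatchHint = None)
// Spark master: the hint parameter is gone
val node = UnresolvedTable(Seq("delta_db", "events"), "ALTER TABLE ... ADD CONSTRAINT")

The same two-argument form recurs below in DeltaTable.scala and DeltaTableV2.scala.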

spark/src/main/scala/io/delta/tables/DeltaTableBuilder.scala

Lines changed: 3 additions & 2 deletions
@@ -27,6 +27,7 @@ import org.apache.spark.annotation._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LogicalPlan, ReplaceTable}
+import org.apache.spark.sql.catalyst.plans.logical.ColumnDefinition
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.SQLExecution
@@ -343,15 +344,15 @@ class DeltaTableBuilder private[tables](
       val unresolvedTable = org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier(table)
       CreateTable(
         unresolvedTable,
-        StructType(columns.toSeq),
+        columns.map(ColumnDefinition.fromV1Column(_, spark.sessionState.sqlParser)).toSeq,
         partitioning,
         tableSpec,
         ifNotExists)
     case ReplaceTableOptions(orCreate) =>
       val unresolvedTable = org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier(table)
       ReplaceTable(
         unresolvedTable,
-        StructType(columns.toSeq),
+        columns.map(ColumnDefinition.fromV1Column(_, spark.sessionState.sqlParser)).toSeq,
         partitioning,
         tableSpec,
         orCreate)
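CreateTable and ReplaceTable on Spark master take Seq[ColumnDefinition] where they previously took a StructType, so the builder converts its v1 StructFields through ColumnDefinition.fromV1Column. A sketch of that conversion in isolation (assumes an active SparkSession named spark; the column is illustrative):

import org.apache.spark.sql.catalyst.plans.logical.ColumnDefinition
import org.apache.spark.sql.types.{LongType, StructField}

// A v1 column, as DeltaTableBuilder accumulates them
val v1Column = StructField("id", LongType, nullable = false)

// The v2 ColumnDefinition the new CreateTable/ReplaceTable nodes expect
val colDef = ColumnDefinition.fromV1Column(v1Column, spark.sessionState.sqlParser)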

spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeltaUpdateTable.scala

Lines changed: 3 additions & 2 deletions
@@ -16,7 +16,8 @@

 package org.apache.spark.sql.catalyst.plans.logical

-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.delta.DeltaAnalysisException
+
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, ExtractValue, GetStructField}

 /**
@@ -66,7 +67,7 @@ object DeltaUpdateTable {

   def fail(extraMsg: String): Nothing = {
     val msg = Option(errMsg).map(_ + " - ").getOrElse("") + extraMsg
-    throw new AnalysisException(msg)
+    throw new DeltaAnalysisException(msg)
   }

   def extractRecursively(expr: Expression): Seq[String] = expr match {

spark/src/main/scala/org/apache/spark/sql/delta/ColumnWithDefaultExprUtils.scala

Lines changed: 2 additions & 1 deletion
@@ -220,7 +220,8 @@ object ColumnWithDefaultExprUtils extends DeltaLogging {
       incrementalExecution.currentBatchId,
       incrementalExecution.prevOffsetSeqMetadata,
       incrementalExecution.offsetSeqMetadata,
-      incrementalExecution.watermarkPropagator
+      incrementalExecution.watermarkPropagator,
+      incrementalExecution.isFirstBatch
     )
     newIncrementalExecution.executedPlan // Force the lazy generation of execution plan

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 18 additions & 11 deletions
@@ -245,7 +245,14 @@ trait DeltaErrorsBase
       startPosition: Option[Int] = None,
       plan: Option[LogicalPlan] = None,
       cause: Option[Throwable] = None): AnalysisException = {
-    new ExtendedAnalysisException(msg, line, startPosition, plan, cause)
+    if (plan.isEmpty) {
+      new DeltaAnalysisException(msg, line, startPosition, cause)
+    } else {
+      new ExtendedAnalysisException(
+        new DeltaAnalysisException(msg, line, startPosition, cause),
+        plan.get
+      )
+    }
   }

   def notNullColumnMissingException(constraint: Constraints.NotNull): Throwable = {
@@ -283,7 +290,7 @@ trait DeltaErrorsBase
   }

   def invalidConstraintName(name: String): AnalysisException = {
-    new AnalysisException(s"Cannot use '$name' as the name of a CHECK constraint.")
+    new DeltaAnalysisException(s"Cannot use '$name' as the name of a CHECK constraint.")
   }

   def nonexistentConstraint(constraintName: String, tableName: String): AnalysisException = {
@@ -1084,7 +1091,7 @@ trait DeltaErrorsBase
   }

   def bloomFilterInvalidParameterValueException(message: String): Throwable = {
-    new AnalysisException(
+    new DeltaAnalysisException(
       s"Cannot create bloom filter index, invalid parameter value: $message")
   }

@@ -1209,7 +1216,7 @@ trait DeltaErrorsBase
     def prettyMap(m: Map[String, String]): String = {
       m.map(e => s"${e._1}=${e._2}").mkString("[", ", ", "]")
     }
-    new AnalysisException(
+    new DeltaAnalysisException(
      s"""You are trying to convert a table which already has a delta log where the table
        |properties in the catalog don't match the configuration in the delta log.
        |Table properties in catalog: ${prettyMap(tableProperties)}
@@ -1387,7 +1394,7 @@ trait DeltaErrorsBase
   def restoreTimestampBeforeEarliestException(
       userTimestamp: String,
       earliestTimestamp: String): Throwable = {
-    new AnalysisException(
+    new DeltaAnalysisException(
       s"Cannot restore table to timestamp ($userTimestamp) as it is before the earliest version " +
       s"available. Please use a timestamp after ($earliestTimestamp)"
     )
@@ -1566,7 +1573,7 @@ trait DeltaErrorsBase
   }

   def viewNotSupported(operationName: String): Throwable = {
-    new AnalysisException(s"Operation $operationName can not be performed on a view")
+    new DeltaAnalysisException(s"Operation $operationName can not be performed on a view")
   }

   def postCommitHookFailedException(
@@ -1720,7 +1727,7 @@ trait DeltaErrorsBase
   }

   def generatedColumnsNonDeterministicExpression(expr: Expression): Throwable = {
-    new AnalysisException(
+    new DeltaAnalysisException(
       s"Found ${expr.sql}. A generated column cannot use a non deterministic expression")
   }

@@ -2056,7 +2063,7 @@ trait DeltaErrorsBase
       columnName: String,
       constraints: Map[String, String]): Throwable = {
     val plural = if (constraints.size > 1) "s" else ""
-    new AnalysisException(
+    new DeltaAnalysisException(
       s"""
         |Cannot $operation column $columnName because this column is referenced by the following
         | check constraint$plural:\n\t${constraints.mkString("\n\t")}
@@ -2068,7 +2075,7 @@ trait DeltaErrorsBase
       columnName: String,
       fields: Seq[StructField]): Throwable = {
     val plural = if (fields.size > 1) "s" else ""
-    new AnalysisException(
+    new DeltaAnalysisException(
       s"""
         |Cannot $operation column $columnName because this column is referenced by the following
         | generated column$plural:\n\t${fields.map(_.name).mkString("\n\t")}
@@ -2397,7 +2404,7 @@ trait DeltaErrorsBase
       hasStart: Boolean,
       hasStep: Boolean,
       hasInsert: Boolean): Throwable = {
-    new AnalysisException(s"Inconsistent IDENTITY metadata for column $colName " +
+    new DeltaAnalysisException(s"Inconsistent IDENTITY metadata for column $colName " +
       s"detected: $hasStart, $hasStep, $hasInsert")
   }

@@ -3417,7 +3424,7 @@ class MetadataMismatchErrorBuilder {
   }

   def finalizeAndThrow(conf: SQLConf): Unit = {
-    throw new AnalysisException(bits.mkString("\n"))
+    throw new DeltaAnalysisException(bits.mkString("\n"))
   }
 }
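The first hunk reflects that ExtendedAnalysisException on Spark master wraps an existing AnalysisException together with a plan, rather than being constructed from a raw message. With no plan the helper now returns a plain DeltaAnalysisException; with a plan it wraps one. A sketch of the two call shapes (assuming the surrounding helper is DeltaErrors.analysisException with the parameters shown above):

import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.delta.DeltaErrors

// No plan: a plain DeltaAnalysisException carrying the position info
val bare = DeltaErrors.analysisException(
  "Column not found", line = Some(4), startPosition = Some(10))

// With a plan: the exception is wrapped so the plan is rendered with the error
val withPlan = DeltaErrors.analysisException(
  "Column not found", plan = Some(OneRowRelation()))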

spark/src/main/scala/org/apache/spark/sql/delta/DeltaSharedExceptions.scala

Lines changed: 59 additions & 12 deletions
@@ -20,28 +20,75 @@ import scala.collection.JavaConverters._

 import org.antlr.v4.runtime.ParserRuleContext

+import org.apache.spark.QueryContext
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.trees.Origin

+// Spark's AnalysisException constructor is no longer public. This means all calls
+// with AnalysisException(msg: String) are invalid.
+// For now we adjust DeltaAnalysisException to expose a constructor that allows this. Otherwise,
+// we should consider changing all of those exceptions to use an errorClass with the public
+// AnalysisException constructors (this can just be INTERNAL_ERROR)
 class DeltaAnalysisException(
+    message: String,
+    line: Option[Int] = None,
+    startPosition: Option[Int] = None,
+    cause: Option[Throwable] = None,
+    errorClass: Option[String] = None,
+    messageParameters: Map[String, String] = Map.empty,
+    context: Array[QueryContext] = Array.empty)
+  extends AnalysisException(
+    message,
+    line,
+    startPosition,
+    cause,
+    errorClass,
+    messageParameters,
+    context)
+  with DeltaThrowable {
+
+  def this(
+      errorClass: String,
+      messageParameters: Array[String]) =
+    this(
+      message = DeltaThrowableHelper.getMessage(errorClass, messageParameters),
+      messageParameters = DeltaThrowableHelper
+        .getParameterNames(errorClass, errorSubClass = null)
+        .zip(messageParameters)
+        .toMap,
+      errorClass = Some(errorClass)
+    )
+
+  def this(
     errorClass: String,
     messageParameters: Array[String],
-    cause: Option[Throwable] = None,
-    origin: Option[Origin] = None)
-  extends AnalysisException(
-    message = DeltaThrowableHelper.getMessage(errorClass, messageParameters),
-    messageParameters = DeltaThrowableHelper
+      cause: Option[Throwable]) =
+    this(
+      message = DeltaThrowableHelper.getMessage(errorClass, messageParameters),
+      messageParameters = DeltaThrowableHelper
       .getParameterNames(errorClass, errorSubClass = null)
       .zip(messageParameters)
       .toMap,
-    errorClass = Some(errorClass),
-    line = origin.flatMap(_.line),
-    startPosition = origin.flatMap(_.startPosition),
-    context = origin.map(_.getQueryContext).getOrElse(Array.empty),
-    cause = cause)
-  with DeltaThrowable {
-  def getMessageParametersArray: Array[String] = messageParameters
+      errorClass = Some(errorClass),
+      cause = cause)
+
+  def this(
+      errorClass: String,
+      messageParameters: Array[String],
+      cause: Option[Throwable],
+      origin: Option[Origin]) =
+    this(
+      message = DeltaThrowableHelper.getMessage(errorClass, messageParameters),
+      messageParameters = DeltaThrowableHelper
+        .getParameterNames(errorClass, errorSubClass = null)
+        .zip(messageParameters)
+        .toMap,
+      errorClass = Some(errorClass),
+      line = origin.flatMap(_.line),
+      startPosition = origin.flatMap(_.startPosition),
+      context = origin.map(_.getQueryContext).getOrElse(Array.empty),
+      cause = cause)
 }

 class DeltaIllegalArgumentException(
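With the new primary constructor, call sites that used the message-only AnalysisException constructor can migrate by switching the class name, while error-class call sites continue to resolve to the auxiliary constructors. A short sketch (the error class shown is illustrative, not taken from this commit):

import org.apache.spark.sql.delta.DeltaAnalysisException

// Message-based: stands in for the now non-public AnalysisException(msg)
val byMessage = new DeltaAnalysisException(
  "Cannot use 'bad name' as the name of a CHECK constraint.")

// Error-class based: positional (String, Array[String]) selects the auxiliary constructor
val byErrorClass = new DeltaAnalysisException(
  "DELTA_OPERATION_NOT_ALLOWED", // illustrative error class
  Array("VACUUM"))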

spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala

Lines changed: 2 additions & 2 deletions
@@ -617,7 +617,7 @@ object UnresolvedDeltaPathOrIdentifier {
     (path, tableIdentifier) match {
       case (Some(p), None) => UnresolvedPathBasedDeltaTable(p, Map.empty, cmd)
       case (None, Some(t)) =>
-        UnresolvedTable(t.nameParts, cmd, None)
+        UnresolvedTable(t.nameParts, cmd)
       case _ => throw new IllegalArgumentException(
         s"Exactly one of path or tableIdentifier must be provided to $cmd")
     }
@@ -639,7 +639,7 @@ object UnresolvedPathOrIdentifier {
       cmd: String): LogicalPlan = {
     (path, tableIdentifier) match {
       case (_, Some(t)) =>
-        UnresolvedTable(t.nameParts, cmd, None)
+        UnresolvedTable(t.nameParts, cmd)
       case (Some(p), None) => UnresolvedPathBasedTable(p, Map.empty, cmd)
       case _ => throw new IllegalArgumentException(
         s"At least one of path or tableIdentifier must be provided to $cmd")

spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala

Lines changed: 1 addition & 1 deletion
@@ -329,7 +329,7 @@ object DeltaTableV2 {

   /** Resolves a table identifier into a DeltaTableV2, leveraging standard v2 table resolution. */
   def apply(spark: SparkSession, tableId: TableIdentifier, cmd: String): DeltaTableV2 = {
-    resolve(spark, UnresolvedTable(tableId.nameParts, cmd, None), cmd)
+    resolve(spark, UnresolvedTable(tableId.nameParts, cmd), cmd)
   }

   /** Applies standard v2 table resolution to an unresolved Delta table plan node */

spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala

Lines changed: 1 addition & 1 deletion
@@ -121,7 +121,7 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
     // queries that add void columns.
     val newNullColumn = SchemaUtils.findNullTypeColumn(migratedSchema.get)
     if (newNullColumn.isDefined) {
-      throw new AnalysisException(
+      throw new DeltaAnalysisException(
        s"""Cannot add column '${newNullColumn.get}' with type 'void'. Please explicitly specify a
          |non-void type.""".stripMargin.replaceAll("\n", " ")
      )
