diff --git a/build.sbt b/build.sbt index de341c97bbe..fa34306f3b3 100644 --- a/build.sbt +++ b/build.sbt @@ -56,8 +56,10 @@ Global / default_scala_version := scala212 val LATEST_RELEASED_SPARK_VERSION = "3.5.7" val SPARK_MASTER_VERSION = "4.0.2-SNAPSHOT" val sparkVersion = settingKey[String]("Spark version") +val internalModuleNames = settingKey[Set[String]]("Internal module artifact names to exclude from POM") spark / sparkVersion := getSparkVersion() -kernelSpark / sparkVersion := getSparkVersion() +sparkV1 / sparkVersion := getSparkVersion() +sparkV2 / sparkVersion := getSparkVersion() connectCommon / sparkVersion := getSparkVersion() connectClient / sparkVersion := getSparkVersion() connectServer / sparkVersion := getSparkVersion() @@ -333,7 +335,7 @@ lazy val connectClient = (project in file("spark-connect/client")) if (!distributionDir.exists()) { val jarsDir = distributionDir / "jars" IO.createDirectory(jarsDir) - // Create symlinks for all dependencies. + // Create symlinks for all dependencies serverClassPath.distinct.foreach { entry => val jarFile = entry.data.toPath val linkedJarFile = jarsDir / entry.data.getName @@ -425,17 +427,29 @@ lazy val deltaSuiteGenerator = (project in file("spark/delta-suite-generator")) Test / baseDirectory := (ThisBuild / baseDirectory).value, ) -lazy val spark = (project in file("spark")) +// ============================================================ +// Module 1: sparkV1 (prod code only, no tests) +// ============================================================ +lazy val sparkV1 = (project in file("spark")) .dependsOn(storage) .enablePlugins(Antlr4Plugin) .disablePlugins(JavaFormatterPlugin, ScalafmtPlugin) .settings ( - name := "delta-spark", + name := "delta-spark-v1", commonSettings, scalaStyleSettings, - sparkMimaSettings, - releaseSettings, + skipReleaseSettings, // Internal module - not published to Maven crossSparkSettings(), + + // Export as JAR instead of classes directory. This prevents dependent projects + // (e.g., connectServer) from seeing multiple 'classes' directories with the same + // name in their classpath, which would cause FileAlreadyExistsException. + exportJars := true, + + // Tests are compiled in the final 'spark' module to avoid circular dependencies + Test / sources := Seq.empty, + Test / resources := Seq.empty, + libraryDependencies ++= Seq( // Adding test classifier seems to break transitive resolution of the core dependencies "org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided", @@ -462,6 +476,210 @@ lazy val spark = (project in file("spark")) Antlr4 / antlr4GenListener := true, Antlr4 / antlr4GenVisitor := true, + // Hack to avoid errors related to missing repo-root/target/scala-2.12/classes/ + createTargetClassesDir := { + val dir = baseDirectory.value.getParentFile / "target" / "scala-2.12" / "classes" + Files.createDirectories(dir.toPath) + }, + Compile / compile := ((Compile / compile) dependsOn createTargetClassesDir).value, + // Generate the package object to provide the version information in runtime. 
+ Compile / sourceGenerators += Def.task { + val file = (Compile / sourceManaged).value / "io" / "delta" / "package.scala" + IO.write(file, + s"""package io + | + |package object delta { + | val VERSION = "${version.value}" + |} + |""".stripMargin) + Seq(file) + }, + ) + +// ============================================================ +// Module 2: sparkV1Shaded (v1 without DeltaLog for v2 dependency) +// ============================================================ +lazy val sparkV1Shaded = (project in file("spark-v1-shaded")) + .dependsOn(sparkV1) + .dependsOn(storage) + .settings( + name := "delta-spark-v1-shaded", + commonSettings, + skipReleaseSettings, // Internal module - not published to Maven + exportJars := true, // Export as JAR to avoid classpath conflicts + + // No source code - just repackage sparkV1 without DeltaLog classes + Compile / sources := Seq.empty, + Test / sources := Seq.empty, + + // Repackage sparkV1 jar but exclude DeltaLog and related classes + Compile / packageBin / mappings := { + val v1Mappings = (sparkV1 / Compile / packageBin / mappings).value + + // Filter out DeltaLog, Snapshot, OptimisticTransaction classes + v1Mappings.filterNot { case (file, path) => + path.contains("org/apache/spark/sql/delta/DeltaLog") || + path.contains("org/apache/spark/sql/delta/Snapshot") || + path.contains("org/apache/spark/sql/delta/OptimisticTransaction") + } + }, + ) + +// ============================================================ +// Module 3: sparkV2 (kernel-spark based, depends on v1-shaded) +// ============================================================ +lazy val sparkV2 = (project in file("kernel-spark")) + .dependsOn(sparkV1Shaded) + .dependsOn(kernelApi) + .dependsOn(kernelDefaults) + .dependsOn(goldenTables % "test") + .settings( + name := "delta-spark-v2", + commonSettings, + javafmtCheckSettings, + skipReleaseSettings, // Internal module - not published to Maven + exportJars := true, // Export as JAR to avoid classpath conflicts + + Test / javaOptions ++= Seq("-ea"), + libraryDependencies ++= Seq( + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", + "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided", + + // Test dependencies + "org.junit.jupiter" % "junit-jupiter-api" % "5.8.2" % "test", + "org.junit.jupiter" % "junit-jupiter-engine" % "5.8.2" % "test", + "org.junit.jupiter" % "junit-jupiter-params" % "5.8.2" % "test", + "net.aichler" % "jupiter-interface" % "0.11.1" % "test", + // Spark test classes for Scala/Java test utilities + "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests", + // ScalaTest for test utilities (needed by Spark test classes) + "org.scalatest" %% "scalatest" % scalaTestVersion % "test" + ), + Test / testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a") + ) + + +// ============================================================ +// Module 4: delta-spark (final published module - combined v1+v2) +// ============================================================ +lazy val spark = (project in file("spark-combined")) + .dependsOn(sparkV1) + .dependsOn(sparkV2) + .dependsOn(storage) + .disablePlugins(JavaFormatterPlugin, ScalafmtPlugin) + .settings ( + name := "delta-spark", + commonSettings, + scalaStyleSettings, + 
sparkMimaSettings, + releaseSettings, // Published to Maven as delta-spark.jar + + // Set Test baseDirectory before crossSparkSettings() so it uses the correct directory + Test / baseDirectory := (sparkV1 / baseDirectory).value, + + // Test sources from spark/ directory (sparkV1's directory) + // MUST be set BEFORE crossSparkSettings() to avoid overwriting version-specific directories + Test / unmanagedSourceDirectories := { + val sparkDir = (sparkV1 / baseDirectory).value + Seq( + sparkDir / "src" / "test" / "scala", + sparkDir / "src" / "test" / "java" + ) + }, + Test / unmanagedResourceDirectories := Seq( + (sparkV1 / baseDirectory).value / "src" / "test" / "resources" + ), + + crossSparkSettings(), + + // MiMa should use the generated JAR (not classDirectory) because we merge classes at package time + mimaCurrentClassfiles := (Compile / packageBin).value, + + // Export as JAR to dependent projects (e.g., connectServer, connectClient). + // This prevents classpath conflicts from internal module 'classes' directories. + exportJars := true, + + // Internal module artifact names to exclude from published POM + internalModuleNames := Set("delta-spark-v1", "delta-spark-v1-shaded", "delta-spark-v2"), + + // Merge classes from internal modules (v1, v2, storage) into final JAR + // kernel modules are kept as separate JARs and listed as dependencies in POM + Compile / packageBin / mappings ++= { + val log = streams.value.log + + // Collect mappings from internal modules + val v1Mappings = (sparkV1 / Compile / packageBin / mappings).value + val v2Mappings = (sparkV2 / Compile / packageBin / mappings).value + val storageMappings = (storage / Compile / packageBin / mappings).value + + // Include Python files (from spark/ directory) + val pythonMappings = listPythonFiles(baseDirectory.value.getParentFile / "python") + + // Combine all mappings + val allMappings = v1Mappings ++ v2Mappings ++ storageMappings ++ pythonMappings + + // Detect duplicate class files + val classFiles = allMappings.filter(_._2.endsWith(".class")) + val duplicates = classFiles.groupBy(_._2).filter(_._2.size > 1) + + if (duplicates.nonEmpty) { + log.error(s"Found ${duplicates.size} duplicate class(es) in packageBin mappings:") + duplicates.foreach { case (className, entries) => + log.error(s" - $className:") + entries.foreach { case (file, path) => log.error(s" from: $file") } + } + sys.error("Duplicate classes found. 
This indicates overlapping code between sparkV1, sparkV2, and storage modules.") + } + + allMappings.distinct + }, + + // Exclude internal modules from published POM + pomPostProcess := { node => + val internalModules = internalModuleNames.value + import scala.xml._ + import scala.xml.transform._ + new RuleTransformer(new RewriteRule { + override def transform(n: Node): Seq[Node] = n match { + case e: Elem if e.label == "dependency" => + val artifactId = (e \ "artifactId").text + if (internalModules.contains(artifactId)) Seq.empty else Seq(n) + case _ => Seq(n) + } + }).transform(node).head + }, + + pomIncludeRepository := { _ => false }, + + // Filter internal modules from project dependencies + // This works together with pomPostProcess to ensure internal modules + // (sparkV1, sparkV2, sparkV1Shaded) are not listed as dependencies in POM + projectDependencies := { + val internalModules = internalModuleNames.value + projectDependencies.value.filterNot(dep => internalModules.contains(dep.name)) + }, + + libraryDependencies ++= Seq( + "org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", + "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided", + "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided", + + "org.scalatest" %% "scalatest" % scalaTestVersion % "test", + "org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test", + "junit" % "junit" % "4.13.2" % "test", + "com.novocode" % "junit-interface" % "0.11" % "test", + "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests", + "org.mockito" % "mockito-inline" % "4.11.0" % "test", + ), + Test / testOptions += Tests.Argument("-oDF"), Test / testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), @@ -485,24 +703,6 @@ lazy val spark = (project in file("spark")) // Required for testing table features see https://github.com/delta-io/delta/issues/1602 Test / envVars += ("DELTA_TESTING", "1"), - // Hack to avoid errors related to missing repo-root/target/scala-2.12/classes/ - createTargetClassesDir := { - val dir = baseDirectory.value.getParentFile / "target" / "scala-2.12" / "classes" - Files.createDirectories(dir.toPath) - }, - Compile / compile := ((Compile / compile) dependsOn createTargetClassesDir).value, - // Generate the package object to provide the version information in runtime. 
- Compile / sourceGenerators += Def.task { - val file = (Compile / sourceManaged).value / "io" / "delta" / "package.scala" - IO.write(file, - s"""package io - | - |package object delta { - | val VERSION = "${version.value}" - |} - |""".stripMargin) - Seq(file) - }, TestParallelization.settings, ) .configureUnidoc( @@ -590,6 +790,11 @@ lazy val kernelApi = (project in file("kernel/kernel-api")) javaOnlyReleaseSettings, javafmtCheckSettings, scalafmtCheckSettings, + + // Use unique classDirectory name to avoid conflicts in connectClient test setup + // This allows connectClient to create symlinks without FileAlreadyExistsException + Compile / classDirectory := target.value / "scala-2.12" / "kernel-api-classes", + Test / javaOptions ++= Seq("-ea"), libraryDependencies ++= Seq( "org.roaringbitmap" % "RoaringBitmap" % "0.9.25", @@ -683,6 +888,11 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults")) javaOnlyReleaseSettings, javafmtCheckSettings, scalafmtCheckSettings, + + // Use unique classDirectory name to avoid conflicts in connectClient test setup + // This allows connectClient to create symlinks without FileAlreadyExistsException + Compile / classDirectory := target.value / "scala-2.12" / "kernel-defaults-classes", + Test / javaOptions ++= Seq("-ea"), // This allows generating tables with unsupported test table features in delta-spark Test / envVars += ("DELTA_TESTING", "1"), @@ -697,6 +907,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults")) "commons-io" % "commons-io" % "2.8.0" % "test", "com.novocode" % "junit-interface" % "0.11" % "test", "org.slf4j" % "slf4j-log4j12" % "1.7.36" % "test", + // Removed external delta-spark dependency - now using local sparkV1 project // JMH dependencies allow writing micro-benchmarks for testing performance of components. // JMH has framework to define benchmarks and takes care of many common functionalities // such as warm runs, cold runs, defining benchmark parameter variables etc. 
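The unique kernel-api / kernel-defaults classDirectory names above, like exportJars := true in the new spark modules, all guard against the same failure: the connectClient test distribution symlinks every classpath entry into one jars/ directory by bare file name, so two entries that are both literally named "classes" collide. A minimal sketch of that failure mode follows; the paths are illustrative, not taken from the build.

import java.nio.file.{Files, Paths}

object SymlinkClashSketch extends App {
  // Both modules would otherwise export a directory literally named "classes",
  // so linking by bare file name maps them to the same target and the second
  // createSymbolicLink call throws FileAlreadyExistsException. Paths are
  // illustrative; the real link loop lives in the connectClient settings above.
  val jarsDir = Files.createTempDirectory("jars")
  val entries = Seq(
    Paths.get("kernel/kernel-api/target/scala-2.12/classes"),
    Paths.get("kernel/kernel-defaults/target/scala-2.12/classes"))
  entries.foreach { entry =>
    val link = jarsDir.resolve(entry.getFileName.toString) // ".../jars/classes" both times
    Files.createSymbolicLink(link, entry.toAbsolutePath)   // second iteration fails
  }
}

Exporting JARs (or renaming each module's class directory, as done for the kernel modules above) gives every classpath entry a distinct file name, so the links no longer clash.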
@@ -715,42 +926,6 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults")) unidocSourceFilePatterns += SourceFilePattern("io/delta/kernel/"), ).configureUnidoc(docTitle = "Delta Kernel Defaults") - -lazy val kernelSpark = (project in file("kernel-spark")) - .dependsOn(kernelApi) - .dependsOn(kernelDefaults) - .dependsOn(spark % "compile->compile") - .dependsOn(goldenTables % "test") - .settings( - name := "kernel-spark", - commonSettings, - javafmtCheckSettings, - skipReleaseSettings, - Test / javaOptions ++= Seq("-ea"), - libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", - "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", - "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided", - - // Using released delta-spark JAR instead of module dependency to break circular dependency - "io.delta" %% "delta-spark" % "3.3.2" % "test", - - // Spark test dependencies for QueryTest and other test utilities - // Spark version(3.5.6) matches delta-spark's version 3.3.2 - "org.apache.spark" %% "spark-sql" % "3.5.6" % "test" classifier "tests", - "org.apache.spark" %% "spark-core" % "3.5.6" % "test" classifier "tests", - "org.apache.spark" %% "spark-catalyst" % "3.5.6" % "test" classifier "tests", - - "org.junit.jupiter" % "junit-jupiter-api" % "5.8.2" % "test", - "org.junit.jupiter" % "junit-jupiter-engine" % "5.8.2" % "test", - "org.junit.jupiter" % "junit-jupiter-params" % "5.8.2" % "test", - "net.aichler" % "jupiter-interface" % "0.11.1" % "test", - "org.scalatest" %% "scalatest" % scalaTestVersion % "test" - ), - Test / testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a") - ) - // TODO to enable unit doc for kernelSpark. - lazy val unity = (project in file("unity")) .enablePlugins(ScalafmtPlugin) .dependsOn(kernelApi % "compile->compile;test->test") @@ -1338,7 +1513,7 @@ val createTargetClassesDir = taskKey[Unit]("create target classes dir") // Don't use these groups for any other projects lazy val sparkGroup = project - .aggregate(spark, kernelSpark, contribs, storage, storageS3DynamoDB, sharing, hudi) + .aggregate(spark, sparkV1, sparkV1Shaded, sparkV2, contribs, storage, storageS3DynamoDB, sharing, hudi) .settings( // crossScalaVersions must be set to Nil on the aggregating project crossScalaVersions := Nil, diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java index 202f59085d3..1b51e13dff6 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java @@ -152,7 +152,7 @@ private long calculateMaxSplitBytes(SparkSession sparkSession) { int minPartitionNum = minPartitionNumOption.isDefined() ? 
((Number) minPartitionNumOption.get()).intValue() - : sparkSession.leafNodeDefaultParallelism(); + : sparkSession.sparkContext().defaultParallelism(); if (minPartitionNum <= 0) { minPartitionNum = 1; } diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/Dsv2BasicTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/Dsv2BasicTest.java index dcf47a88f4e..5933ba425a5 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/Dsv2BasicTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/Dsv2BasicTest.java @@ -43,10 +43,10 @@ public void setUp(@TempDir File tempDir) { new SparkConf() .set("spark.sql.catalog.dsv2", "io.delta.kernel.spark.catalog.TestCatalog") .set("spark.sql.catalog.dsv2.base_path", tempDir.getAbsolutePath()) - .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .set("spark.sql.extensions", "io.delta.sql.LegacyDeltaSparkSessionExtension") .set( "spark.sql.catalog.spark_catalog", - "org.apache.spark.sql.delta.catalog.DeltaCatalog") + "org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog") .setMaster("local[*]") .setAppName("Dsv2BasicTest"); spark = SparkSession.builder().config(conf).getOrCreate(); diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/SparkDsv2TestBase.java b/kernel-spark/src/test/java/io/delta/kernel/spark/SparkDsv2TestBase.java index e1de37a1147..2a388d79f0d 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/SparkDsv2TestBase.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/SparkDsv2TestBase.java @@ -32,10 +32,10 @@ public static void setUpSparkAndEngine() { SparkSession.builder() .master("local[*]") .appName("SparkKernelDsv2Tests") - .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .config("spark.sql.extensions", "io.delta.sql.LegacyDeltaSparkSessionExtension") .config( "spark.sql.catalog.spark_catalog", - "org.apache.spark.sql.delta.catalog.DeltaCatalog") + "org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog") .getOrCreate(); defaultEngine = DefaultEngine.create(spark.sessionState().newHadoopConf()); } diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkGoldenTableTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkGoldenTableTest.java index 68561aac93c..5aa9236307a 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkGoldenTableTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkGoldenTableTest.java @@ -60,10 +60,10 @@ public void setUp(@TempDir File tempDir) { new SparkConf() .set("spark.sql.catalog.dsv2", "io.delta.kernel.spark.catalog.TestCatalog") .set("spark.sql.catalog.dsv2.base_path", tempDir.getAbsolutePath()) - .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .set("spark.sql.extensions", "io.delta.sql.LegacyDeltaSparkSessionExtension") .set( "spark.sql.catalog.spark_catalog", - "org.apache.spark.sql.delta.catalog.DeltaCatalog") + "org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog") .setMaster("local[*]") .setAppName("SparkGoldenTableTest"); spark = SparkSession.builder().config(conf).getOrCreate(); diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/StreamingHelperTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/StreamingHelperTest.java index d1991ee1edb..210b9b68142 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/StreamingHelperTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/StreamingHelperTest.java @@ -140,7 +140,7 @@ public void 
testGetActiveCommitAtTime_pastTimestamp(@TempDir File tempDir) throw .history() .getActiveCommitAtTime( timestamp, - Option.empty() /* catalogTable */, + deltaLog.initialCatalogTable() /* catalogTableOpt */, false /* canReturnLastCommit */, true /* mustBeRecreatable */, false /* canReturnEarliestCommit */); @@ -172,7 +172,7 @@ public void testGetActiveCommitAtTime_futureTimestamp_canReturnLast(@TempDir Fil .history() .getActiveCommitAtTime( futureTimestamp, - Option.empty() /* catalogTable */, + deltaLog.initialCatalogTable() /* catalogTableOpt */, true /* canReturnLastCommit */, true /* mustBeRecreatable */, false /* canReturnEarliestCommit */); @@ -204,7 +204,7 @@ public void testGetActiveCommitAtTime_futureTimestamp_notMustBeRecreatable(@Temp .history() .getActiveCommitAtTime( futureTimestamp, - Option.empty() /* catalogTable */, + deltaLog.initialCatalogTable() /* catalogTableOpt */, true /* canReturnLastCommit */, false /* mustBeRecreatable */, false /* canReturnEarliestCommit */); @@ -236,7 +236,7 @@ public void testGetActiveCommitAtTime_earlyTimestamp_canReturnEarliest(@TempDir .history() .getActiveCommitAtTime( earlyTimestamp, - Option.empty() /* catalogTable */, + deltaLog.initialCatalogTable() /* catalogTableOpt */, false /* canReturnLastCommit */, true /* mustBeRecreatable */, true /* canReturnEarliestCommit */); @@ -268,7 +268,7 @@ public void testGetActiveCommitAtTime_earlyTimestamp_notMustBeRecreatable_canRet .history() .getActiveCommitAtTime( earlyTimestamp, - Option.empty() /* catalogTable */, + deltaLog.initialCatalogTable() /* catalogTableOpt */, false /* canReturnLastCommit */, false /* mustBeRecreatable */, true /* canReturnEarliestCommit */); @@ -353,12 +353,19 @@ public void testCheckVersionExists( deltaLog .history() .checkVersionExists( - versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange)); + versionToCheck, + deltaLog.initialCatalogTable() /* catalogTableOpt */, + mustBeRecreatable, + allowOutOfRange)); } else { streamingHelper.checkVersionExists(versionToCheck, mustBeRecreatable, allowOutOfRange); deltaLog .history() - .checkVersionExists(versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange); + .checkVersionExists( + versionToCheck, + deltaLog.initialCatalogTable() /* catalogTableOpt */, + mustBeRecreatable, + allowOutOfRange); } } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala index 8d3601a8fbd..c0a4013837d 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala @@ -99,7 +99,9 @@ trait AbstractTestUtils .appName("Spark Test Writer for Delta Kernel") .config("spark.master", "local") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") - .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") + .config( + "spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.delta.catalog.DeltaCatalog") // Set this conf to empty string so that the golden tables generated // using with the test-prefix (i.e. 
there is no DELTA_TESTING set) can still work .config(DeltaSQLConf.TEST_DV_NAME_PREFIX.key, "") diff --git a/project/TestParallelization.scala b/project/TestParallelization.scala index 769d4fbcec1..e656e62d6e3 100644 --- a/project/TestParallelization.scala +++ b/project/TestParallelization.scala @@ -54,6 +54,7 @@ object TestParallelization { Test / testGroupingStrategy := { val groupsCount = (Test / forkTestJVMCount).value val shard = (Test / shardId).value + // Use regular baseDirectory for target directory (not Test/baseDirectory) val baseJvmDir = baseDirectory.value MinShardGroupDurationStrategy(groupsCount, baseJvmDir, shard, defaultForkOptions.value) }, @@ -81,7 +82,9 @@ object TestParallelization { javaHome = javaHome.value, outputStrategy = outputStrategy.value, bootJars = Vector.empty, - workingDirectory = Some(baseDirectory.value), + // Use Test/baseDirectory instead of baseDirectory to support modules where these differ + // (e.g. spark-combined module where Test/baseDirectory points to spark/ source directory) + workingDirectory = Some((Test / baseDirectory).value), runJVMOptions = (Test / javaOptions).value.toVector, connectInput = connectInput.value, envVars = (Test / envVars).value diff --git a/spark-combined/src/main/java/org/apache/spark/sql/delta/catalog/DeltaCatalog.java b/spark-combined/src/main/java/org/apache/spark/sql/delta/catalog/DeltaCatalog.java new file mode 100644 index 00000000000..d7b6255c6ef --- /dev/null +++ b/spark-combined/src/main/java/org/apache/spark/sql/delta/catalog/DeltaCatalog.java @@ -0,0 +1,29 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog; + +/** + * Delta Catalog implementation that can delegate to both V1 and V2 implementations. + * This class sits in delta-spark (combined) module and can access: + * - V1: org.apache.spark.sql.delta.* (full version with DeltaLog) + * - V2: io.delta.kernel.spark.* + */ +public class DeltaCatalog extends AbstractDeltaCatalog { + +} + + diff --git a/spark-combined/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala b/spark-combined/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala new file mode 100644 index 00000000000..56f47fcf3f6 --- /dev/null +++ b/spark-combined/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala @@ -0,0 +1,37 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.sql + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +/** + * Delta Spark Session Extension that can register both V1 and V2 implementations. + * This class sits in delta-spark (combined) module and can access: + * - V1: org.apache.spark.sql.delta.* (full version with DeltaLog) + * - V2: io.delta.kernel.spark.* + */ +class DeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension { + + /** + * NoOpRule for binary compatibility with Delta 3.3.0 + * This class must remain here to satisfy MiMa checks + */ + class NoOpRule extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan + } +} diff --git a/spark/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala b/spark/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala index 0b7dedc1196..d1833fcda90 100644 --- a/spark/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala +++ b/spark/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala @@ -80,7 +80,8 @@ import org.apache.spark.sql.internal.SQLConf * * @since 0.4.0 */ -class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) { +class LegacyDeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension +class AbstractDeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) { override def apply(extensions: SparkSessionExtensions): Unit = { extensions.injectParser { (_, parser) => new DeltaSqlParser(parser) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala index 05ea7bb0e08..08b8e9ea84b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TimeTravel import org.apache.spark.sql.delta.DataFrameUtils import org.apache.spark.sql.delta.DeltaErrors.{TemporallyUnstableInputException, TimestampEarlierThanCommitRetentionException} import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils -import org.apache.spark.sql.delta.catalog.DeltaCatalog +import org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.catalog.IcebergTablePlaceHolder import org.apache.spark.sql.delta.commands._ @@ -245,7 +245,7 @@ class DeltaAnalysis(session: SparkSession) case _ => protocol } - val newDeltaCatalog = new DeltaCatalog() + val newDeltaCatalog = new LegacyDeltaCatalog() val existingTableOpt = newDeltaCatalog.getExistingTableIfExists(catalogTableTarget.identifier) val newTable = newDeltaCatalog .verifyTableAndSolidify( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 62193e3b4c6..7009074e1f8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -20,12 +20,10 @@ package org.apache.spark.sql.delta import java.io.{FileNotFoundException, IOException} import java.nio.file.FileAlreadyExistsException import java.util.{ConcurrentModificationException, UUID} - import scala.collection.JavaConverters._ - -import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBySpec} +import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec import 
org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol, TableFeatureProtocolUtils} -import org.apache.spark.sql.delta.catalog.DeltaCatalog +import org.apache.spark.sql.delta.catalog.AbstractDeltaCatalog import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, DeltaGenerateCommand} import org.apache.spark.sql.delta.constraints.Constraints import org.apache.spark.sql.delta.hooks.AutoCompactType @@ -37,9 +35,8 @@ import org.apache.spark.sql.delta.redirect.RedirectState import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo} import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.JsonUtils -import io.delta.sql.DeltaSparkSessionExtension +import io.delta.sql.AbstractDeltaSparkSessionExtension import org.apache.hadoop.fs.{ChecksumException, Path} - import org.apache.spark.{SparkConf, SparkEnv, SparkException} import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier @@ -1881,10 +1878,10 @@ trait DeltaErrorsBase val catalogImplConfig = SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key new DeltaAnalysisException( errorClass = "DELTA_CONFIGURE_SPARK_SESSION_WITH_EXTENSION_AND_CATALOG", - messageParameters = Array(classOf[DeltaSparkSessionExtension].getName, - catalogImplConfig, classOf[DeltaCatalog].getName, - classOf[DeltaSparkSessionExtension].getName, - catalogImplConfig, classOf[DeltaCatalog].getName), + messageParameters = Array(classOf[AbstractDeltaSparkSessionExtension].getName, + catalogImplConfig, classOf[AbstractDeltaCatalog].getName, + classOf[AbstractDeltaSparkSessionExtension].getName, + catalogImplConfig, classOf[AbstractDeltaCatalog].getName), cause = originalException) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala index dcdb6ea088e..51364afad6f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala @@ -67,7 +67,9 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType} * A Catalog extension which can properly handle the interaction between the HiveMetaStore and * Delta tables. It delegates all operations DataSources other than Delta to the SparkCatalog. */ -class DeltaCatalog extends DelegatingCatalogExtension +class LegacyDeltaCatalog extends AbstractDeltaCatalog + +class AbstractDeltaCatalog extends DelegatingCatalogExtension with StagingTableCatalog with SupportsPathIdentifier with DeltaLogging { @@ -933,7 +935,7 @@ class DeltaCatalog extends DelegatingCatalogExtension * A trait for handling table access through delta.`/some/path`. This is a stop-gap solution * until PathIdentifiers are implemented in Apache Spark. 
*/ -trait SupportsPathIdentifier extends TableCatalog { self: DeltaCatalog => +trait SupportsPathIdentifier extends TableCatalog { self: AbstractDeltaCatalog => private def supportSQLOnFile: Boolean = spark.sessionState.conf.runSQLonFile diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/implicits/package.scala b/spark/src/main/scala/org/apache/spark/sql/delta/implicits/package.scala index 3548c7e766d..80408134395 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/implicits/package.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/implicits/package.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.delta.actions.AddFile -import org.apache.spark.sql.delta.implicits.RichSparkClasses import org.apache.spark.sql.delta.util.DeltaEncoders import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} -package object implicits extends DeltaEncoders with RichSparkClasses { +package object implicits extends DeltaEncoders with implicits.RichSparkClasses { // Define a few implicit classes to provide the `toDF` method. These classes are not using generic // types to avoid touching Scala reflection. implicit class RichAddFileSeq(files: Seq[AddFile]) { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index 31e6afdb064..ca566b86e74 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -29,7 +29,7 @@ import scala.sys.process.Process import org.apache.spark.sql.delta.DeltaErrors.generateDocsLink import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol} import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION} -import org.apache.spark.sql.delta.catalog.DeltaCatalog +import org.apache.spark.sql.delta.catalog.AbstractDeltaCatalog import org.apache.spark.sql.delta.constraints.CharVarcharConstraint import org.apache.spark.sql.delta.constraints.Constraints import org.apache.spark.sql.delta.constraints.Constraints.NotNull @@ -38,7 +38,7 @@ import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, Inva import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaSQLTestUtils -import io.delta.sql.DeltaSparkSessionExtension +import io.delta.sql.AbstractDeltaSparkSessionExtension import org.apache.hadoop.fs.Path import org.json4s.JString import org.scalatest.GivenWhenThen @@ -1977,9 +1977,9 @@ trait DeltaErrorsSuiteBase } checkError(e, "DELTA_CONFIGURE_SPARK_SESSION_WITH_EXTENSION_AND_CATALOG", "56038", Map( - "sparkSessionExtensionName" -> classOf[DeltaSparkSessionExtension].getName, + "sparkSessionExtensionName" -> classOf[AbstractDeltaSparkSessionExtension].getName, "catalogKey" -> SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, - "catalogClassName" -> classOf[DeltaCatalog].getName + "catalogClassName" -> classOf[AbstractDeltaCatalog].getName )) } {
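For downstream users the published artifact is wired up exactly as before: the combined delta-spark JAR still exposes io.delta.sql.DeltaSparkSessionExtension and org.apache.spark.sql.delta.catalog.DeltaCatalog, now thin subclasses of the Abstract* classes kept in the v1 sources, while the Legacy* names stay internal (used by the kernel-spark tests above). A sketch of the unchanged end-user session setup, assuming the combined JAR is on the classpath:

import org.apache.spark.sql.SparkSession

object CombinedJarUsageSketch {
  // The published delta-spark JAR keeps the familiar entry points; they now live
  // in spark-combined and extend the Abstract* classes defined in the v1 sources.
  // Only internal code (e.g. the kernel-spark tests in this diff) switches to the
  // Legacy* names. The session wiring itself is illustrative.
  def createSession(): SparkSession =
    SparkSession.builder()
      .master("local[*]")
      .appName("delta-combined-sketch")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()
}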
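The SparkBatch.java change replaces leafNodeDefaultParallelism with sparkContext().defaultParallelism() as the fallback partition count. For context, here is a hedged sketch of where that fallback sits in the split-size calculation, assuming it follows Spark's usual FilePartition.maxSplitBytes convention; the full method is not shown in this diff.

object SplitSizeSketch {
  // When spark.sql.files.minPartitionNum is unset, fall back to the SparkContext's
  // defaultParallelism (the new behaviour in this diff) rather than
  // leafNodeDefaultParallelism. The surrounding formula is assumed to mirror
  // Spark's FilePartition.maxSplitBytes and is not copied from the kernel-spark sources.
  def maxSplitBytes(
      totalBytes: Long,
      fileCount: Long,
      defaultMaxSplitBytes: Long, // spark.sql.files.maxPartitionBytes
      openCostInBytes: Long,      // spark.sql.files.openCostInBytes
      minPartitionNumConf: Option[Int],
      defaultParallelism: Int): Long = {
    val minPartitionNum = math.max(1, minPartitionNumConf.getOrElse(defaultParallelism))
    val bytesPerCore = (totalBytes + fileCount * openCostInBytes) / minPartitionNum
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }
}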