Changes from all commits (68 commits, all by huan233usc)
e6e374f  save  (Oct 9, 2025)
9560950  save  (Oct 9, 2025)
8b9fda1  save  (Oct 9, 2025)
4f45983  save  (Oct 9, 2025)
d6dc7ef  fix  (Oct 9, 2025)
3a6472b  fix  (Oct 9, 2025)
b55bfaa  fix  (Oct 10, 2025)
29a9dbe  fix  (Oct 10, 2025)
5cbf64f  fix  (Oct 10, 2025)
9a6acdd  fix  (Oct 10, 2025)
f8d4862  fix  (Oct 10, 2025)
6c7c972  fix  (Oct 10, 2025)
565f9cb  fix  (Oct 10, 2025)
74a1f5c  fix  (Oct 10, 2025)
c6c306d  fix  (Oct 10, 2025)
dbf686b  fix  (Oct 10, 2025)
2929d72  fix  (Oct 10, 2025)
642a42a  fix  (Oct 11, 2025)
e485cda  fix  (Oct 11, 2025)
f6ef579  fix  (Oct 11, 2025)
5f0dbc8  fix  (Oct 11, 2025)
812ba1d  fix  (Oct 11, 2025)
7dd8d18  fix  (Oct 11, 2025)
1658b8f  fix  (Oct 12, 2025)
743ca99  fix  (Oct 12, 2025)
33f484e  Fix test working directory: use baseDirectory instead of Test/baseDir…  (Oct 12, 2025)
6f833ef  Refactor: use delta-spark-v1's baseDirectory directly for better clarity  (Oct 12, 2025)
5c05ad4  Fix: use delta-spark-v1's baseDirectory for all test paths  (Oct 12, 2025)
fb98c0c  Add debug output for Test/javaOptions user.dir to diagnose GitHub Act…  (Oct 12, 2025)
8c4dd7e  Add more debug output for forkOptions working directory  (Oct 12, 2025)
a0af2a9  Fix: remove duplicate javaOptions - only add user.dir  (Oct 12, 2025)
57312d9  Fix: TestParallelization should use Test/baseDirectory for workingDir…  (Oct 12, 2025)
e639b18  fix  (Oct 13, 2025)
92b6326  fix  (Oct 13, 2025)
1238ef8  Fix: avoid duplicate symlinks in connectClient test setup  (Oct 13, 2025)
021582e  Simplify connectClient symlink fix - remove try-catch  (Oct 13, 2025)
89206cf  Use local delta-spark-v1 in kernelDefaults tests  (Oct 13, 2025)
ad155d1  try minimize change  (Oct 13, 2025)
7864c77  fix test  (Oct 13, 2025)
8716b18  fix test  (Oct 13, 2025)
86c0186  fix test  (Oct 13, 2025)
3a19590  fix test  (Oct 13, 2025)
a000aa8  fix test  (Oct 13, 2025)
163b123  fix test  (Oct 13, 2025)
a63fd0d  fix test  (Oct 13, 2025)
680787e  Merge from master and resolve conflicts  (Oct 15, 2025)
6c3f89b  merge  (Oct 15, 2025)
98295d3  save  (Oct 15, 2025)
6fea63c  save  (Oct 15, 2025)
ba9f416  save  (Oct 15, 2025)
42d09ae  save  (Oct 15, 2025)
94dcf97  revert to a working version  (Oct 16, 2025)
4f07eb9  update  (Oct 16, 2025)
758d35e  Merge master: resolve conflicts in build.sbt and StreamingHelperTest.…  (Oct 16, 2025)
322cb0b  fix  (Oct 16, 2025)
573ed9e  simplify  (Oct 16, 2025)
85173cc  simplify  (Oct 16, 2025)
b52d0a8  fix comments  (Oct 16, 2025)
7860a01  fix import  (Oct 17, 2025)
6650ff7  fix import  (Oct 17, 2025)
68802d9  remove unnecessary change  (Oct 17, 2025)
4f48696  remove unnecessary change  (Oct 17, 2025)
3479837  fix comments  (Oct 17, 2025)
9a039ab  fix test  (Oct 17, 2025)
43ced15  fix test  (Oct 17, 2025)
64d5723  simplify  (Oct 17, 2025)
8cceaed  simplify  (Oct 17, 2025)
8afe5fd  simplify  (Oct 17, 2025)
297 changes: 236 additions & 61 deletions build.sbt

Large diffs are not rendered by default.

@@ -152,7 +152,7 @@ private long calculateMaxSplitBytes(SparkSession sparkSession) {
int minPartitionNum =
minPartitionNumOption.isDefined()
? ((Number) minPartitionNumOption.get()).intValue()
: sparkSession.leafNodeDefaultParallelism();
: sparkSession.sparkContext().defaultParallelism();
if (minPartitionNum <= 0) {
minPartitionNum = 1;
}
@@ -43,10 +43,10 @@ public void setUp(@TempDir File tempDir) {
new SparkConf()
.set("spark.sql.catalog.dsv2", "io.delta.kernel.spark.catalog.TestCatalog")
.set("spark.sql.catalog.dsv2.base_path", tempDir.getAbsolutePath())
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.extensions", "io.delta.sql.LegacyDeltaSparkSessionExtension")
.set(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
"org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog")
.setMaster("local[*]")
.setAppName("Dsv2BasicTest");
spark = SparkSession.builder().config(conf).getOrCreate();
@@ -32,10 +32,10 @@ public static void setUpSparkAndEngine() {
SparkSession.builder()
.master("local[*]")
.appName("SparkKernelDsv2Tests")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.extensions", "io.delta.sql.LegacyDeltaSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
"org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog")
.getOrCreate();
defaultEngine = DefaultEngine.create(spark.sessionState().newHadoopConf());
}
@@ -60,10 +60,10 @@ public void setUp(@TempDir File tempDir) {
new SparkConf()
.set("spark.sql.catalog.dsv2", "io.delta.kernel.spark.catalog.TestCatalog")
.set("spark.sql.catalog.dsv2.base_path", tempDir.getAbsolutePath())
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.extensions", "io.delta.sql.LegacyDeltaSparkSessionExtension")
.set(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
"org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog")
.setMaster("local[*]")
.setAppName("SparkGoldenTableTest");
spark = SparkSession.builder().config(conf).getOrCreate();
@@ -140,7 +140,7 @@ public void testGetActiveCommitAtTime_pastTimestamp(@TempDir File tempDir) throw
.history()
.getActiveCommitAtTime(
timestamp,
Option.empty() /* catalogTable */,
deltaLog.initialCatalogTable() /* catalogTableOpt */,
false /* canReturnLastCommit */,
true /* mustBeRecreatable */,
false /* canReturnEarliestCommit */);
@@ -172,7 +172,7 @@ public void testGetActiveCommitAtTime_futureTimestamp_canReturnLast(@TempDir Fil
.history()
.getActiveCommitAtTime(
futureTimestamp,
Option.empty() /* catalogTable */,
deltaLog.initialCatalogTable() /* catalogTableOpt */,
true /* canReturnLastCommit */,
true /* mustBeRecreatable */,
false /* canReturnEarliestCommit */);
@@ -204,7 +204,7 @@ public void testGetActiveCommitAtTime_futureTimestamp_notMustBeRecreatable(@Temp
.history()
.getActiveCommitAtTime(
futureTimestamp,
Option.empty() /* catalogTable */,
deltaLog.initialCatalogTable() /* catalogTableOpt */,
true /* canReturnLastCommit */,
false /* mustBeRecreatable */,
false /* canReturnEarliestCommit */);
@@ -236,7 +236,7 @@ public void testGetActiveCommitAtTime_earlyTimestamp_canReturnEarliest(@TempDir
.history()
.getActiveCommitAtTime(
earlyTimestamp,
Option.empty() /* catalogTable */,
deltaLog.initialCatalogTable() /* catalogTableOpt */,
false /* canReturnLastCommit */,
true /* mustBeRecreatable */,
true /* canReturnEarliestCommit */);
@@ -268,7 +268,7 @@ public void testGetActiveCommitAtTime_earlyTimestamp_notMustBeRecreatable_canRet
.history()
.getActiveCommitAtTime(
earlyTimestamp,
Option.empty() /* catalogTable */,
deltaLog.initialCatalogTable() /* catalogTableOpt */,
false /* canReturnLastCommit */,
false /* mustBeRecreatable */,
true /* canReturnEarliestCommit */);
@@ -353,12 +353,19 @@ public void testCheckVersionExists(
deltaLog
.history()
.checkVersionExists(
versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange));
versionToCheck,
deltaLog.initialCatalogTable() /* catalogTableOpt */,
mustBeRecreatable,
allowOutOfRange));
} else {
streamingHelper.checkVersionExists(versionToCheck, mustBeRecreatable, allowOutOfRange);
deltaLog
.history()
.checkVersionExists(versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange);
.checkVersionExists(
versionToCheck,
deltaLog.initialCatalogTable() /* catalogTableOpt */,
mustBeRecreatable,
allowOutOfRange);
}
}
}
@@ -99,7 +99,9 @@ trait AbstractTestUtils
.appName("Spark Test Writer for Delta Kernel")
.config("spark.master", "local")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
// Set this conf to empty string so that the golden tables generated
// using with the test-prefix (i.e. there is no DELTA_TESTING set) can still work
.config(DeltaSQLConf.TEST_DV_NAME_PREFIX.key, "")
5 changes: 4 additions & 1 deletion project/TestParallelization.scala
@@ -54,6 +54,7 @@ object TestParallelization {
Test / testGroupingStrategy := {
val groupsCount = (Test / forkTestJVMCount).value
val shard = (Test / shardId).value
// Use regular baseDirectory for target directory (not Test/baseDirectory)
val baseJvmDir = baseDirectory.value
MinShardGroupDurationStrategy(groupsCount, baseJvmDir, shard, defaultForkOptions.value)
},
@@ -81,7 +82,9 @@
javaHome = javaHome.value,
outputStrategy = outputStrategy.value,
bootJars = Vector.empty,
workingDirectory = Some(baseDirectory.value),
// Use Test/baseDirectory instead of baseDirectory to support modules where these differ
// (e.g. spark-combined module where Test/baseDirectory points to spark/ source directory)
workingDirectory = Some((Test / baseDirectory).value),
runJVMOptions = (Test / javaOptions).value.toVector,
connectInput = connectInput.value,
envVars = (Test / envVars).value
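For context, a minimal sbt sketch of the layout the Test/baseDirectory comment above describes. The module name and paths here are assumptions for illustration only; the PR's actual build.sbt diff is not rendered above.

    // Hypothetical combined module whose test sources and fixtures live under spark/.
    lazy val sparkCombined = (project in file("spark-combined"))
      .settings(
        // Build outputs stay under the module's own baseDirectory,
        // while forked test JVMs resolve relative paths against spark/.
        Test / baseDirectory := (ThisBuild / baseDirectory).value / "spark",
        Test / fork := true
      )

With a setting like this, forkOptions picking up (Test / baseDirectory) as the working directory is what makes relative test paths behave the same as in the original spark module.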
Contributor:
Shall we add a README.md under spark-combined?

Contributor:
Also, shall we call it spark-unified instead of spark-combined?

Contributor:
Or, we can have

  • spark
  • sparkV1
  • sparkV2

@@ -0,0 +1,29 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog;

/**
* Delta Catalog implementation that can delegate to both V1 and V2 implementations.
* This class sits in delta-spark (combined) module and can access:
* - V1: org.apache.spark.sql.delta.* (full version with DeltaLog)
* - V2: io.delta.kernel.spark.*
*/
public class DeltaCatalog extends AbstractDeltaCatalog {

}


@@ -0,0 +1,37 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sql

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

/**
* Delta Spark Session Extension that can register both V1 and V2 implementations.
* This class sits in delta-spark (combined) module and can access:
* - V1: org.apache.spark.sql.delta.* (full version with DeltaLog)
* - V2: io.delta.kernel.spark.*
*/
class DeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension {

/**
* NoOpRule for binary compatibility with Delta 3.3.0
* This class must remain here to satisfy MiMa checks
*/
class NoOpRule extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan
}
}
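Taken together with the new DeltaCatalog shim above, these thin subclasses keep the user-facing entry points stable while the shared logic moves into the Abstract* parents. A minimal usage sketch, mirroring the session configuration used by the tests elsewhere in this PR (the app name and master are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("delta-combined-example")
      .master("local[*]")
      // Entry points provided by the combined delta-spark module
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()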
@@ -80,7 +80,8 @@ import org.apache.spark.sql.internal.SQLConf
*
* @since 0.4.0
*/
class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
class LegacyDeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension
class AbstractDeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
Contributor:
Let's add a blank line between the two classes.
Also, add a comment for each class.

override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectParser { (_, parser) =>
new DeltaSqlParser(parser)
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TimeTravel
import org.apache.spark.sql.delta.DataFrameUtils
import org.apache.spark.sql.delta.DeltaErrors.{TemporallyUnstableInputException, TimestampEarlierThanCommitRetentionException}
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
import org.apache.spark.sql.delta.catalog.DeltaCatalog
import org.apache.spark.sql.delta.catalog.LegacyDeltaCatalog
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.catalog.IcebergTablePlaceHolder
import org.apache.spark.sql.delta.commands._
@@ -245,7 +245,7 @@ class DeltaAnalysis(session: SparkSession)
case _ =>
protocol
}
val newDeltaCatalog = new DeltaCatalog()
val newDeltaCatalog = new LegacyDeltaCatalog()
val existingTableOpt = newDeltaCatalog.getExistingTableIfExists(catalogTableTarget.identifier)
val newTable = newDeltaCatalog
.verifyTableAndSolidify(
17 changes: 7 additions & 10 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -20,12 +20,10 @@ package org.apache.spark.sql.delta
import java.io.{FileNotFoundException, IOException}
import java.nio.file.FileAlreadyExistsException
import java.util.{ConcurrentModificationException, UUID}

import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBySpec}
import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.catalog.DeltaCatalog
import org.apache.spark.sql.delta.catalog.AbstractDeltaCatalog
import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, DeltaGenerateCommand}
import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.hooks.AutoCompactType
@@ -37,9 +35,8 @@ import org.apache.spark.sql.delta.redirect.RedirectState
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.sql.DeltaSparkSessionExtension
import io.delta.sql.AbstractDeltaSparkSessionExtension
import org.apache.hadoop.fs.{ChecksumException, Path}

import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -1881,10 +1878,10 @@ trait DeltaErrorsBase
val catalogImplConfig = SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key
new DeltaAnalysisException(
errorClass = "DELTA_CONFIGURE_SPARK_SESSION_WITH_EXTENSION_AND_CATALOG",
messageParameters = Array(classOf[DeltaSparkSessionExtension].getName,
catalogImplConfig, classOf[DeltaCatalog].getName,
classOf[DeltaSparkSessionExtension].getName,
catalogImplConfig, classOf[DeltaCatalog].getName),
messageParameters = Array(classOf[AbstractDeltaSparkSessionExtension].getName,
catalogImplConfig, classOf[AbstractDeltaCatalog].getName,
classOf[AbstractDeltaSparkSessionExtension].getName,
catalogImplConfig, classOf[AbstractDeltaCatalog].getName),
cause = originalException)
}

@@ -67,7 +67,9 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
* A Catalog extension which can properly handle the interaction between the HiveMetaStore and
* Delta tables. It delegates all operations DataSources other than Delta to the SparkCatalog.
*/
class DeltaCatalog extends DelegatingCatalogExtension
class LegacyDeltaCatalog extends AbstractDeltaCatalog

class AbstractDeltaCatalog extends DelegatingCatalogExtension
Contributor:
Let's add comments

with StagingTableCatalog
with SupportsPathIdentifier
with DeltaLogging {
@@ -933,7 +935,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
* A trait for handling table access through delta.`/some/path`. This is a stop-gap solution
* until PathIdentifiers are implemented in Apache Spark.
*/
trait SupportsPathIdentifier extends TableCatalog { self: DeltaCatalog =>
trait SupportsPathIdentifier extends TableCatalog { self: AbstractDeltaCatalog =>

private def supportSQLOnFile: Boolean = spark.sessionState.conf.runSQLonFile

@@ -17,12 +17,11 @@
package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.implicits.RichSparkClasses
import org.apache.spark.sql.delta.util.DeltaEncoders

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

package object implicits extends DeltaEncoders with RichSparkClasses {
package object implicits extends DeltaEncoders with implicits.RichSparkClasses {
// Define a few implicit classes to provide the `toDF` method. These classes are not using generic
// types to avoid touching Scala reflection.
implicit class RichAddFileSeq(files: Seq[AddFile]) {
@@ -29,7 +29,7 @@ import scala.sys.process.Process
import org.apache.spark.sql.delta.DeltaErrors.generateDocsLink
import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol}
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION}
import org.apache.spark.sql.delta.catalog.DeltaCatalog
import org.apache.spark.sql.delta.catalog.AbstractDeltaCatalog
import org.apache.spark.sql.delta.constraints.CharVarcharConstraint
import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.constraints.Constraints.NotNull
@@ -38,7 +38,7 @@ import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, Inva
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
import io.delta.sql.DeltaSparkSessionExtension
import io.delta.sql.AbstractDeltaSparkSessionExtension
import org.apache.hadoop.fs.Path
import org.json4s.JString
import org.scalatest.GivenWhenThen
@@ -1977,9 +1977,9 @@ trait DeltaErrorsSuiteBase
}

checkError(e, "DELTA_CONFIGURE_SPARK_SESSION_WITH_EXTENSION_AND_CATALOG", "56038", Map(
"sparkSessionExtensionName" -> classOf[DeltaSparkSessionExtension].getName,
"sparkSessionExtensionName" -> classOf[AbstractDeltaSparkSessionExtension].getName,
"catalogKey" -> SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key,
"catalogClassName" -> classOf[DeltaCatalog].getName
"catalogClassName" -> classOf[AbstractDeltaCatalog].getName
))
}
{