68 commits
e6e374f
save
huan233usc Oct 9, 2025
9560950
save
huan233usc Oct 9, 2025
8b9fda1
save
huan233usc Oct 9, 2025
4f45983
save
huan233usc Oct 9, 2025
d6dc7ef
fix
huan233usc Oct 9, 2025
3a6472b
fix
huan233usc Oct 9, 2025
b55bfaa
fix
huan233usc Oct 10, 2025
29a9dbe
fix
huan233usc Oct 10, 2025
5cbf64f
fix
huan233usc Oct 10, 2025
9a6acdd
fix
huan233usc Oct 10, 2025
f8d4862
fix
huan233usc Oct 10, 2025
6c7c972
fix
huan233usc Oct 10, 2025
565f9cb
fix
huan233usc Oct 10, 2025
74a1f5c
fix
huan233usc Oct 10, 2025
c6c306d
fix
huan233usc Oct 10, 2025
dbf686b
fix
huan233usc Oct 10, 2025
2929d72
fix
huan233usc Oct 10, 2025
642a42a
fix
huan233usc Oct 11, 2025
e485cda
fix
huan233usc Oct 11, 2025
f6ef579
fix
huan233usc Oct 11, 2025
5f0dbc8
fix
huan233usc Oct 11, 2025
812ba1d
fix
huan233usc Oct 11, 2025
7dd8d18
fix
huan233usc Oct 11, 2025
1658b8f
fix
huan233usc Oct 12, 2025
743ca99
fix
huan233usc Oct 12, 2025
33f484e
Fix test working directory: use baseDirectory instead of Test/baseDir…
huan233usc Oct 12, 2025
6f833ef
Refactor: use delta-spark-v1's baseDirectory directly for better clarity
huan233usc Oct 12, 2025
5c05ad4
Fix: use delta-spark-v1's baseDirectory for all test paths
huan233usc Oct 12, 2025
fb98c0c
Add debug output for Test/javaOptions user.dir to diagnose GitHub Act…
huan233usc Oct 12, 2025
8c4dd7e
Add more debug output for forkOptions working directory
huan233usc Oct 12, 2025
a0af2a9
Fix: remove duplicate javaOptions - only add user.dir
huan233usc Oct 12, 2025
57312d9
Fix: TestParallelization should use Test/baseDirectory for workingDir…
huan233usc Oct 12, 2025
e639b18
fix
huan233usc Oct 13, 2025
92b6326
fix
huan233usc Oct 13, 2025
1238ef8
Fix: avoid duplicate symlinks in connectClient test setup
huan233usc Oct 13, 2025
021582e
Simplify connectClient symlink fix - remove try-catch
huan233usc Oct 13, 2025
89206cf
Use local delta-spark-v1 in kernelDefaults tests
huan233usc Oct 13, 2025
ad155d1
try minimize change
huan233usc Oct 13, 2025
7864c77
fix test
huan233usc Oct 13, 2025
8716b18
fix test
huan233usc Oct 13, 2025
86c0186
fix test
huan233usc Oct 13, 2025
3a19590
fix test
huan233usc Oct 13, 2025
a000aa8
fix test
huan233usc Oct 13, 2025
163b123
fix test
huan233usc Oct 13, 2025
a63fd0d
fix test
huan233usc Oct 13, 2025
680787e
Merge from master and resolve conflicts
huan233usc Oct 15, 2025
6c3f89b
merge
huan233usc Oct 15, 2025
98295d3
save
huan233usc Oct 15, 2025
6fea63c
save
huan233usc Oct 15, 2025
ba9f416
save
huan233usc Oct 15, 2025
42d09ae
save
huan233usc Oct 15, 2025
94dcf97
revert to a working version
huan233usc Oct 16, 2025
4f07eb9
update
huan233usc Oct 16, 2025
758d35e
Merge master: resolve conflicts in build.sbt and StreamingHelperTest.…
huan233usc Oct 16, 2025
322cb0b
fix
huan233usc Oct 16, 2025
573ed9e
simplify
huan233usc Oct 16, 2025
85173cc
simplify
huan233usc Oct 16, 2025
b52d0a8
fix comments
huan233usc Oct 16, 2025
7860a01
fix import
huan233usc Oct 17, 2025
6650ff7
fix import
huan233usc Oct 17, 2025
68802d9
remove unnecessary change
huan233usc Oct 17, 2025
4f48696
remove unnecessary change
huan233usc Oct 17, 2025
3479837
fix comments
huan233usc Oct 17, 2025
9a039ab
fix test
huan233usc Oct 17, 2025
43ced15
fix test
huan233usc Oct 17, 2025
64d5723
simplify
huan233usc Oct 17, 2025
8cceaed
simplify
huan233usc Oct 17, 2025
8afe5fd
simplify
huan233usc Oct 17, 2025
316 changes: 261 additions & 55 deletions build.sbt

Large diffs are not rendered by default.

@@ -152,7 +152,7 @@ private long calculateMaxSplitBytes(SparkSession sparkSession) {
int minPartitionNum =
minPartitionNumOption.isDefined()
? ((Number) minPartitionNumOption.get()).intValue()
: sparkSession.leafNodeDefaultParallelism();
: sparkSession.sparkContext().defaultParallelism();
if (minPartitionNum <= 0) {
minPartitionNum = 1;
}
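For context, the fallback for minPartitionNum now reads SparkContext.defaultParallelism directly instead of SparkSession.leafNodeDefaultParallelism, an internal SparkSession accessor. A minimal Scala sketch of the equivalent fallback logic, with assumed names (the Java hunk above is the source of truth):

// Sketch only: `configured` stands in for the optional min-partition-num setting read by the caller.
def resolveMinPartitionNum(spark: org.apache.spark.sql.SparkSession, configured: Option[Int]): Int =
  math.max(1, configured.getOrElse(spark.sparkContext.defaultParallelism))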
@@ -140,7 +140,6 @@ public void testGetActiveCommitAtTime_pastTimestamp(@TempDir File tempDir) throw
.history()
.getActiveCommitAtTime(
timestamp,
Option.empty(),
false /* canReturnLastCommit */,
true /* mustBeRecreatable */,
false /* canReturnEarliestCommit */);
@@ -172,7 +171,6 @@ public void testGetActiveCommitAtTime_futureTimestamp_canReturnLast(@TempDir Fil
.history()
.getActiveCommitAtTime(
futureTimestamp,
Option.empty(),
true /* canReturnLastCommit */,
true /* mustBeRecreatable */,
false /* canReturnEarliestCommit */);
@@ -204,7 +202,6 @@ public void testGetActiveCommitAtTime_futureTimestamp_notMustBeRecreatable(@Temp
.history()
.getActiveCommitAtTime(
futureTimestamp,
Option.empty(),
true /* canReturnLastCommit */,
false /* mustBeRecreatable */,
false /* canReturnEarliestCommit */);
@@ -236,7 +233,6 @@ public void testGetActiveCommitAtTime_earlyTimestamp_canReturnEarliest(@TempDir
.history()
.getActiveCommitAtTime(
earlyTimestamp,
Option.empty(),
false /* canReturnLastCommit */,
true /* mustBeRecreatable */,
true /* canReturnEarliestCommit */);
@@ -268,7 +264,6 @@ public void testGetActiveCommitAtTime_earlyTimestamp_notMustBeRecreatable_canRet
.history()
.getActiveCommitAtTime(
earlyTimestamp,
Option.empty(),
false /* canReturnLastCommit */,
false /* mustBeRecreatable */,
true /* canReturnEarliestCommit */);
@@ -352,13 +347,10 @@ public void testCheckVersionExists(
() ->
deltaLog
.history()
.checkVersionExists(
versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange));
.checkVersionExists(versionToCheck, mustBeRecreatable, allowOutOfRange));
} else {
streamingHelper.checkVersionExists(versionToCheck, mustBeRecreatable, allowOutOfRange);
deltaLog
.history()
.checkVersionExists(versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange);
deltaLog.history().checkVersionExists(versionToCheck, mustBeRecreatable, allowOutOfRange);
}
}
}
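The hunks above drop the Option.empty() argument from DeltaHistoryManager's getActiveCommitAtTime and checkVersionExists. A minimal Scala sketch of the updated call shapes, reusing names from the tests above (illustrative only):

// Illustrative only: deltaLog, timestamp, versionToCheck, etc. come from the tests above.
val commit = deltaLog.history().getActiveCommitAtTime(
  timestamp,
  false /* canReturnLastCommit */,
  true /* mustBeRecreatable */,
  false /* canReturnEarliestCommit */)
deltaLog.history().checkVersionExists(versionToCheck, mustBeRecreatable, allowOutOfRange)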
@@ -29,7 +29,6 @@ import io.delta.tables.DeltaTable
import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaTestImplicits.OptimisticTxnTestHelper
import org.apache.spark.sql.delta.util.FileNames

import org.apache.hadoop.conf.Configuration
@@ -196,7 +195,7 @@ trait AbstractCheckpointV2ReadSuite extends AnyFunSuite with ExpressionTestUtils
val protocol = Protocol(3, 7, Some(Set("v2Checkpoint")), Some(supportedFeatures))
val add = AddFile(new Path("addfile").toUri.toString, Map.empty, 100L, 10L, dataChange = true)

log.startTransaction().commitManually(Seq(metadata, add): _*)
log.startTransaction().commitManuallyWithValidation(metadata, add)
log.upgradeProtocol(None, log.update(), protocol)
log.checkpoint(log.update())

@@ -30,7 +30,6 @@ import io.delta.kernel.utils.CloseableIterable.emptyIterable

import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.CommitInfo
import org.apache.spark.sql.delta.test.DeltaTestImplicits.OptimisticTxnTestHelper

import org.apache.hadoop.fs.Path
import org.scalatest.funsuite.AnyFunSuite
@@ -155,22 +154,21 @@ class ChecksumUtilsSuite extends AnyFunSuite with WriteUtils with LogReplayBaseS
val deltaLog = DeltaLog.forTable(spark, new Path(path))
deltaLog
.startTransaction()
.commitManually(
List(
CommitInfo(
time = 12345,
operation = "MANUAL UPDATE",
inCommitTimestamp = Some(12345),
operationParameters = Map.empty,
commandContext = Map.empty,
readVersion = Some(11),
isolationLevel = None,
isBlindAppend = None,
operationMetrics = None,
userMetadata = None,
tags = None,
txnId = None),
deltaLog.getSnapshotAt(11).allFiles.head().copy(dataChange = false).wrap.unwrap): _*)
.commitManuallyWithValidation(
CommitInfo(
time = 12345,
operation = "MANUAL UPDATE",
inCommitTimestamp = Some(12345),
operationParameters = Map.empty,
commandContext = Map.empty,
readVersion = Some(11),
isolationLevel = None,
isBlindAppend = None,
operationMetrics = None,
userMetadata = None,
tags = None,
txnId = None),
deltaLog.getSnapshotAt(11).allFiles.head().copy(dataChange = false).wrap.unwrap)
deleteChecksumFileForTableUsingHadoopFs(
table.getPath(engine).stripPrefix("file:"),
Seq(11, 12))
@@ -194,22 +192,21 @@ class ChecksumUtilsSuite extends AnyFunSuite with WriteUtils with LogReplayBaseS
val deltaLog = DeltaLog.forTable(spark, new Path(path))
deltaLog
.startTransaction()
.commitManually(
List(
CommitInfo(
time = 12345,
operation = "REPLACE TABLE",
inCommitTimestamp = Some(12345),
operationParameters = Map.empty,
commandContext = Map.empty,
readVersion = Some(11),
isolationLevel = None,
isBlindAppend = None,
operationMetrics = None,
userMetadata = None,
tags = None,
txnId = None),
deltaLog.getSnapshotAt(11).allFiles.head().remove.copy(size = None).wrap.unwrap): _*)
.commitManuallyWithValidation(
CommitInfo(
time = 12345,
operation = "REPLACE TABLE",
inCommitTimestamp = Some(12345),
operationParameters = Map.empty,
commandContext = Map.empty,
readVersion = Some(11),
isolationLevel = None,
isBlindAppend = None,
operationMetrics = None,
userMetadata = None,
tags = None,
txnId = None),
deltaLog.getSnapshotAt(11).allFiles.head().remove.copy(size = None).wrap.unwrap)
// CRC files generated by Delta Spark don't include the file size histogram
deleteChecksumFileForTableUsingHadoopFs(
table.getPath(engine).stripPrefix("file:"),
@@ -282,22 +279,21 @@ class ChecksumUtilsSuite extends AnyFunSuite with WriteUtils with LogReplayBaseS
val deltaLog = DeltaLog.forTable(spark, new Path(path))
deltaLog
.startTransaction()
.commitManually(
List(
deltaLog.getSnapshotAt(11).allFiles.head().remove.wrap.unwrap,
CommitInfo(
time = 12345,
operation = "REPLACE TABLE",
inCommitTimestamp = Some(12345),
operationParameters = Map.empty,
commandContext = Map.empty,
readVersion = Some(11),
isolationLevel = None,
isBlindAppend = None,
operationMetrics = None,
userMetadata = None,
tags = None,
txnId = None).wrap.unwrap): _*)
.commitManuallyWithValidation(
deltaLog.getSnapshotAt(11).allFiles.head().remove.wrap.unwrap,
CommitInfo(
time = 12345,
operation = "REPLACE TABLE",
inCommitTimestamp = Some(12345),
operationParameters = Map.empty,
commandContext = Map.empty,
readVersion = Some(11),
isolationLevel = None,
isBlindAppend = None,
operationMetrics = None,
userMetadata = None,
tags = None,
txnId = None).wrap.unwrap)
// CRC files generated by Delta Spark don't include the file size histogram
deleteChecksumFileForTableUsingHadoopFs(
table.getPath(engine).stripPrefix("file:"),
@@ -320,9 +316,8 @@ class ChecksumUtilsSuite extends AnyFunSuite with WriteUtils with LogReplayBaseS
val deltaLog = DeltaLog.forTable(spark, new Path(path))
deltaLog
.startTransaction()
.commitManually(
List(
deltaLog.getSnapshotAt(11).allFiles.head().remove.wrap.unwrap): _*)
.commitManuallyWithValidation(
deltaLog.getSnapshotAt(11).allFiles.head().remove.wrap.unwrap)
// CRC files generated by Delta Spark don't include the file size histogram
deleteChecksumFileForTableUsingHadoopFs(
table.getPath(engine).stripPrefix("file:"),
@@ -423,8 +423,23 @@ abstract class AbstractDeltaTableWritesSuite extends AnyFunSuite with AbstractWr
engine,
tablePath,
testSchema)
DeltaTable.forPath(spark, tablePath)
.addFeatureSupport("testUnsupportedWriter")

// Use the test-only commitUnsafe helper to write an unsupported writer feature
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.Protocol

val deltaLog = DeltaLog.forTable(spark, tablePath)
val txn = deltaLog.startTransaction()

// Create Protocol action with unsupported writer feature
val protocolAction = Protocol(
minReaderVersion = 3,
minWriterVersion = 7,
readerFeatures = Some(Set.empty),
writerFeatures = Some(Set("testUnsupportedWriter")))

// Write the protocol action directly as commit version 1, bypassing Delta Spark's commit path
txn.commitUnsafe(tablePath, 1L, protocolAction)
val e = intercept[KernelException] {
getUpdateTxn(engine, tablePath)
}
@@ -35,7 +35,6 @@ import io.delta.kernel.utils.CloseableIterable.emptyIterable
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.RowId.{RowTrackingMetadataDomain => SparkRowTrackingMetadataDomain}
import org.apache.spark.sql.delta.actions.{DomainMetadata => SparkDomainMetadata}
import org.apache.spark.sql.delta.test.DeltaTestImplicits.OptimisticTxnTestHelper

import org.apache.hadoop.fs.Path
import org.scalatest.funsuite.AnyFunSuite
@@ -439,22 +438,20 @@ trait AbstractDomainMetadataSuite extends AnyFunSuite with AbstractWriteUtils
val deltaLog = DeltaLog.forTable(spark, new Path(tablePath))
deltaLog
.startTransaction()
.commitManually(
List(
SparkDomainMetadata("testDomain1", "{\"key1\":\"1\"}", removed = false),
SparkDomainMetadata("testDomain2", "", removed = false),
SparkDomainMetadata("testDomain3", "", removed = false)): _*)
.commitManuallyWithValidation(
SparkDomainMetadata("testDomain1", "{\"key1\":\"1\"}", removed = false),
SparkDomainMetadata("testDomain2", "", removed = false),
SparkDomainMetadata("testDomain3", "", removed = false))

// This will create 03.json and 03.checkpoint
spark.range(0, 2).write.format("delta").mode("append").save(tablePath)

// Manually commit domain metadata actions. This will create 04.json
deltaLog
.startTransaction()
.commitManually(
List(
SparkDomainMetadata("testDomain1", "{\"key1\":\"10\"}", removed = false),
SparkDomainMetadata("testDomain2", "", removed = true)): _*)
.commitManuallyWithValidation(
SparkDomainMetadata("testDomain1", "{\"key1\":\"10\"}", removed = false),
SparkDomainMetadata("testDomain2", "", removed = true))

// Use Delta Kernel to read the table's domain metadata and verify the result.
// We will need to read 1 checkpoint file and 1 log file to replay the table.
@@ -37,7 +37,6 @@ import io.delta.kernel.internal.util.{FileNames, ManualClock, VectorUtils}

import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.{Action => SparkAction, AddCDCFile => SparkAddCDCFile, AddFile => SparkAddFile, CommitInfo => SparkCommitInfo, Metadata => SparkMetadata, Protocol => SparkProtocol, RemoveFile => SparkRemoveFile, SetTransaction => SparkSetTransaction}
import org.apache.spark.sql.delta.test.DeltaTestImplicits.OptimisticTxnTestHelper

import org.apache.hadoop.fs.{Path => HadoopPath}
import org.apache.spark.sql.functions.col
@@ -326,7 +325,7 @@ abstract class TableChangesSuite extends AnyFunSuite with TestUtils with WriteUt

val add1 = SparkAddFile("fake/path/1", Map.empty, 1, 1, dataChange = true)
val txn1 = log.startTransaction()
txn1.commitManually(metadata :: add1 :: Nil: _*)
txn1.commitManuallyWithValidation(metadata, add1)

val addCDC2 = SparkAddCDCFile(
"fake/path/2",
@@ -335,12 +334,12 @@
Map("tag_foo" -> "tag_bar"))
val remove2 = SparkRemoveFile("fake/path/1", Some(100), dataChange = true)
val txn2 = log.startTransaction()
txn2.commitManually(addCDC2 :: remove2 :: Nil: _*)
txn2.commitManuallyWithValidation(addCDC2, remove2)

val setTransaction3 = SparkSetTransaction("fakeAppId", 3L, Some(200))
val txn3 = log.startTransaction()
val latestTableProtocol = log.snapshot.protocol
txn3.commitManually(latestTableProtocol :: setTransaction3 :: Nil: _*)
txn3.commitManuallyWithValidation(latestTableProtocol, setTransaction3)

// request subset of actions
testGetChangesVsSpark(
@@ -17,7 +17,8 @@ package io.delta.kernel.defaults.utils

import java.io.{File, FileNotFoundException}
import java.math.{BigDecimal => BigDecimalJ}
import java.nio.file.Files
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}
import java.util.{Optional, TimeZone, UUID}

import scala.collection.JavaConverters._
@@ -170,6 +171,38 @@ trait AbstractTestUtils
def toScala: Option[T] = if (optional.isPresent) Some(optional.get()) else None
}

/**
* Provides test-only access to internal Delta Spark APIs.
*/
implicit class OptimisticTxnTestHelper(txn: org.apache.spark.sql.delta.OptimisticTransaction) {

/**
* Test-only method to commit arbitrary actions to a Delta table.
*/
def commitManuallyWithValidation(actions: org.apache.spark.sql.delta.actions.Action*): Unit = {
txn.commit(actions.toSeq, org.apache.spark.sql.delta.DeltaOperations.ManualUpdate)
}

/**
* Test-only method for an unsafe commit: writes actions directly to the transaction log.
* Note: this bypasses Delta Spark's transaction logic.
*
* @param tablePath The path to the Delta table
* @param version The commit version number
* @param actions Sequence of Action objects to write
*/
def commitUnsafe(
tablePath: String,
version: Long,
actions: org.apache.spark.sql.delta.actions.Action*): Unit = {
val logPath = new org.apache.hadoop.fs.Path(tablePath, "_delta_log")
val commitFile = org.apache.spark.sql.delta.util.FileNames.unsafeDeltaFile(logPath, version)
val commitContent = actions.map(_.json + "\n").mkString.getBytes(UTF_8)
Files.write(Paths.get(commitFile.toString), commitContent)
Table.forPath(defaultEngine, tablePath).checksum(defaultEngine, version)
}
}

implicit object ResourceLoader {
lazy val classLoader: ClassLoader = ResourceLoader.getClass.getClassLoader
}
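The two helpers above replace the OptimisticTxnTestHelper previously imported from DeltaTestImplicits. A short usage sketch, modeled on the call sites earlier in this PR (table path and actions are hypothetical, and a SparkSession named `spark` is assumed to be in scope):

// Sketch only: mirrors how the test suites above use the helpers.
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol}

val tablePath = "/tmp/example-delta-table" // hypothetical
val log = DeltaLog.forTable(spark, tablePath)

// Validated path: goes through OptimisticTransaction.commit with a ManualUpdate operation.
log.startTransaction().commitManuallyWithValidation(
  Metadata(),
  AddFile("addfile", Map.empty, 100L, 10L, dataChange = true))

// Unsafe path: serializes the actions straight into _delta_log/<version>.json and has
// Kernel write the checksum for that version, bypassing Delta Spark's commit checks.
log.startTransaction().commitUnsafe(
  tablePath,
  1L,
  Protocol(
    minReaderVersion = 3,
    minWriterVersion = 7,
    readerFeatures = Some(Set.empty),
    writerFeatures = Some(Set("testUnsupportedWriter"))))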
@@ -0,0 +1,28 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog;

/**
* Delta Catalog implementation that can delegate to both V1 and V2 implementations.
* This class sits in the delta-spark-shaded module and can access:
* - V1: org.apache.spark.sql.delta.* (full version with DeltaLog)
* - V2: io.delta.kernel.spark.*
*/
public class DeltaCatalog extends AbstractDeltaCatalog {

}
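
Not part of this diff, but for context: a catalog with this fully qualified class name is typically wired into a session through the usual Delta/Spark configuration. A hedged sketch (settings follow the standard Delta Lake setup; the app name is made up):

// Sketch only: how such a catalog is normally registered; not a change made by this PR.
val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("delta-catalog-example") // hypothetical
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()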
