Skip to content

Commit 0c162c3

Browse files
committed
Merge branch 'master' into fix-conflict
2 parents 9751281 + be6898c commit 0c162c3

File tree

11 files changed

+279
-36
lines changed

11 files changed

+279
-36
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import io.delta.kernel.internal.replay.ConflictChecker;
4747
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
4848
import io.delta.kernel.internal.rowtracking.RowTracking;
49+
import io.delta.kernel.internal.stats.FileSizeHistogram;
4950
import io.delta.kernel.internal.tablefeatures.TableFeatures;
5051
import io.delta.kernel.internal.util.*;
5152
import io.delta.kernel.internal.util.Clock;
@@ -597,8 +598,9 @@ private Optional<CRCInfo> buildPostCommitCrcInfoIfCurrentCrcAvailable(
597598
metricsResult.getTotalAddFilesSizeInBytes(),
598599
metricsResult.getNumAddFiles(),
599600
Optional.of(txnId.toString()),
600-
Optional.empty() // once we support writing CRC populate here
601-
));
601+
metricsResult
602+
.getTableFileSizeHistogram()
603+
.map(FileSizeHistogram::fromFileSizeHistogramResult)));
602604
}
603605

604606
return currentCrcInfo
@@ -610,12 +612,16 @@ private Optional<CRCInfo> buildPostCommitCrcInfoIfCurrentCrcAvailable(
610612
commitAtVersion,
611613
metadata,
612614
protocol,
613-
// TODO: handle RemoveFiles for calculating table size and num of files.
614-
lastCrcInfo.getTableSizeBytes() + metricsResult.getTotalAddFilesSizeInBytes(),
615-
lastCrcInfo.getNumFiles() + metricsResult.getNumAddFiles(),
615+
lastCrcInfo.getTableSizeBytes()
616+
+ metricsResult.getTotalAddFilesSizeInBytes()
617+
- metricsResult.getTotalRemoveFilesSizeInBytes(),
618+
lastCrcInfo.getNumFiles()
619+
+ metricsResult.getNumAddFiles()
620+
- metricsResult.getNumRemoveFiles(),
616621
Optional.of(txnId.toString()),
617-
Optional.empty() // once we support writing CRC populate here
618-
));
622+
metricsResult
623+
.getTableFileSizeHistogram()
624+
.map(FileSizeHistogram::fromFileSizeHistogramResult)));
619625
}
620626

621627
/**

kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/CRCInfo.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,9 @@ public Row toRow() {
172172

173173
// Add optional fields
174174
txnId.ifPresent(txn -> values.put(getSchemaIndex(TXN_ID), txn));
175-
// TODO write fileSizeHistogram here
175+
fileSizeHistogram.ifPresent(
176+
fileSizeHistogram ->
177+
values.put(getSchemaIndex(FILE_SIZE_HISTOGRAM), fileSizeHistogram.toRow()));
176178
return new GenericRow(CRC_FILE_SCHEMA, values);
177179
}
178180

kernel/kernel-api/src/main/java/io/delta/kernel/internal/stats/FileSizeHistogram.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,14 @@ private static long[] createDefaultBinBoundaries() {
160160
return boundaries;
161161
}
162162

163-
// TODO factory from FileSizeHistogramResult
163+
public static FileSizeHistogram fromFileSizeHistogramResult(
164+
FileSizeHistogramResult fileSizeHistogramResult) {
165+
requireNonNull(fileSizeHistogramResult);
166+
return new FileSizeHistogram(
167+
fileSizeHistogramResult.getSortedBinBoundaries(),
168+
fileSizeHistogramResult.getFileCounts(),
169+
fileSizeHistogramResult.getTotalBytes());
170+
}
164171

165172
////////////////////////////////////
166173
// Member variables and methods //

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ChecksumSimpleComparisonSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ class ChecksumSimpleComparisonSuite extends DeltaTableWriteSuiteBase with TestUt
125125
crcInfo.getTableSizeBytes,
126126
crcInfo.getNumFiles,
127127
Optional.empty(),
128-
// This will be empty since neither Kernel nor Spark writes fileSizeHistogram to CRCInfo yet
129-
crcInfo.getFileSizeHistogram)
128+
// TODO: check the file size histogram once https://github.com/delta-io/delta/pull/3907 is merged.
129+
Optional.empty())
130130
}
131131
}
132132

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.defaults
17+
18+
import java.util.{Collections, Optional}
19+
20+
import scala.jdk.CollectionConverters.{asJavaIteratorConverter, mapAsJavaMapConverter, seqAsJavaListConverter, setAsJavaSetConverter}
21+
22+
import io.delta.kernel.{Table, Transaction, TransactionCommitResult}
23+
import io.delta.kernel.data.Row
24+
import io.delta.kernel.engine.Engine
25+
import io.delta.kernel.internal.{InternalScanFileUtils, SnapshotImpl, TableConfig, TableImpl}
26+
import io.delta.kernel.internal.actions.{AddFile, GenerateIcebergCompatActionUtils, RemoveFile}
27+
import io.delta.kernel.internal.checksum.ChecksumReader
28+
import io.delta.kernel.internal.fs.Path
29+
import io.delta.kernel.internal.stats.FileSizeHistogram
30+
import io.delta.kernel.internal.util.Utils.toCloseableIterator
31+
import io.delta.kernel.utils.{CloseableIterable, DataFileStatus}
32+
import io.delta.kernel.utils.CloseableIterable.inMemoryIterable
33+
34+
/**
35+
* Functional e2e test suite verifying that the file stats collected in the CRC are correct.
36+
*/
37+
class ChecksumStatsSuite extends DeltaTableWriteSuiteBase {
38+
39+
test("Check stats in checksum are correct") {
40+
withTempDirAndEngine { (tablePath, engine) =>
41+
// Currently only table with IcebergWriterCompatV1 could easily
42+
// support both add/remove files.
43+
val tableProperties = Map(
44+
TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED.getKey -> "true")
45+
createEmptyTable(engine, tablePath, testSchema, tableProperties = tableProperties)
46+
val expectedFileSizeHistogram = FileSizeHistogram.createDefaultHistogram()
47+
48+
val dataFiles = Map("file1.parquet" -> 100L, "file2.parquet" -> 100802L)
49+
addFiles(engine, tablePath, dataFiles, expectedFileSizeHistogram)
50+
checkCrcCorrect(
51+
engine,
52+
tablePath,
53+
version = 1,
54+
expectedFileCount = 2,
55+
expectedTableSize = 100902,
56+
expectedFileSizeHistogram = expectedFileSizeHistogram)
57+
58+
removeFiles(
59+
engine,
60+
tablePath,
61+
Map("file1.parquet" -> 100),
62+
expectedFileSizeHistogram)
63+
checkCrcCorrect(
64+
engine,
65+
tablePath,
66+
version = 2,
67+
expectedFileCount = 1,
68+
expectedTableSize = 100902 - 100,
69+
expectedFileSizeHistogram = expectedFileSizeHistogram)
70+
}
71+
}
72+
73+
/**
74+
* Verifies that the CRC information at the given version matches expectations.
75+
*
76+
* @param engine The Delta Kernel engine
77+
* @param tablePath Path to the Delta table
78+
* @param version The table version to check
79+
* @param expectedFileCount Expected number of files in the table
80+
* @param expectedTableSize Expected total size of all files in bytes
81+
* @param expectedFileSizeHistogram Expected file size histogram
82+
*/
83+
def checkCrcCorrect(
84+
engine: Engine,
85+
tablePath: String,
86+
version: Long,
87+
expectedFileCount: Long,
88+
expectedTableSize: Long,
89+
expectedFileSizeHistogram: FileSizeHistogram): Unit = {
90+
val crcInfo = ChecksumReader.getCRCInfo(
91+
engine,
92+
new Path(tablePath + "/_delta_log"),
93+
version,
94+
version)
95+
.orElseThrow(() => new AssertionError("CRC information should be present"))
96+
assert(crcInfo.getNumFiles === expectedFileCount)
97+
assert(crcInfo.getTableSizeBytes === expectedTableSize)
98+
assert(crcInfo.getFileSizeHistogram === Optional.of(expectedFileSizeHistogram))
99+
}
100+
101+
/**
102+
* Adds files to the table and updates the expected histogram.
103+
*
104+
* @param engine The Delta Kernel engine
105+
* @param tablePath Path to the Delta table
106+
* @param filesToAdd Map of file paths to their sizes
107+
* @param histogram The histogram to update with new file sizes
108+
*/
109+
private def addFiles(
110+
engine: Engine,
111+
tablePath: String,
112+
filesToAdd: Map[String, Long],
113+
histogram: FileSizeHistogram): Unit = {
114+
115+
val txn = createTxn(engine, tablePath, maxRetries = 0)
116+
117+
val actionsToCommit = filesToAdd.map { case (path, size) =>
118+
histogram.insert(size)
119+
GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1AddAction(
120+
txn.getTransactionState(engine),
121+
generateDataFileStatus(tablePath, path, fileSize = size),
122+
Collections.emptyMap(),
123+
true /* dataChange */ )
124+
}.toSeq
125+
126+
commitTransaction(
127+
txn,
128+
engine,
129+
inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
130+
}
131+
132+
/**
133+
* Removes files from the table and updates the expected histogram.
134+
*
135+
* @param engine The Delta Kernel engine
136+
* @param tablePath Path to the Delta table
137+
* @param filesToRemove Map of file paths to their sizes
138+
* @param histogram The histogram to update by removing file sizes
139+
*/
140+
private def removeFiles(
141+
engine: Engine,
142+
tablePath: String,
143+
filesToRemove: Map[String, Long],
144+
histogram: FileSizeHistogram): Unit = {
145+
146+
val txn = createTxn(engine, tablePath, maxRetries = 0)
147+
148+
val actionsToCommit = filesToRemove.map { case (path, size) =>
149+
histogram.remove(size)
150+
GenerateIcebergCompatActionUtils.generateIcebergCompatWriterV1RemoveAction(
151+
txn.getTransactionState(engine),
152+
generateDataFileStatus(tablePath, path, fileSize = size),
153+
Collections.emptyMap(),
154+
true /* dataChange */ )
155+
}.toSeq
156+
157+
commitTransaction(
158+
txn,
159+
engine,
160+
inMemoryIterable(toCloseableIterator(actionsToCommit.asJava.iterator())))
161+
}
162+
163+
override def commitTransaction(
164+
txn: Transaction,
165+
engine: Engine,
166+
dataActions: CloseableIterable[Row]): TransactionCommitResult = {
167+
val result = txn.commit(engine, dataActions)
168+
result.getPostCommitHooks
169+
.stream()
170+
.forEach(hook => hook.threadSafeInvoke(engine))
171+
result
172+
}
173+
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CommitIcebergActionSuite.scala

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,24 +39,6 @@ class CommitIcebergActionSuite extends DeltaTableWriteSuiteBase {
3939
private val tblPropertiesIcebergWriterCompatV1Enabled = Map(
4040
TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED.getKey -> "true")
4141

42-
private def generateDataFileStatus(
43-
tablePath: String,
44-
fileName: String,
45-
includeStats: Boolean = true): DataFileStatus = {
46-
val filePath = defaultEngine.getFileSystemClient.resolvePath(tablePath + "/" + fileName)
47-
new DataFileStatus(
48-
filePath,
49-
1000,
50-
10,
51-
if (includeStats) {
52-
Optional.of(new DataFileStatistics(
53-
100,
54-
emptyMap(),
55-
emptyMap(),
56-
emptyMap()))
57-
} else Optional.empty())
58-
}
59-
6042
/* ----- Error cases ----- */
6143

6244
test("requires that maxRetries = 0") {

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package io.delta.kernel.defaults
1717

1818
import java.io.File
1919
import java.nio.file.{Files, Paths}
20+
import java.util.Collections.emptyMap
2021
import java.util.Optional
2122

2223
import scala.collection.JavaConverters._
@@ -39,9 +40,10 @@ import io.delta.kernel.internal.util.{Clock, FileNames, VectorUtils}
3940
import io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames
4041
import io.delta.kernel.internal.util.Utils.singletonCloseableIterator
4142
import io.delta.kernel.internal.util.Utils.toCloseableIterator
43+
import io.delta.kernel.statistics.DataFileStatistics
4244
import io.delta.kernel.types.IntegerType.INTEGER
4345
import io.delta.kernel.types.StructType
44-
import io.delta.kernel.utils.{CloseableIterable, CloseableIterator, FileStatus}
46+
import io.delta.kernel.utils.{CloseableIterable, CloseableIterator, DataFileStatus, FileStatus}
4547
import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable}
4648

4749
import org.apache.spark.sql.delta.VersionNotFoundException
@@ -570,4 +572,23 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
570572
dataActions: CloseableIterable[Row]): TransactionCommitResult = {
571573
txn.commit(engine, dataActions)
572574
}
575+
576+
protected def generateDataFileStatus(
577+
tablePath: String,
578+
fileName: String,
579+
fileSize: Long = 1000,
580+
includeStats: Boolean = true): DataFileStatus = {
581+
val filePath = defaultEngine.getFileSystemClient.resolvePath(tablePath + "/" + fileName)
582+
new DataFileStatus(
583+
filePath,
584+
fileSize,
585+
10,
586+
if (includeStats) {
587+
Optional.of(new DataFileStatistics(
588+
100,
589+
emptyMap(),
590+
emptyMap(),
591+
emptyMap()))
592+
} else Optional.empty())
593+
}
573594
}

sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ private[sharing] class DeltaSharingDataSource
233233

234234
val userInputResponseFormat = options.options.get(DeltaSharingOptions.RESPONSE_FORMAT)
235235
if (userInputResponseFormat.isEmpty && !options.readChangeFeed) {
236-
return autoResolveBaseRelationForSnapshotQuery(options)
236+
return autoResolveBaseRelationForSnapshotQuery(options, sqlContext)
237237
}
238238

239239
val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
@@ -311,16 +311,30 @@ private[sharing] class DeltaSharingDataSource
311311
* shared table), and then decide the code path on the client side.
312312
*/
313313
private def autoResolveBaseRelationForSnapshotQuery(
314-
options: DeltaSharingOptions): BaseRelation = {
314+
options: DeltaSharingOptions,
315+
sqlContext: SQLContext): BaseRelation = {
315316
val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
316317
val parsedPath = DeltaSharingRestClient.parsePath(path)
317318

319+
val responseFormat = {
320+
if (sqlContext.sparkSession.sessionState.conf.getConf(
321+
DeltaSQLConf.DELTA_SHARING_FORCE_DELTA_FORMAT)) {
322+
// If the Spark config is enabled, force the query to return results in Delta format.
323+
// This is primarily used for testing the Delta format code path, even when the source
324+
// table doesn't include advanced features such as deletion vectors.
325+
DeltaSharingOptions.RESPONSE_FORMAT_DELTA
326+
}
327+
else {
328+
s"${DeltaSharingOptions.RESPONSE_FORMAT_PARQUET}," +
329+
s"${DeltaSharingOptions.RESPONSE_FORMAT_DELTA}"
330+
}
331+
}
332+
318333
val client = DeltaSharingRestClient(
319334
profileFile = parsedPath.profileFile,
320335
forStreaming = false,
321336
// Response format(s) the client can process: both parquet and delta by default, or delta only
// when the force-delta-format Spark config is enabled (see responseFormat above).
322-
responseFormat = s"${DeltaSharingOptions.RESPONSE_FORMAT_PARQUET}," +
323-
s"${DeltaSharingOptions.RESPONSE_FORMAT_DELTA}",
337+
responseFormat = responseFormat,
324338
// comma separated delta reader features, used to tell delta sharing server what delta
325339
// reader features the client is able to process.
326340
readerFeatures = DeltaSharingUtils.SUPPORTED_READER_FEATURES.mkString(",")

0 commit comments

Comments
 (0)