Skip to content

Commit 87ba08f

Browse files
allisonport-dbhuan233usc
authored andcommitted
[Kernel] Read the fileSizeHistogram from CRCInfo and propagate it through transaction metrics (delta-io#4403)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description Reads the fileSizeHistogram from CRC and add it as part of transaction metrics. Updates it as the transaction is executed. Note, this is a little weird since it only adds read support and no write support, so we only actually have histograms available for new tables in E2E tests. But delta-io#4328 will add write support and fill out the full feature afterward. Documented throughout the changes needed when adding write support. ## How was this patch tested? Updates tests. ## Does this PR introduce _any_ user-facing changes? No
1 parent b088402 commit 87ba08f

File tree

10 files changed

+258
-43
lines changed

10 files changed

+258
-43
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,13 @@ public Protocol getProtocol() {
257257
public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> dataActions)
258258
throws ConcurrentWriteException {
259259
checkState(!closed, "Transaction is already attempted to commit. Create a new transaction.");
260-
TransactionMetrics transactionMetrics = new TransactionMetrics();
260+
// For a new table or when fileSizeHistogram is available in the CRC of the readSnapshot
261+
// we update it in the commit. When it is not available we do nothing.
262+
TransactionMetrics transactionMetrics =
263+
isNewTable
264+
? TransactionMetrics.forNewTable()
265+
: TransactionMetrics.withExistingTableFileSizeHistogram(
266+
readSnapshot.getCurrentCrcInfo().flatMap(CRCInfo::getFileSizeHistogram));
261267
try {
262268
long committedVersion =
263269
transactionMetrics.totalCommitTimer.time(
@@ -332,6 +338,9 @@ private long commitWithRetry(
332338
commitAsVersion = rebaseState.getLatestVersion() + 1;
333339
dataActions = rebaseState.getUpdatedDataActions();
334340
domainMetadatas = Optional.of(rebaseState.getUpdatedDomainMetadatas());
341+
// Action counters may be partially incremented from previous tries, reset the counters
342+
// to 0 and drop fileSizeHistogram
343+
transactionMetrics.resetActionMetricsForRetry();
335344
}
336345
}
337346
numTries++;
@@ -448,8 +457,6 @@ private long doCommit(
448457
}
449458
}
450459

451-
// Action counters may be partially incremented from previous tries, reset the counters to 0
452-
transactionMetrics.resetCounters();
453460
boolean isAppendOnlyTable = APPEND_ONLY_ENABLED.fromMetadata(metadata);
454461

455462
// Write the staged data to a delta file
@@ -488,17 +495,12 @@ private long doCommit(
488495
private void incrementMetricsForFileActionRow(TransactionMetrics txnMetrics, Row fileActionRow) {
489496
txnMetrics.totalActionsCounter.increment();
490497
if (!fileActionRow.isNullAt(ADD_FILE_ORDINAL)) {
491-
txnMetrics.addFilesCounter.increment();
492-
txnMetrics.addFilesSizeInBytesCounter.increment(
493-
new AddFile(fileActionRow.getStruct(ADD_FILE_ORDINAL)).getSize());
494-
// TODO increment fileSizeHistogram
498+
txnMetrics.updateForAddFile(new AddFile(fileActionRow.getStruct(ADD_FILE_ORDINAL)).getSize());
495499
} else if (!fileActionRow.isNullAt(REMOVE_FILE_ORDINAL)) {
496-
txnMetrics.removeFilesCounter.increment();
497500
RemoveFile removeFile = new RemoveFile(fileActionRow.getStruct(REMOVE_FILE_ORDINAL));
498-
long fileSize =
501+
long removeFileSize =
499502
removeFile.getSize().orElseThrow(DeltaErrorsInternal::missingRemoveFileSizeDuringCommit);
500-
txnMetrics.removeFilesSizeInBytesCounter.increment(fileSize);
501-
// TODO decrement fileSizeHistogram
503+
txnMetrics.updateForRemoveFile(removeFileSize);
502504
}
503505
}
504506

@@ -585,14 +587,18 @@ private void recordTransactionReport(
585587
private Optional<CRCInfo> buildPostCommitCrcInfoIfCurrentCrcAvailable(
586588
long commitAtVersion, TransactionMetricsResult metricsResult) {
587589
if (isNewTable) {
590+
// We don't need to worry about conflicting transaction here since new tables always commit
591+
// metadata (and thus fail any conflicts)
588592
return Optional.of(
589593
new CRCInfo(
590594
commitAtVersion,
591595
metadata,
592596
protocol,
593597
metricsResult.getTotalAddFilesSizeInBytes(),
594598
metricsResult.getNumAddFiles(),
595-
Optional.of(txnId.toString())));
599+
Optional.of(txnId.toString()),
600+
Optional.empty() // once we support writing CRC populate here
601+
));
596602
}
597603

598604
return currentCrcInfo
@@ -607,7 +613,9 @@ private Optional<CRCInfo> buildPostCommitCrcInfoIfCurrentCrcAvailable(
607613
// TODO: handle RemoveFiles for calculating table size and num of files.
608614
lastCrcInfo.getTableSizeBytes() + metricsResult.getTotalAddFilesSizeInBytes(),
609615
lastCrcInfo.getNumFiles() + metricsResult.getNumAddFiles(),
610-
Optional.of(txnId.toString())));
616+
Optional.of(txnId.toString()),
617+
Optional.empty() // once we support writing CRC populate here
618+
));
611619
}
612620

613621
/**

kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/CRCInfo.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.delta.kernel.internal.actions.Metadata;
2525
import io.delta.kernel.internal.actions.Protocol;
2626
import io.delta.kernel.internal.data.GenericRow;
27+
import io.delta.kernel.internal.stats.FileSizeHistogram;
2728
import io.delta.kernel.internal.util.InternalUtils;
2829
import io.delta.kernel.types.LongType;
2930
import io.delta.kernel.types.StringType;
@@ -46,6 +47,7 @@ public class CRCInfo {
4647
private static final String METADATA = "metadata";
4748
private static final String PROTOCOL = "protocol";
4849
private static final String TXN_ID = "txnId";
50+
private static final String FILE_SIZE_HISTOGRAM = "fileSizeHistogram";
4951

5052
public static final StructType CRC_FILE_SCHEMA =
5153
new StructType()
@@ -55,7 +57,8 @@ public class CRCInfo {
5557
.add(NUM_PROTOCOL, LongType.LONG)
5658
.add(METADATA, Metadata.FULL_SCHEMA)
5759
.add(PROTOCOL, Protocol.FULL_SCHEMA)
58-
.add(TXN_ID, StringType.STRING, /*nullable*/ true);
60+
.add(TXN_ID, StringType.STRING, /*nullable*/ true)
61+
.add(FILE_SIZE_HISTOGRAM, FileSizeHistogram.FULL_SCHEMA, /*nullable*/ true);
5962

6063
public static Optional<CRCInfo> fromColumnarBatch(
6164
long version, ColumnarBatch batch, int rowId, String crcFilePath) {
@@ -79,13 +82,18 @@ public static Optional<CRCInfo> fromColumnarBatch(
7982
txnIdColumnVector.isNullAt(rowId)
8083
? Optional.empty()
8184
: Optional.of(txnIdColumnVector.getString(rowId));
85+
Optional<FileSizeHistogram> fileSizeHistogram =
86+
FileSizeHistogram.fromColumnVector(
87+
batch.getColumnVector(getSchemaIndex(FILE_SIZE_HISTOGRAM)), rowId);
8288

8389
// protocol and metadata are nullable per fromColumnVector's implementation.
8490
if (protocol == null || metadata == null) {
8591
logger.warn("Invalid checksum file missing protocol and/or metadata: {}", crcFilePath);
8692
return Optional.empty();
8793
}
88-
return Optional.of(new CRCInfo(version, metadata, protocol, tableSizeBytes, numFiles, txnId));
94+
return Optional.of(
95+
new CRCInfo(
96+
version, metadata, protocol, tableSizeBytes, numFiles, txnId, fileSizeHistogram));
8997
}
9098

9199
private final long version;
@@ -94,14 +102,16 @@ public static Optional<CRCInfo> fromColumnarBatch(
94102
private final long tableSizeBytes;
95103
private final long numFiles;
96104
private final Optional<String> txnId;
105+
private final Optional<FileSizeHistogram> fileSizeHistogram;
97106

98107
public CRCInfo(
99108
long version,
100109
Metadata metadata,
101110
Protocol protocol,
102111
long tableSizeBytes,
103112
long numFiles,
104-
Optional<String> txnId) {
113+
Optional<String> txnId,
114+
Optional<FileSizeHistogram> fileSizeHistogram) {
105115
checkArgument(tableSizeBytes >= 0);
106116
checkArgument(numFiles >= 0);
107117
this.version = version;
@@ -110,6 +120,7 @@ public CRCInfo(
110120
this.tableSizeBytes = tableSizeBytes;
111121
this.numFiles = numFiles;
112122
this.txnId = requireNonNull(txnId);
123+
this.fileSizeHistogram = requireNonNull(fileSizeHistogram);
113124
}
114125

115126
/** The version of the Delta table that this CRCInfo represents. */
@@ -139,6 +150,11 @@ public Optional<String> getTxnId() {
139150
return txnId;
140151
}
141152

153+
/** The {@link FileSizeHistogram} stored in this CRCInfo. */
154+
public Optional<FileSizeHistogram> getFileSizeHistogram() {
155+
return fileSizeHistogram;
156+
}
157+
142158
/**
143159
* Encode as a {@link Row} object with the schema {@link CRCInfo#CRC_FILE_SCHEMA}.
144160
*
@@ -156,12 +172,14 @@ public Row toRow() {
156172

157173
// Add optional fields
158174
txnId.ifPresent(txn -> values.put(getSchemaIndex(TXN_ID), txn));
175+
// TODO write fileSizeHistogram here
159176
return new GenericRow(CRC_FILE_SCHEMA, values);
160177
}
161178

162179
@Override
163180
public int hashCode() {
164-
return Objects.hash(version, metadata, protocol, tableSizeBytes, numFiles, txnId);
181+
return Objects.hash(
182+
version, metadata, protocol, tableSizeBytes, numFiles, txnId, fileSizeHistogram);
165183
}
166184

167185
@Override
@@ -175,7 +193,8 @@ public boolean equals(Object o) {
175193
&& numFiles == other.numFiles
176194
&& metadata.equals(other.metadata)
177195
&& protocol.equals(other.protocol)
178-
&& txnId.equals(other.txnId);
196+
&& txnId.equals(other.txnId)
197+
&& fileSizeHistogram.equals(other.fileSizeHistogram);
179198
}
180199

181200
private static int getSchemaIndex(String fieldName) {

kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/TransactionMetrics.java

Lines changed: 84 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
*/
1616
package io.delta.kernel.internal.metrics;
1717

18+
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
19+
20+
import io.delta.kernel.internal.stats.FileSizeHistogram;
21+
import io.delta.kernel.metrics.FileSizeHistogramResult;
1822
import io.delta.kernel.metrics.TransactionMetricsResult;
23+
import java.util.Optional;
1924

2025
/**
2126
* Stores the metrics for an ongoing transaction. These metrics are updated and recorded throughout
@@ -27,32 +32,92 @@
2732
*/
2833
public class TransactionMetrics {
2934

35+
/**
36+
* @return a fresh TransactionMetrics object with a default tableFileSizeHistogram (with 0 counts)
37+
*/
38+
public static TransactionMetrics forNewTable() {
39+
return new TransactionMetrics(Optional.of(FileSizeHistogram.createDefaultHistogram()));
40+
}
41+
42+
/**
43+
* @return a fresh TransactionMetrics object with an initial tableFileSizeHistogram as provided
44+
*/
45+
public static TransactionMetrics withExistingTableFileSizeHistogram(
46+
Optional<FileSizeHistogram> tableFileSizeHistogram) {
47+
return new TransactionMetrics(tableFileSizeHistogram);
48+
}
49+
3050
public final Timer totalCommitTimer = new Timer();
3151

3252
public final Counter commitAttemptsCounter = new Counter();
3353

34-
public final Counter addFilesCounter = new Counter();
54+
private final Counter addFilesCounter = new Counter();
3555

36-
public final Counter removeFilesCounter = new Counter();
56+
private final Counter removeFilesCounter = new Counter();
3757

3858
public final Counter totalActionsCounter = new Counter();
3959

40-
public final Counter addFilesSizeInBytesCounter = new Counter();
60+
private final Counter addFilesSizeInBytesCounter = new Counter();
4161

42-
public final Counter removeFilesSizeInBytesCounter = new Counter();
62+
private final Counter removeFilesSizeInBytesCounter = new Counter();
63+
64+
private Optional<FileSizeHistogram> tableFileSizeHistogram;
65+
66+
private TransactionMetrics(Optional<FileSizeHistogram> tableFileSizeHistogram) {
67+
this.tableFileSizeHistogram = tableFileSizeHistogram;
68+
}
4369

4470
/**
45-
* Resets the action counters (addFilesCounter, removeFilesCounter and totalActionsCounter) to 0.
46-
* Action counters may be partially incremented if an action iterator is not read to completion
47-
* (i.e. if an exception interrupts a file write). This allows us to reset the counters so that we
48-
* can increment them correctly from 0 on a retry.
71+
* Updates the metrics for a seen AddFile with size {@code addFileSize}. Specifically, updates
72+
* addFilesCounter, addFilesSizeInBytesCounter, and tableFileSizeHistogram. Note, it does NOT
73+
* increment totalActionsCounter, this needs to be done separately.
74+
*
75+
* @param addFileSize the size of the add file to update the metrics for
4976
*/
50-
public void resetCounters() {
77+
public void updateForAddFile(long addFileSize) {
78+
checkArgument(addFileSize >= 0, "File size must be non-negative, got %s", addFileSize);
79+
addFilesCounter.increment();
80+
addFilesSizeInBytesCounter.increment(addFileSize);
81+
tableFileSizeHistogram.ifPresent(histogram -> histogram.insert(addFileSize));
82+
}
83+
84+
/**
85+
* Updates the metrics for a seen RemoveFile with size {@code removeFileSize}. Specifically,
86+
* updates removeFilesCounter, removeFilesSizeInBytesCounter, and tableFileSizeHistogram. Note, it
87+
* does NOT increment totalActionsCounter, this needs to be done separately.
88+
*
89+
* @param removeFileSize the size of the remove file to update the metrics for
90+
*/
91+
public void updateForRemoveFile(long removeFileSize) {
92+
checkArgument(removeFileSize >= 0, "File size must be non-negative, got %s", removeFileSize);
93+
removeFilesCounter.increment();
94+
removeFilesSizeInBytesCounter.increment(removeFileSize);
95+
tableFileSizeHistogram.ifPresent(histogram -> histogram.remove(removeFileSize));
96+
}
97+
98+
/**
99+
* Resets any action metrics for a failed commit to prepare them for retrying. Specifically,
100+
*
101+
* <ul>
102+
* <li>Resets addFilesCounter, removeFilesCounter, totalActionsCounter,
103+
* addFilesSizeInBytesCounter, and removeFilesSizeInBytesCounter to 0
104+
* <li>Sets tableFileSizeHistogram to be empty since we don't know the updated distribution
105+
* after the conflicting txn committed
106+
* </ul>
107+
*
108+
* Action counters / tableFileSizeHistogram may be partially incremented if an action iterator is
109+
* not read to completion (i.e. if an exception interrupts a file write). This allows us to reset
110+
* the counters so that we can increment them correctly from 0 on a retry.
111+
*/
112+
public void resetActionMetricsForRetry() {
51113
addFilesCounter.reset();
52114
addFilesSizeInBytesCounter.reset();
53115
removeFilesCounter.reset();
54116
totalActionsCounter.reset();
55117
removeFilesSizeInBytesCounter.reset();
118+
// For now, on retry we set tableFileSizeHistogram = Optional.empty() because we don't know the
119+
// correct state of tableFileSizeHistogram after conflicting transaction has committed
120+
tableFileSizeHistogram = Optional.empty();
56121
}
57122

58123
public TransactionMetricsResult captureTransactionMetricsResult() {
@@ -65,6 +130,8 @@ public TransactionMetricsResult captureTransactionMetricsResult() {
65130
final long numRemoveFiles = removeFilesCounter.value();
66131
final long numTotalActions = totalActionsCounter.value();
67132
final long totalRemoveFileSizeInBytes = removeFilesSizeInBytesCounter.value();
133+
final Optional<FileSizeHistogramResult> tableFileSizeHistogramResult =
134+
tableFileSizeHistogram.map(FileSizeHistogram::captureFileSizeHistogramResult);
68135

69136
@Override
70137
public long getTotalCommitDurationNs() {
@@ -100,6 +167,11 @@ public long getTotalAddFilesSizeInBytes() {
100167
public long getTotalRemoveFilesSizeInBytes() {
101168
return totalRemoveFileSizeInBytes;
102169
}
170+
171+
@Override
172+
public Optional<FileSizeHistogramResult> getTableFileSizeHistogram() {
173+
return tableFileSizeHistogramResult;
174+
}
103175
};
104176
}
105177

@@ -108,13 +180,14 @@ public String toString() {
108180
return String.format(
109181
"TransactionMetrics(totalCommitTimer=%s, commitAttemptsCounter=%s, addFilesCounter=%s, "
110182
+ "removeFilesCounter=%s, totalActionsCounter=%s, totalAddFilesSizeInBytes=%s,"
111-
+ "totalRemoveFilesSizeInBytes=%s)",
183+
+ "totalRemoveFilesSizeInBytes=%s, tableFileSizeHistogram=%s)",
112184
totalCommitTimer,
113185
commitAttemptsCounter,
114186
addFilesCounter,
115187
removeFilesCounter,
116188
totalActionsCounter,
117189
addFilesSizeInBytesCounter,
118-
removeFilesSizeInBytesCounter);
190+
removeFilesSizeInBytesCounter,
191+
tableFileSizeHistogram);
119192
}
120193
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/stats/FileSizeHistogram.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.delta.kernel.internal.data.GenericRow;
2424
import io.delta.kernel.internal.util.InternalUtils;
2525
import io.delta.kernel.internal.util.VectorUtils;
26+
import io.delta.kernel.metrics.FileSizeHistogramResult;
2627
import io.delta.kernel.types.ArrayType;
2728
import io.delta.kernel.types.LongType;
2829
import io.delta.kernel.types.StructType;
@@ -159,6 +160,8 @@ private static long[] createDefaultBinBoundaries() {
159160
return boundaries;
160161
}
161162

163+
// TODO factory from FileSizeHistogramResult
164+
162165
////////////////////////////////////
163166
// Member variables and methods //
164167
////////////////////////////////////
@@ -257,6 +260,30 @@ public Row toRow() {
257260
return new GenericRow(FULL_SCHEMA, value);
258261
}
259262

263+
public FileSizeHistogramResult captureFileSizeHistogramResult() {
264+
return new FileSizeHistogramResult() {
265+
final long[] copiedSortedBinBoundaries =
266+
Arrays.copyOf(sortedBinBoundaries, sortedBinBoundaries.length);
267+
final long[] copiedFileCounts = Arrays.copyOf(fileCounts, fileCounts.length);
268+
final long[] copiedTotalBytes = Arrays.copyOf(totalBytes, totalBytes.length);
269+
270+
@Override
271+
public long[] getSortedBinBoundaries() {
272+
return copiedSortedBinBoundaries;
273+
}
274+
275+
@Override
276+
public long[] getFileCounts() {
277+
return copiedFileCounts;
278+
}
279+
280+
@Override
281+
public long[] getTotalBytes() {
282+
return copiedTotalBytes;
283+
}
284+
};
285+
}
286+
260287
@Override
261288
public boolean equals(Object o) {
262289
if (this == o) return true;

0 commit comments

Comments
 (0)