Skip to content

Commit e510df7

Browse files
[Kernel] Return txn metrics as part of TransactionCommitResult (delta-io#4353)
<!--
Thanks for sending a pull request! Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
  2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
  5. If possible, provide a concise example to reproduce the issue for a faster review.
  6. If applicable, include the corresponding issue number in the PR title and link it in the body.
-->

#### Which Delta project/connector is this regarding?

<!--
Please add the component selected below to the beginning of the pull request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Return txn metrics as part of TransactionCommitResult

## How was this patch tested?

Updates test suite to check these metrics as well

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master.
If no, write 'No'.
-->
1 parent f0b516c commit e510df7

File tree

4 files changed

+63
-33
lines changed

4 files changed

+63
-33
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
*/
1616
package io.delta.kernel;
1717

18+
import static java.util.Objects.requireNonNull;
19+
1820
import io.delta.kernel.annotation.Evolving;
1921
import io.delta.kernel.engine.Engine;
2022
import io.delta.kernel.hook.PostCommitHook;
23+
import io.delta.kernel.metrics.TransactionMetricsResult;
2124
import io.delta.kernel.utils.CloseableIterable;
2225
import java.util.List;
2326

@@ -31,10 +34,15 @@
3134
public class TransactionCommitResult {
3235
private final long version;
3336
private final List<PostCommitHook> postCommitHooks;
37+
private final TransactionMetricsResult transactionMetrics;
3438

35-
public TransactionCommitResult(long version, List<PostCommitHook> postCommitHooks) {
39+
public TransactionCommitResult(
40+
long version,
41+
List<PostCommitHook> postCommitHooks,
42+
TransactionMetricsResult transactionMetrics) {
3643
this.version = version;
37-
this.postCommitHooks = postCommitHooks;
44+
this.postCommitHooks = requireNonNull(postCommitHooks);
45+
this.transactionMetrics = requireNonNull(transactionMetrics);
3846
}
3947

4048
/**
@@ -62,4 +70,9 @@ public long getVersion() {
6270
public List<PostCommitHook> getPostCommitHooks() {
6371
return postCommitHooks;
6472
}
73+
74+
/** @return the metrics for this transaction */
75+
public TransactionMetricsResult getTransactionMetrics() {
76+
return transactionMetrics;
77+
}
6578
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -252,15 +252,20 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
252252
checkState(!closed, "Transaction is already attempted to commit. Create a new transaction.");
253253
TransactionMetrics transactionMetrics = new TransactionMetrics();
254254
try {
255-
TransactionCommitResult result =
255+
long committedVersion =
256256
transactionMetrics.totalCommitTimer.time(
257257
() -> commitWithRetry(engine, dataActions, transactionMetrics));
258258
recordTransactionReport(
259259
engine,
260-
Optional.of(result.getVersion()) /* committedVersion */,
260+
Optional.of(committedVersion),
261261
transactionMetrics,
262262
Optional.empty() /* exception */);
263-
return result;
263+
TransactionMetricsResult txnMetricsCaptured =
264+
transactionMetrics.captureTransactionMetricsResult();
265+
return new TransactionCommitResult(
266+
committedVersion,
267+
generatePostCommitHooks(committedVersion, txnMetricsCaptured),
268+
txnMetricsCaptured);
264269
} catch (Exception e) {
265270
recordTransactionReport(
266271
engine,
@@ -271,7 +276,7 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
271276
}
272277
}
273278

274-
private TransactionCommitResult commitWithRetry(
279+
private long commitWithRetry(
275280
Engine engine, CloseableIterable<Row> dataActions, TransactionMetrics transactionMetrics) {
276281
try {
277282
long commitAsVersion = readSnapshot.getVersion() + 1;
@@ -391,7 +396,7 @@ private Optional<Long> getUpdatedInCommitTimestampAfterConflict(
391396
return attemptInCommitTimestamp;
392397
}
393398

394-
private TransactionCommitResult doCommit(
399+
private long doCommit(
395400
Engine engine,
396401
long commitAsVersion,
397402
CommitInfo attemptCommitInfo,
@@ -462,16 +467,7 @@ private TransactionCommitResult doCommit(
462467
"Write file actions to JSON log file `%s`",
463468
FileNames.deltaFile(logPath, commitAsVersion));
464469

465-
List<PostCommitHook> postCommitHooks = new ArrayList<>();
466-
if (isReadyForCheckpoint(commitAsVersion)) {
467-
postCommitHooks.add(new CheckpointHook(dataPath, commitAsVersion));
468-
}
469-
470-
buildPostCommitCrcInfoIfCurrentCrcAvailable(
471-
commitAsVersion, transactionMetrics.captureTransactionMetricsResult())
472-
.ifPresent(crcInfo -> postCommitHooks.add(new ChecksumSimpleHook(crcInfo, logPath)));
473-
474-
return new TransactionCommitResult(commitAsVersion, postCommitHooks);
470+
return commitAsVersion;
475471
} catch (FileAlreadyExistsException e) {
476472
throw e;
477473
} catch (IOException ioe) {
@@ -485,6 +481,19 @@ public boolean isBlindAppend() {
485481
return true;
486482
}
487483

484+
private List<PostCommitHook> generatePostCommitHooks(
485+
long committedVersion, TransactionMetricsResult txnMetrics) {
486+
List<PostCommitHook> postCommitHooks = new ArrayList<>();
487+
if (isReadyForCheckpoint(committedVersion)) {
488+
postCommitHooks.add(new CheckpointHook(dataPath, committedVersion));
489+
}
490+
491+
buildPostCommitCrcInfoIfCurrentCrcAvailable(committedVersion, txnMetrics)
492+
.ifPresent(crcInfo -> postCommitHooks.add(new ChecksumSimpleHook(crcInfo, logPath)));
493+
494+
return postCommitHooks;
495+
}
496+
488497
/**
489498
* Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This can
490499
* result in an additional file read and that this will only happen if ICT is enabled.

kernel/kernel-api/src/main/java/io/delta/kernel/metrics/TransactionMetricsResult.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,6 @@ public interface TransactionMetricsResult {
5757
* this metric may be incomplete.
5858
*/
5959
long getTotalAddFilesSizeInBytes();
60+
61+
// TODO add fileSizeHistogram
6062
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/metrics/TransactionReportSuite.scala

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import io.delta.kernel.internal.actions.{RemoveFile, SingleAction}
2727
import io.delta.kernel.internal.data.GenericRow
2828
import io.delta.kernel.internal.metrics.Timer
2929
import io.delta.kernel.internal.util.Utils
30-
import io.delta.kernel.metrics.{SnapshotReport, TransactionReport}
30+
import io.delta.kernel.metrics.{SnapshotReport, TransactionMetricsResult, TransactionReport}
3131
import io.delta.kernel.types.{IntegerType, StructType}
3232
import io.delta.kernel.utils.{CloseableIterable, CloseableIterator, DataFileStatus}
3333

@@ -51,15 +51,19 @@ class TransactionReportSuite extends AnyFunSuite with MetricsReportTestUtils {
5151
def getTransactionAndSnapshotReport(
5252
createTransaction: Engine => Transaction,
5353
generateCommitActions: (Transaction, Engine) => CloseableIterable[Row],
54-
expectException: Boolean)
54+
expectException: Boolean,
55+
validateTransactionMetrics: (TransactionMetricsResult, Long) => Unit)
5556
: (TransactionReport, Long, Option[SnapshotReport], Option[Exception]) = {
5657
val timer = new Timer()
5758

5859
val (metricsReports, exception) = collectMetricsReports(
5960
engine => {
6061
val transaction = createTransaction(engine)
6162
val actionsToCommit = generateCommitActions(transaction, engine)
62-
timer.time(() => transaction.commit(engine, actionsToCommit)) // Time the actual operation
63+
val txnCommitResult = timer.time(() =>
64+
transaction.commit(engine, actionsToCommit)) // Time the actual operation
65+
// Validate the txn metrics returned in txnCommitResult
66+
validateTransactionMetrics(txnCommitResult.getTransactionMetrics, timer.totalDurationNs())
6367
},
6468
expectException)
6569

@@ -109,6 +113,18 @@ class TransactionReportSuite extends AnyFunSuite with MetricsReportTestUtils {
109113
operation: Operation = Operation.MANUAL_UPDATE): Unit = {
110114
// scalastyle:on
111115
assert(expectException == expectedCommitVersion.isEmpty)
116+
def validateTransactionMetrics(txnMetrics: TransactionMetricsResult, duration: Long): Unit = {
117+
// Since we cannot know the actual duration of commit we sanity check that they are > 0 and
118+
// less than the total operation duration
119+
assert(txnMetrics.getTotalCommitDurationNs > 0)
120+
assert(txnMetrics.getTotalCommitDurationNs < duration)
121+
122+
assert(txnMetrics.getNumCommitAttempts == expectedNumAttempts)
123+
assert(txnMetrics.getNumAddFiles == expectedNumAddFiles)
124+
assert(txnMetrics.getTotalAddFilesSizeInBytes == expectedTotalAddFilesSizeInBytes)
125+
assert(txnMetrics.getNumRemoveFiles == expectedNumRemoveFiles)
126+
assert(txnMetrics.getNumTotalActions == expectedNumTotalActions)
127+
}
112128

113129
val (transactionReport, duration, snapshotReportOpt, exception) =
114130
getTransactionAndSnapshotReport(
@@ -117,7 +133,8 @@ class TransactionReportSuite extends AnyFunSuite with MetricsReportTestUtils {
117133
Table.forPath(engine, path).createTransactionBuilder(engine, engineInfo, operation),
118134
engine),
119135
generateCommitActions,
120-
expectException)
136+
expectException,
137+
validateTransactionMetrics)
121138

122139
// Verify contents
123140
assert(transactionReport.getTablePath == defaultEngine.getFileSystemClient.resolvePath(path))
@@ -143,18 +160,7 @@ class TransactionReportSuite extends AnyFunSuite with MetricsReportTestUtils {
143160
})
144161
}
145162
assert(transactionReport.getCommittedVersion.toScala == expectedCommitVersion)
146-
147-
// Since we cannot know the actual duration of commit we sanity check that they are > 0 and
148-
// less than the total operation duration
149-
assert(transactionReport.getTransactionMetrics.getTotalCommitDurationNs > 0)
150-
assert(transactionReport.getTransactionMetrics.getTotalCommitDurationNs < duration)
151-
152-
assert(transactionReport.getTransactionMetrics.getNumCommitAttempts == expectedNumAttempts)
153-
assert(transactionReport.getTransactionMetrics.getNumAddFiles == expectedNumAddFiles)
154-
assert(transactionReport.getTransactionMetrics.getTotalAddFilesSizeInBytes
155-
== expectedTotalAddFilesSizeInBytes)
156-
assert(transactionReport.getTransactionMetrics.getNumRemoveFiles == expectedNumRemoveFiles)
157-
assert(transactionReport.getTransactionMetrics.getNumTotalActions == expectedNumTotalActions)
163+
validateTransactionMetrics(transactionReport.getTransactionMetrics, duration)
158164
}
159165

160166
def generateAppendActions(fileStatusIter: CloseableIterator[DataFileStatus])(

0 commit comments

Comments
 (0)