Skip to content

Commit 9ca77e1

Browse files
authored
[kernel] support passing baseRowId and defaultRowCommitVersion when generate IcebergCompatWriterV3AddAction (delta-io#4878)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> Support passing `baseRowId` and `defaultRowCommitVersion` when generating IcebergWriterCompatV3 addFile/removeFile. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Unit tests ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
1 parent e024622 commit 9ca77e1

File tree

5 files changed

+82
-16
lines changed

5 files changed

+82
-16
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.Collections;
4646
import java.util.List;
4747
import java.util.Map;
48+
import java.util.Optional;
4849

4950
/**
5051
* Represents a transaction to mutate a Delta table.
@@ -267,7 +268,9 @@ static CloseableIterator<Row> generateAppendActions(
267268
((DataWriteContextImpl) dataWriteContext).getPartitionValues(),
268269
true /* dataChange */,
269270
// TODO: populate tags in generateAppendActions
270-
Collections.emptyMap() /* tags */);
271+
Collections.emptyMap() /* tags */,
272+
Optional.empty() /* baseRowId */,
273+
Optional.empty() /* defaultRowCommitVersion */);
271274
return SingleAction.createAddFileSingleAction(addFileRow.toRow());
272275
});
273276
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ public static AddFile convertDataFileStatus(
8080
DataFileStatus dataFileStatus,
8181
Map<String, Literal> partitionValues,
8282
boolean dataChange,
83-
Map<String, String> tags) {
83+
Map<String, String> tags,
84+
Optional<Long> baseRowId,
85+
Optional<Long> defaultRowCommitVersion) {
8486
Optional<MapValue> tagMapValue =
8587
!tags.isEmpty() ? Optional.of(VectorUtils.stringStringMapValue(tags)) : Optional.empty();
8688
Row row =
@@ -93,8 +95,8 @@ public static AddFile convertDataFileStatus(
9395
dataChange,
9496
Optional.empty(), // deletionVector
9597
tagMapValue, // tags
96-
Optional.empty(), // baseRowId
97-
Optional.empty(), // defaultRowCommitVersion
98+
baseRowId,
99+
defaultRowCommitVersion,
98100
dataFileStatus.getStatistics());
99101

100102
return new AddFile(row);

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/GenerateIcebergCompatActionUtils.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ public static Row generateIcebergCompatWriterV1AddAction(
9595
fileStatus,
9696
partitionValues,
9797
dataChange,
98-
tags);
98+
tags,
99+
Optional.empty() /* baseRowId */,
100+
Optional.empty() /* defaultRowCommitVersion */);
99101
return SingleAction.createAddFileSingleAction(addFile.toRow());
100102
}
101103

@@ -128,7 +130,9 @@ public static Row generateIcebergCompatWriterV3AddAction(
128130
DataFileStatus fileStatus,
129131
Map<String, Literal> partitionValues,
130132
boolean dataChange,
131-
Map<String, String> tags) {
133+
Map<String, String> tags,
134+
Optional<Long> baseRowId,
135+
Optional<Long> defaultRowCommitVersion) {
132136
Map<String, String> configuration = TransactionStateRow.getConfiguration(transactionState);
133137

134138
/* ----- Validate that this is a valid usage of this API ----- */
@@ -155,7 +159,9 @@ public static Row generateIcebergCompatWriterV3AddAction(
155159
fileStatus,
156160
partitionValues,
157161
dataChange,
158-
tags);
162+
tags,
163+
baseRowId,
164+
defaultRowCommitVersion);
159165
return SingleAction.createAddFileSingleAction(addFile.toRow());
160166
}
161167

@@ -202,7 +208,9 @@ public static Row generateIcebergCompatWriterV1RemoveAction(
202208
tableRoot,
203209
fileStatus,
204210
partitionValues,
205-
dataChange);
211+
dataChange,
212+
Optional.empty() /* baseRowId */,
213+
Optional.empty() /* defaultRowCommitVersion */);
206214
return SingleAction.createRemoveFileSingleAction(removeFileRow);
207215
}
208216

@@ -226,7 +234,9 @@ public static Row generateIcebergCompatWriterV3RemoveAction(
226234
Row transactionState,
227235
DataFileStatus fileStatus,
228236
Map<String, Literal> partitionValues,
229-
boolean dataChange) {
237+
boolean dataChange,
238+
Optional<Long> baseRowId,
239+
Optional<Long> defaultRowCommitVersion) {
230240
Map<String, String> config = TransactionStateRow.getConfiguration(transactionState);
231241

232242
/* ----- Validate that this is a valid usage of this API ----- */
@@ -256,7 +266,9 @@ public static Row generateIcebergCompatWriterV3RemoveAction(
256266
tableRoot,
257267
fileStatus,
258268
partitionValues,
259-
dataChange);
269+
dataChange,
270+
Optional.empty() /* baseRowId */,
271+
Optional.empty() /* defaultRowCommitVersion */);
260272
return SingleAction.createRemoveFileSingleAction(removeFileRow);
261273
}
262274

@@ -345,15 +357,19 @@ public static Row convertRemoveDataFileStatus(
345357
URI tableRoot,
346358
DataFileStatus dataFileStatus,
347359
Map<String, Literal> partitionValues,
348-
boolean dataChange) {
360+
boolean dataChange,
361+
Optional<Long> baseRowId,
362+
Optional<Long> defaultRowCommitVersion) {
349363
return createRemoveFileRowWithExtendedFileMetadata(
350364
relativizePath(new Path(dataFileStatus.getPath()), tableRoot).toUri().toString(),
351365
dataFileStatus.getModificationTime(),
352366
dataChange,
353367
serializePartitionMap(partitionValues),
354368
dataFileStatus.getSize(),
355369
dataFileStatus.getStatistics(),
356-
physicalSchema);
370+
physicalSchema,
371+
baseRowId,
372+
defaultRowCommitVersion);
357373
}
358374

359375
@VisibleForTesting
@@ -364,7 +380,9 @@ public static Row createRemoveFileRowWithExtendedFileMetadata(
364380
MapValue partitionValues,
365381
long size,
366382
Optional<DataFileStatistics> stats,
367-
StructType physicalSchema) {
383+
StructType physicalSchema,
384+
Optional<Long> baseRowId,
385+
Optional<Long> defaultRowCommitVersion) {
368386
Map<Integer, Object> fieldMap = new HashMap<>();
369387
fieldMap.put(RemoveFile.FULL_SCHEMA.indexOf("path"), requireNonNull(path));
370388
fieldMap.put(RemoveFile.FULL_SCHEMA.indexOf("deletionTimestamp"), deletionTimestamp);
@@ -377,6 +395,10 @@ public static Row createRemoveFileRowWithExtendedFileMetadata(
377395
stat ->
378396
fieldMap.put(
379397
RemoveFile.FULL_SCHEMA.indexOf("stats"), stat.serializeAsJson(physicalSchema)));
398+
baseRowId.ifPresent(id -> fieldMap.put(RemoveFile.FULL_SCHEMA.indexOf("baseRowId"), id));
399+
defaultRowCommitVersion.ifPresent(
400+
version ->
401+
fieldMap.put(RemoveFile.FULL_SCHEMA.indexOf("defaultRowCommitVersion"), version));
380402
return new GenericRow(RemoveFile.FULL_SCHEMA, fieldMap);
381403
}
382404
}

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/actions/RemoveFileSuite.scala

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.delta.kernel.internal.actions
1717

18+
import java.lang.{Long => JLong}
1819
import java.util.Optional
1920

2021
import scala.collection.JavaConverters._
@@ -37,15 +38,24 @@ class RemoveFileSuite extends AnyFunSuite {
3738
dataChange: Boolean,
3839
partitionValues: Map[String, String],
3940
size: Long,
40-
stats: Option[String]): Row = {
41+
stats: Option[String],
42+
baseRowId: Option[Long] = Option.empty,
43+
defaultRowCommitVersion: Option[Long] = Option.empty): Row = {
44+
def toJavaOptional[T](option: Option[T]): Optional[T] = option match {
45+
case Some(value) => Optional.of(value)
46+
case None => Optional.empty()
47+
}
48+
4149
GenerateIcebergCompatActionUtils.createRemoveFileRowWithExtendedFileMetadata(
4250
path,
4351
deletionTimestamp,
4452
dataChange,
4553
stringStringMapValue(partitionValues.asJava),
4654
size,
4755
StatsUtils.deserializeFromJson(stats.getOrElse("")),
48-
null)
56+
null,
57+
toJavaOptional(baseRowId.asInstanceOf[Option[JLong]]),
58+
toJavaOptional(defaultRowCommitVersion.asInstanceOf[Option[JLong]]))
4959
}
5060

5161
test("getters can read RemoveFile's fields from the backing row") {
@@ -72,4 +82,31 @@ class RemoveFileSuite extends AnyFunSuite {
7282
assert(!removeFile.getBaseRowId.isPresent)
7383
assert(!removeFile.getDefaultRowCommitVersion.isPresent)
7484
}
85+
86+
test("getters can read RemoveFile's fields from the backing row with row tracking") {
87+
val removeFileRow = createTestRemoveFileRow(
88+
path = "test/path",
89+
deletionTimestamp = 1000L,
90+
dataChange = true,
91+
partitionValues = Map("a" -> "1"),
92+
size = 55555L,
93+
stats = Option("{\"numRecords\":100}"),
94+
baseRowId = Option(30L),
95+
defaultRowCommitVersion = Option(40L))
96+
97+
val removeFile = new RemoveFile(removeFileRow)
98+
assert(removeFile.getPath === "test/path")
99+
assert(removeFile.getDeletionTimestamp == Optional.of(1000L))
100+
assert(removeFile.getDataChange)
101+
assert(removeFile.getExtendedFileMetadata == Optional.of(true))
102+
assert(removeFile.getPartitionValues.isPresent &&
103+
VectorUtils.toJavaMap(removeFile.getPartitionValues.get).asScala.equals(Map("a" -> "1")))
104+
assert(removeFile.getSize == Optional.of(55555L))
105+
assert(removeFile.getStats.isPresent &&
106+
removeFile.getStats.get.serializeAsJson(null) == "{\"numRecords\":100}")
107+
assert(removeFile.getBaseRowId === Optional.of(30L))
108+
assert(removeFile.getDefaultRowCommitVersion === Optional.of(40L))
109+
assert(!removeFile.getTags.isPresent)
110+
assert(!removeFile.getDeletionVector.isPresent)
111+
}
75112
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/metrics/TransactionReportSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,9 @@ class TransactionReportSuite extends DeltaTableWriteSuiteBase with MetricsReport
222222
new Path(TransactionStateRow.getTablePath(trans.getTransactionState(engine))).toUri,
223223
fileStatus,
224224
Collections.emptyMap(), // partitionValues
225-
true // dataChange
225+
true, // dataChange,
226+
Optional.empty(), // baseRowId
227+
Optional.empty() // defaultRowCommitVersion
226228
))
227229
})
228230
}

0 commit comments

Comments
 (0)