Skip to content

Commit 2095e31

Browse files
authored
[Kernel] [#5] Introduce deletion vector support (delta-io#4869)
<!--
Thanks for sending a pull request! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'.
3. Be sure to keep the PR description updated to reflect all changes.
4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a faster review.
6. If applicable, include the corresponding issue number in the PR title and link it in the body.
-->

#### Which Delta project/connector is this regarding?

<!--
Please add the component selected below to the beginning of the pull request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.

If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge.
-->

Exposes the deletion vector descriptor as an optional parameter of `AddFile` so that v3 add actions can be generated with a deletion vector attached. The v3 remove-action helpers accept the same optional descriptor.

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible.
If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future).
If the changes were not tested, please explain why.
-->

Unit tests.

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master.
If no, write 'No'.
-->

No.
1 parent 2d89954 commit 2095e31

File tree

8 files changed

+660
-398
lines changed

8 files changed

+660
-398
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,8 @@ static CloseableIterator<Row> generateAppendActions(
270270
// TODO: populate tags in generateAppendActions
271271
Collections.emptyMap() /* tags */,
272272
Optional.empty() /* baseRowId */,
273-
Optional.empty() /* defaultRowCommitVersion */);
273+
Optional.empty() /* defaultRowCommitVersion */,
274+
Optional.empty() /* deletionVectorDescriptor */);
274275
return SingleAction.createAddFileSingleAction(addFileRow.toRow());
275276
});
276277
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/AddFile.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ public static AddFile convertDataFileStatus(
8282
boolean dataChange,
8383
Map<String, String> tags,
8484
Optional<Long> baseRowId,
85-
Optional<Long> defaultRowCommitVersion) {
85+
Optional<Long> defaultRowCommitVersion,
86+
Optional<DeletionVectorDescriptor> deletionVectorDescriptor) {
8687
Optional<MapValue> tagMapValue =
8788
!tags.isEmpty() ? Optional.of(VectorUtils.stringStringMapValue(tags)) : Optional.empty();
8889
Row row =
@@ -93,7 +94,7 @@ public static AddFile convertDataFileStatus(
9394
dataFileStatus.getSize(),
9495
dataFileStatus.getModificationTime(),
9596
dataChange,
96-
Optional.empty(), // deletionVector
97+
deletionVectorDescriptor,
9798
tagMapValue, // tags
9899
baseRowId,
99100
defaultRowCommitVersion,
@@ -118,8 +119,6 @@ public static Row createAddFileRow(
118119

119120
checkArgument(path != null, "path is not nullable");
120121
checkArgument(partitionValues != null, "partitionValues is not nullable");
121-
// TODO - Add support for DeletionVectorDescriptor
122-
checkArgument(!deletionVector.isPresent(), "DeletionVectorDescriptor is unsupported");
123122

124123
Map<Integer, Object> fieldMap = new HashMap<>();
125124
fieldMap.put(FULL_SCHEMA.indexOf("path"), path);
@@ -133,7 +132,11 @@ public static Row createAddFileRow(
133132
version -> fieldMap.put(FULL_SCHEMA.indexOf("defaultRowCommitVersion"), version));
134133
stats.ifPresent(
135134
stat -> fieldMap.put(FULL_SCHEMA.indexOf("stats"), stat.serializeAsJson(physicalSchema)));
136-
135+
deletionVector.ifPresent(
136+
dv -> {
137+
Row dvRow = dv.toRow();
138+
fieldMap.put(FULL_SCHEMA.indexOf("deletionVector"), dvRow);
139+
});
137140
return new GenericRow(FULL_SCHEMA, fieldMap);
138141
}
139142

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DeletionVectorDescriptor.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import io.delta.kernel.data.ColumnVector;
2424
import io.delta.kernel.data.Row;
25+
import io.delta.kernel.internal.data.GenericRow;
2526
import io.delta.kernel.internal.deletionvectors.Base85Codec;
2627
import io.delta.kernel.internal.fs.Path;
2728
import io.delta.kernel.types.IntegerType;
@@ -30,10 +31,7 @@
3031
import io.delta.kernel.types.StructType;
3132
import java.net.URI;
3233
import java.net.URISyntaxException;
33-
import java.util.Map;
34-
import java.util.Objects;
35-
import java.util.Optional;
36-
import java.util.UUID;
34+
import java.util.*;
3735
import java.util.stream.IntStream;
3836

3937
/** Information about a deletion vector attached to a file action. */
@@ -237,6 +235,25 @@ public String toString() {
237235
storageType, pathOrInlineDv, offset, sizeInBytes, cardinality);
238236
}
239237

238+
/** @return Row representation of this deletion vector descriptor */
239+
public Row toRow() {
240+
Map<Integer, Object> fieldMap = new HashMap<>();
241+
242+
fieldMap.put(COL_NAME_TO_ORDINAL.get("storageType"), storageType);
243+
fieldMap.put(COL_NAME_TO_ORDINAL.get("pathOrInlineDv"), pathOrInlineDv);
244+
245+
// Only add offset if it's present
246+
if (offset.isPresent()) {
247+
fieldMap.put(COL_NAME_TO_ORDINAL.get("offset"), offset.get());
248+
}
249+
// If offset is not present, the field remains null in the map
250+
251+
fieldMap.put(COL_NAME_TO_ORDINAL.get("sizeInBytes"), sizeInBytes);
252+
fieldMap.put(COL_NAME_TO_ORDINAL.get("cardinality"), cardinality);
253+
254+
return new GenericRow(READ_SCHEMA, fieldMap);
255+
}
256+
240257
@Override
241258
public boolean equals(Object o) {
242259
if (o == this) {

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/GenerateIcebergCompatActionUtils.java

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public static Row generateIcebergCompatWriterV1AddAction(
9797
dataChange,
9898
tags,
9999
Optional.empty() /* baseRowId */,
100-
Optional.empty() /* defaultRowCommitVersion */);
100+
Optional.empty() /* defaultRowCommitVersion */,
101+
Optional.empty() /* deletionVectorDescriptor */);
101102
return SingleAction.createAddFileSingleAction(addFile.toRow());
102103
}
103104

@@ -119,6 +120,7 @@ public static Row generateIcebergCompatWriterV1AddAction(
119120
* @param partitionValues the partition values for the add
120121
* @param dataChange whether or not the add constitutes a dataChange (i.e. append vs. compaction)
121122
* @param tags key-value metadata to be attached to the add action
123+
* @param deletionVectorDescriptor optional deletion vector descriptor for the add action
122124
* @return add action row that can be included in the transaction
123125
* @throws UnsupportedOperationException if icebergWriterCompatV3 is not enabled
124126
* @throws UnsupportedOperationException if maxRetries != 0 in the transaction
@@ -132,13 +134,17 @@ public static Row generateIcebergCompatWriterV3AddAction(
132134
boolean dataChange,
133135
Map<String, String> tags,
134136
Optional<Long> baseRowId,
135-
Optional<Long> defaultRowCommitVersion) {
137+
Optional<Long> defaultRowCommitVersion,
138+
Optional<DeletionVectorDescriptor> deletionVectorDescriptor) {
136139
Map<String, String> configuration = TransactionStateRow.getConfiguration(transactionState);
137140

138141
/* ----- Validate that this is a valid usage of this API ----- */
139142
validateIcebergWriterCompatV3Enabled(configuration);
140143
validateMaxRetriesSetToZero(transactionState);
141144

145+
/* ----- Validate that deletion vector is passed in only when the table supports it ----- */
146+
deletionVectorDescriptor.ifPresent(dv -> validateIcebergDeletionVectorsEnabled(configuration));
147+
142148
/* ----- Validate this is valid write given the table's protocol & configurations ----- */
143149
checkState(
144150
TableConfig.ICEBERG_COMPAT_V3_ENABLED.fromMetadata(configuration),
@@ -161,7 +167,8 @@ public static Row generateIcebergCompatWriterV3AddAction(
161167
dataChange,
162168
tags,
163169
baseRowId,
164-
defaultRowCommitVersion);
170+
defaultRowCommitVersion,
171+
deletionVectorDescriptor);
165172
return SingleAction.createAddFileSingleAction(addFile.toRow());
166173
}
167174

@@ -210,7 +217,8 @@ public static Row generateIcebergCompatWriterV1RemoveAction(
210217
partitionValues,
211218
dataChange,
212219
Optional.empty() /* baseRowId */,
213-
Optional.empty() /* defaultRowCommitVersion */);
220+
Optional.empty() /* defaultRowCommitVersion */,
221+
Optional.empty() /* deletionVectorDescriptor */);
214222
return SingleAction.createRemoveFileSingleAction(removeFileRow);
215223
}
216224

@@ -224,6 +232,7 @@ public static Row generateIcebergCompatWriterV1RemoveAction(
224232
* @param partitionValues the partition values for the remove
225233
* @param dataChange whether or not the remove constitutes a dataChange (i.e. delete vs.
226234
* compaction)
235+
* @param deletionVectorDescriptor optional deletion vector descriptor for the remove action
227236
* @return remove action row that can be committed to the transaction
228237
* @throws UnsupportedOperationException if icebergWriterCompatV3 is not enabled
229238
* @throws UnsupportedOperationException if maxRetries != 0 in the transaction
@@ -236,13 +245,17 @@ public static Row generateIcebergCompatWriterV3RemoveAction(
236245
Map<String, Literal> partitionValues,
237246
boolean dataChange,
238247
Optional<Long> baseRowId,
239-
Optional<Long> defaultRowCommitVersion) {
248+
Optional<Long> defaultRowCommitVersion,
249+
Optional<DeletionVectorDescriptor> deletionVectorDescriptor) {
240250
Map<String, String> config = TransactionStateRow.getConfiguration(transactionState);
241251

242252
/* ----- Validate that this is a valid usage of this API ----- */
243253
validateIcebergWriterCompatV3Enabled(config);
244254
validateMaxRetriesSetToZero(transactionState);
245255

256+
/* ----- Validate that deletion vector is passed in only when the table supports it ----- */
257+
deletionVectorDescriptor.ifPresent(dv -> validateIcebergDeletionVectorsEnabled(config));
258+
246259
/* ----- Validate this is valid write given the table's protocol & configurations ----- */
247260
// We only allow removes with dataChange=false when appendOnly=true
248261
if (dataChange && TableConfig.APPEND_ONLY_ENABLED.fromMetadata(config)) {
@@ -268,7 +281,8 @@ public static Row generateIcebergCompatWriterV3RemoveAction(
268281
partitionValues,
269282
dataChange,
270283
Optional.empty() /* baseRowId */,
271-
Optional.empty() /* defaultRowCommitVersion */);
284+
Optional.empty() /* defaultRowCommitVersion */,
285+
deletionVectorDescriptor);
272286
return SingleAction.createRemoveFileSingleAction(removeFileRow);
273287
}
274288

@@ -307,6 +321,20 @@ private static void validateIcebergWriterCompatV3Enabled(Map<String, String> con
307321
}
308322
}
309323

324+
/**
325+
* Validates that table feature `deletion vectors` is enabled. Checked when a deletion vector
326+
* descriptor is passed to generateIcebergCompatWriterV3AddAction.
327+
*/
328+
private static void validateIcebergDeletionVectorsEnabled(Map<String, String> config) {
329+
if (!TableConfig.DELETION_VECTORS_CREATION_ENABLED.fromMetadata(config)) {
330+
throw new UnsupportedOperationException(
331+
String.format(
332+
"APIs within GenerateIcebergCompatActionUtils are only supported on tables with"
333+
+ " '%s' set to true",
334+
TableConfig.DELETION_VECTORS_CREATION_ENABLED.getKey()));
335+
}
336+
}
337+
310338
/**
311339
* Throws an exception if `maxRetries` was not set to 0 in the transaction. We restrict these APIs
312340
* to require `maxRetries = 0` since conflict resolution is not supported for operations other
@@ -359,7 +387,8 @@ public static Row convertRemoveDataFileStatus(
359387
Map<String, Literal> partitionValues,
360388
boolean dataChange,
361389
Optional<Long> baseRowId,
362-
Optional<Long> defaultRowCommitVersion) {
390+
Optional<Long> defaultRowCommitVersion,
391+
Optional<DeletionVectorDescriptor> deletionVectorDescriptor) {
363392
return createRemoveFileRowWithExtendedFileMetadata(
364393
relativizePath(new Path(dataFileStatus.getPath()), tableRoot).toUri().toString(),
365394
dataFileStatus.getModificationTime(),
@@ -369,7 +398,8 @@ public static Row convertRemoveDataFileStatus(
369398
dataFileStatus.getStatistics(),
370399
physicalSchema,
371400
baseRowId,
372-
defaultRowCommitVersion);
401+
defaultRowCommitVersion,
402+
deletionVectorDescriptor);
373403
}
374404

375405
@VisibleForTesting
@@ -382,7 +412,8 @@ public static Row createRemoveFileRowWithExtendedFileMetadata(
382412
Optional<DataFileStatistics> stats,
383413
StructType physicalSchema,
384414
Optional<Long> baseRowId,
385-
Optional<Long> defaultRowCommitVersion) {
415+
Optional<Long> defaultRowCommitVersion,
416+
Optional<DeletionVectorDescriptor> deletionVector) {
386417
Map<Integer, Object> fieldMap = new HashMap<>();
387418
fieldMap.put(RemoveFile.FULL_SCHEMA.indexOf("path"), requireNonNull(path));
388419
fieldMap.put(RemoveFile.FULL_SCHEMA.indexOf("deletionTimestamp"), deletionTimestamp);
@@ -399,6 +430,11 @@ public static Row createRemoveFileRowWithExtendedFileMetadata(
399430
defaultRowCommitVersion.ifPresent(
400431
version ->
401432
fieldMap.put(RemoveFile.FULL_SCHEMA.indexOf("defaultRowCommitVersion"), version));
433+
deletionVector.ifPresent(
434+
dv -> {
435+
Row dvRow = dv.toRow();
436+
fieldMap.put(RemoveFile.FULL_SCHEMA.indexOf("deletionVector"), dvRow);
437+
});
402438
return new GenericRow(RemoveFile.FULL_SCHEMA, fieldMap);
403439
}
404440
}

0 commit comments

Comments
 (0)