Skip to content

Commit f304d10

Browse files
committed
partial update
1 parent 49a0ea9 commit f304d10

File tree

35 files changed

+1371
-30
lines changed

35 files changed

+1371
-30
lines changed

.palantir/revapi.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,16 @@ acceptedBreaks:
6565
- code: "java.method.removed"
6666
old: "method void org.apache.iceberg.io.DataWriter<T>::add(T)"
6767
justification: "Removing deprecated method"
68+
"1.1.0":
69+
org.apache.iceberg:iceberg-api:
70+
- code: "java.method.addedToInterface"
71+
new: "method java.util.List<java.lang.Integer> org.apache.iceberg.ContentFile<F>::partialFieldIds()"
72+
justification: "{add new feature}"
73+
org.apache.iceberg:iceberg-data:
74+
- code: "java.method.abstractMethodAdded"
75+
new: "method T org.apache.iceberg.data.DeleteFilter<T>::combineRecord(T, org.apache.iceberg.StructLike,\
76+
\ org.apache.iceberg.Schema, org.apache.iceberg.Schema)"
77+
justification: "{add new feature}"
6878
apache-iceberg-0.14.0:
6979
org.apache.iceberg:iceberg-api:
7080
- code: "java.class.defaultSerializationChanged"

api/src/main/java/org/apache/iceberg/ContentFile.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ public interface ContentFile<F> {
104104
*/
105105
List<Integer> equalityFieldIds();
106106

107+
List<Integer> partialFieldIds();
108+
107109
/**
108110
* Returns the sort order id of this file, which describes how the file is ordered. This
109111
* information will be useful for merging data and equality delete files more efficiently when

api/src/main/java/org/apache/iceberg/DataFile.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,14 @@ public interface DataFile extends ContentFile<DataFile> {
102102
int PARTITION_ID = 102;
103103
String PARTITION_NAME = "partition";
104104
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
105-
// NEXT ID TO ASSIGN: 142
105+
106+
Types.NestedField PARTIAL_IDS =
107+
optional(
108+
142,
109+
"partial_ids",
110+
ListType.ofRequired(143, IntegerType.get()),
111+
"partial comparison field IDs");
112+
// NEXT ID TO ASSIGN: 144
106113

107114
static StructType getType(StructType partitionType) {
108115
// IDs start at 100 to leave room for changes to ManifestEntry
@@ -123,7 +130,8 @@ static StructType getType(StructType partitionType) {
123130
KEY_METADATA,
124131
SPLIT_OFFSETS,
125132
EQUALITY_IDS,
126-
SORT_ORDER_ID);
133+
SORT_ORDER_ID,
134+
PARTIAL_IDS);
127135
}
128136

129137
/** @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES */
@@ -136,4 +144,9 @@ default FileContent content() {
136144
default List<Integer> equalityFieldIds() {
137145
return null;
138146
}
147+
148+
@Override
149+
default List<Integer> partialFieldIds() {
150+
return null;
151+
}
139152
}

api/src/main/java/org/apache/iceberg/FileContent.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
public enum FileContent {
2323
DATA(0),
2424
POSITION_DELETES(1),
25-
EQUALITY_DELETES(2);
25+
EQUALITY_DELETES(2),
26+
PARTIAL_UPDATE(3);
2627

2728
private final int id;
2829

core/src/main/java/org/apache/iceberg/BaseFile.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public PartitionData copy() {
7373
private Map<Integer, ByteBuffer> upperBounds = null;
7474
private long[] splitOffsets = null;
7575
private int[] equalityIds = null;
76+
private int[] partialIds = null;
7677
private byte[] keyMetadata = null;
7778
private Integer sortOrderId;
7879

@@ -132,6 +133,7 @@ public PartitionData copy() {
132133
Map<Integer, ByteBuffer> upperBounds,
133134
List<Long> splitOffsets,
134135
int[] equalityFieldIds,
136+
int[] partialFieldIds,
135137
Integer sortOrderId,
136138
ByteBuffer keyMetadata) {
137139
this.partitionSpecId = specId;
@@ -159,6 +161,7 @@ public PartitionData copy() {
159161
this.upperBounds = SerializableByteBufferMap.wrap(upperBounds);
160162
this.splitOffsets = ArrayUtil.toLongArray(splitOffsets);
161163
this.equalityIds = equalityFieldIds;
164+
this.partialIds = partialFieldIds;
162165
this.sortOrderId = sortOrderId;
163166
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
164167
}
@@ -207,6 +210,10 @@ public PartitionData copy() {
207210
toCopy.equalityIds != null
208211
? Arrays.copyOf(toCopy.equalityIds, toCopy.equalityIds.length)
209212
: null;
213+
this.partialIds =
214+
toCopy.partialIds != null
215+
? Arrays.copyOf(toCopy.partialIds, toCopy.partialIds.length)
216+
: null;
210217
this.sortOrderId = toCopy.sortOrderId;
211218
}
212219

@@ -294,6 +301,9 @@ public void put(int i, Object value) {
294301
this.sortOrderId = (Integer) value;
295302
return;
296303
case 17:
304+
this.partialIds = ArrayUtil.toIntArray((List<Integer>) value);
305+
return;
306+
case 18:
297307
this.fileOrdinal = (long) value;
298308
return;
299309
default:
@@ -349,6 +359,8 @@ public Object get(int i) {
349359
case 16:
350360
return sortOrderId;
351361
case 17:
362+
return partialFieldIds();
363+
case 18:
352364
return fileOrdinal;
353365
default:
354366
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -445,6 +457,11 @@ public List<Integer> equalityFieldIds() {
445457
return ArrayUtil.toIntList(equalityIds);
446458
}
447459

460+
@Override
461+
public List<Integer> partialFieldIds() {
462+
return ArrayUtil.toIntList(partialIds);
463+
}
464+
448465
@Override
449466
public Integer sortOrderId() {
450467
return sortOrderId;
@@ -478,6 +495,7 @@ public String toString() {
478495
.add("split_offsets", splitOffsets == null ? "null" : splitOffsets())
479496
.add("equality_ids", equalityIds == null ? "null" : equalityFieldIds())
480497
.add("sort_order_id", sortOrderId)
498+
.add("partial_ids", equalityIds == null ? "null" : partialFieldIds())
481499
.toString();
482500
}
483501
}

core/src/main/java/org/apache/iceberg/DeleteFileIndex.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,80 @@ private static boolean canContainDeletesForFile(
156156

157157
case EQUALITY_DELETES:
158158
return canContainEqDeletesForFile(dataFile, deleteFile, schema);
159+
160+
case PARTIAL_UPDATE:
161+
return canContainPartialDeletesForFile(dataFile, deleteFile, schema);
162+
}
163+
164+
return true;
165+
}
166+
167+
// todo: add actual implementation
168+
private static boolean canContainPartialDeletesForFile(
169+
DataFile dataFile, DeleteFile deleteFile, Schema schema) {
170+
// whether to check data ranges or to assume that the ranges match
171+
// if upper/lower bounds are missing, null counts may still be used to determine delete files
172+
// can be skipped
173+
boolean checkRanges =
174+
dataFile.lowerBounds() != null
175+
&& dataFile.upperBounds() != null
176+
&& deleteFile.lowerBounds() != null
177+
&& deleteFile.upperBounds() != null;
178+
179+
Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
180+
Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
181+
Map<Integer, ByteBuffer> deleteLowers = deleteFile.lowerBounds();
182+
Map<Integer, ByteBuffer> deleteUppers = deleteFile.upperBounds();
183+
184+
Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts();
185+
Map<Integer, Long> dataValueCounts = dataFile.valueCounts();
186+
Map<Integer, Long> deleteNullCounts = deleteFile.nullValueCounts();
187+
Map<Integer, Long> deleteValueCounts = deleteFile.valueCounts();
188+
189+
for (int id : deleteFile.equalityFieldIds()) {
190+
Types.NestedField field = schema.findField(id);
191+
if (!field.type().isPrimitiveType()) {
192+
// stats are not kept for nested types. assume that the delete file may match
193+
continue;
194+
}
195+
196+
if (containsNull(dataNullCounts, field) && containsNull(deleteNullCounts, field)) {
197+
// the data has null values and null has been deleted, so the deletes must be applied
198+
continue;
199+
}
200+
201+
if (allNull(dataNullCounts, dataValueCounts, field) && allNonNull(deleteNullCounts, field)) {
202+
// the data file contains only null values for this field, but there are no deletes for null
203+
// values
204+
return false;
205+
}
206+
207+
if (allNull(deleteNullCounts, deleteValueCounts, field)
208+
&& allNonNull(dataNullCounts, field)) {
209+
// the delete file removes only null rows with null for this field, but there are no data
210+
// rows with null
211+
return false;
212+
}
213+
214+
if (!checkRanges) {
215+
// some upper and lower bounds are missing, assume they match
216+
continue;
217+
}
218+
219+
ByteBuffer dataLower = dataLowers.get(id);
220+
ByteBuffer dataUpper = dataUppers.get(id);
221+
ByteBuffer deleteLower = deleteLowers.get(id);
222+
ByteBuffer deleteUpper = deleteUppers.get(id);
223+
if (dataLower == null || dataUpper == null || deleteLower == null || deleteUpper == null) {
224+
// at least one bound is not known, assume the delete file may match
225+
continue;
226+
}
227+
228+
if (!rangesOverlap(
229+
field.type().asPrimitiveType(), dataLower, dataUpper, deleteLower, deleteUpper)) {
230+
// no values overlap between the data file and the deletes
231+
return false;
232+
}
159233
}
160234

161235
return true;
@@ -474,6 +548,22 @@ DeleteFileIndex build() {
474548
globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
475549
globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
476550

551+
// fixme: this will overlap equal deletes
552+
List<Pair<Long, DeleteFile>> partialDeleteSortedBySeq =
553+
deleteFilesByPartition.get(partition).stream()
554+
.filter(entry -> entry.file().content() == FileContent.PARTIAL_UPDATE)
555+
.map(
556+
entry ->
557+
// a delete file is indexed by the sequence number it should be applied to
558+
Pair.of(entry.dataSequenceNumber(), entry.file()))
559+
.sorted(Comparator.comparingLong(Pair::first))
560+
.collect(Collectors.toList());
561+
if (partialDeleteSortedBySeq.size() > 0) {
562+
globalApplySeqs = partialDeleteSortedBySeq.stream().mapToLong(Pair::first).toArray();
563+
globalDeletes =
564+
partialDeleteSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
565+
}
566+
477567
List<Pair<Long, DeleteFile>> posFilesSortedBySeq =
478568
deleteFilesByPartition.get(partition).stream()
479569
.filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)

core/src/main/java/org/apache/iceberg/FileMetadata.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public static class Builder {
4141
private final int specId;
4242
private FileContent content = null;
4343
private int[] equalityFieldIds = null;
44+
private int[] partialFieldIds = null;
4445
private PartitionData partitionData;
4546
private String filePath = null;
4647
private FileFormat format = null;
@@ -116,6 +117,13 @@ public Builder ofEqualityDeletes(int... fieldIds) {
116117
return this;
117118
}
118119

120+
public Builder ofPartialDeletes(int[] newEqualityFieldIds, int[] newPartialFieldIds) {
121+
this.content = FileContent.PARTIAL_UPDATE;
122+
this.equalityFieldIds = newEqualityFieldIds;
123+
this.partialFieldIds = newPartialFieldIds;
124+
return this;
125+
}
126+
119127
public Builder withStatus(FileStatus stat) {
120128
this.filePath = stat.getPath().toString();
121129
this.fileSizeInBytes = stat.getLen();
@@ -222,6 +230,8 @@ public DeleteFile build() {
222230
sortOrderId == null, "Position delete file should not have sort order");
223231
break;
224232
case EQUALITY_DELETES:
233+
234+
case PARTIAL_UPDATE:
225235
if (sortOrderId == null) {
226236
sortOrderId = SortOrder.unsorted().orderId();
227237
}
@@ -246,6 +256,7 @@ public DeleteFile build() {
246256
lowerBounds,
247257
upperBounds),
248258
equalityFieldIds,
259+
partialFieldIds,
249260
sortOrderId,
250261
keyMetadata);
251262
}

core/src/main/java/org/apache/iceberg/GenericDataFile.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
5757
metrics.upperBounds(),
5858
splitOffsets,
5959
null,
60+
null,
6061
sortOrderId,
6162
keyMetadata);
6263
}

core/src/main/java/org/apache/iceberg/GenericDeleteFile.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
3939
long fileSizeInBytes,
4040
Metrics metrics,
4141
int[] equalityFieldIds,
42+
int[] partialFieldIds,
4243
Integer sortOrderId,
4344
ByteBuffer keyMetadata) {
4445
super(
@@ -57,6 +58,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
5758
metrics.upperBounds(),
5859
null,
5960
equalityFieldIds,
61+
partialFieldIds,
6062
sortOrderId,
6163
keyMetadata);
6264
}

core/src/main/java/org/apache/iceberg/SnapshotSummary.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,14 @@ private static class UpdateMetrics {
221221
private int addedPosDeleteFiles = 0;
222222
private int removedPosDeleteFiles = 0;
223223
private int addedDeleteFiles = 0;
224+
private int addedPartialFiles = 0;
224225
private int removedDeleteFiles = 0;
225226
private long addedRecords = 0L;
226227
private long deletedRecords = 0L;
227228
private long addedPosDeletes = 0L;
228229
private long removedPosDeletes = 0L;
229230
private long addedEqDeletes = 0L;
231+
private long addedPartialUpdates = 0L;
230232
private long removedEqDeletes = 0L;
231233
private boolean trustSizeAndDeleteCounts = true;
232234

@@ -290,6 +292,12 @@ void addedFile(ContentFile<?> file) {
290292
this.addedEqDeleteFiles += 1;
291293
this.addedEqDeletes += file.recordCount();
292294
break;
295+
case PARTIAL_UPDATE:
296+
this.addedDeleteFiles += 1;
297+
this.addedPartialFiles += 1;
298+
this.addedPartialUpdates += file.recordCount();
299+
break;
300+
293301
default:
294302
throw new UnsupportedOperationException(
295303
"Unsupported file content type: " + file.content());

0 commit comments

Comments
 (0)