
Commit 4c57d8b

no partition optimization for snapshots with multiple specs; skip duplicate records that may be produced by copy-on-write
1 parent b059ba7


2 files changed (+125 −31 lines)

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScanner.java

Lines changed: 9 additions & 1 deletion
@@ -237,6 +237,7 @@ private void gatherPartitionData(
       Set<Integer> pinnedSpecs,
       Set<Long> snapshotsWithUnpinnedSpecs)
       throws IOException {
+    Map<Long, Set<Integer>> specsInSnapshot = new HashMap<>();
     try (CloseableIterable<ScanTaskGroup<ChangelogScanTask>> groups = scan.planTasks()) {
       for (ScanTaskGroup<ChangelogScanTask> group : groups) {
         for (ChangelogScanTask task : group.tasks()) {
@@ -255,9 +256,16 @@ private void gatherPartitionData(
               .computeIfAbsent(snapshotId, (id) -> new HashMap<>())
               .computeIfAbsent(partition, (p) -> new HashSet<>())
               .add(type);
+          specsInSnapshot.computeIfAbsent(snapshotId, id -> new HashSet<>()).add(specId);
         }
       }
     }
+
+    // snapshots where multiple specs are used are also not safe
+    specsInSnapshot.entrySet().stream()
+        .filter(e -> e.getValue().size() > 1)
+        .map(Map.Entry::getKey)
+        .forEach(snapshotsWithUnpinnedSpecs::add);
   }
 
   private void createAndOutputReadTasks(
@@ -352,7 +360,7 @@ private void createAndOutputReadTasks(
                 checkStateNotNull(changeTypesPerPartitionPerSnapshot.get(snapshotId))
                     .get(partition))))) {
       // TODO: remove debug printing
-      System.out.printf("\tUnidirectional task with partition '%s':\n", partition);
+      System.out.printf("\tBidirectional task with partition '%s':\n", partition);
       System.out.printf(
           "\t\t(%s) DF: %s\n",
           task.getClass().getSimpleName(), name(getDataFile(task).location()));
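
For context on the first change: the new specsInSnapshot map records which partition spec IDs appear in each snapshot's scan tasks, and any snapshot that touches more than one spec is added to snapshotsWithUnpinnedSpecs, i.e. excluded from the partition optimization. A minimal, self-contained sketch of that filtering step (standalone Java with hypothetical snapshot IDs and spec IDs, not the Beam code itself):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MultiSpecFilterSketch {
  public static void main(String[] args) {
    // Hypothetical bookkeeping: snapshot ID -> partition spec IDs seen in its scan tasks.
    Map<Long, Set<Integer>> specsInSnapshot = new HashMap<>();
    specsInSnapshot.computeIfAbsent(100L, id -> new HashSet<>()).add(0);
    specsInSnapshot.computeIfAbsent(101L, id -> new HashSet<>()).add(0);
    specsInSnapshot.computeIfAbsent(101L, id -> new HashSet<>()).add(1); // spec evolved mid-snapshot

    // Snapshots that use more than one spec are not safe for the partition optimization.
    Set<Long> snapshotsWithUnpinnedSpecs = new HashSet<>();
    specsInSnapshot.entrySet().stream()
        .filter(e -> e.getValue().size() > 1)
        .map(Map.Entry::getKey)
        .forEach(snapshotsWithUnpinnedSpecs::add);

    System.out.println(snapshotsWithUnpinnedSpecs); // prints [101]
  }
}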

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ResolveChanges.java

Lines changed: 116 additions & 30 deletions
@@ -17,13 +17,18 @@
  */
 package org.apache.beam.sdk.io.iceberg.cdc;
 
-import java.util.Iterator;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 
 /**
  * Receives inserts and deletes, keyed by snapshot ID and Primary Key, and determines if any updates
@@ -34,62 +39,143 @@
  *
  * <p>Otherwise, records are output as-is: INSERT as INSERT, and DELETE as DELETE.
  *
- * <p>Input elements have their timestamp reified. This is because CoGroupByKey assigns all elements
- * in a window with the same timestamp, erasing individual record timestamps. This DoFn preserves it
- * by outputting records with their reified timestamps.
+ * <p>Note: snapshots written using the Copy-on-Write method will produce tasks where records appear
+ * to be deleted then re-inserted. We perform an initial de-duplication and drop these records to
+ * avoid mistaking them as no-op updates.
+ *
+ * <p>Input elements are prepared by reifying their timestamps. This is because CoGroupByKey assigns
+ * all elements in a window with the same timestamp, erasing individual record timestamps. This DoFn
+ * preserves it by outputting records with their reified timestamps.
  */
 public class ResolveChanges extends DoFn<KV<Row, CoGbkResult>, Row> {
   public static final TupleTag<TimestampedValue<Row>> DELETES = new TupleTag<>() {};
   public static final TupleTag<TimestampedValue<Row>> INSERTS = new TupleTag<>() {};
 
   @DoFn.ProcessElement
   public void processElement(@Element KV<Row, CoGbkResult> element, OutputReceiver<Row> out) {
+    Set<String> pkFields = new HashSet<>(element.getKey().getSchema().getFieldNames());
     CoGbkResult result = element.getValue();
 
-    // iterables are lazy-loaded from the shuffle service
-    Iterable<TimestampedValue<Row>> deletes = result.getAll(DELETES);
-    Iterable<TimestampedValue<Row>> inserts = result.getAll(INSERTS);
+    List<TimestampedValue<Row>> deletes = Lists.newArrayList(result.getAll(DELETES));
+    List<TimestampedValue<Row>> inserts = Lists.newArrayList(result.getAll(INSERTS));
+    deletes.sort(Comparator.comparing(TimestampedValue::getTimestamp));
+    inserts.sort(Comparator.comparing(TimestampedValue::getTimestamp));
+
+    boolean[] duplicateDeletes = new boolean[deletes.size()];
+    boolean[] duplicateInserts = new boolean[inserts.size()];
 
-    boolean hasDeletes = deletes.iterator().hasNext();
-    boolean hasInserts = inserts.iterator().hasNext();
+    boolean hasDeletes = !deletes.isEmpty();
+    boolean hasInserts = !inserts.isEmpty();
 
     if (hasInserts && hasDeletes) {
       // UPDATE: row ID exists in both streams
       // - emit all deletes as 'UPDATE_BEFORE', and all inserts as 'UPDATE_AFTER'
-      // - emit extra inserts as 'UPDATE_AFTER'
-      // - ignore extra deletes (TODO: double check if this is a good decision)
-      Iterator<TimestampedValue<Row>> deletesIterator = deletes.iterator();
-      Iterator<TimestampedValue<Row>> insertsIterator = inserts.iterator();
-      while (deletesIterator.hasNext() && insertsIterator.hasNext()) {
-        // TODO: output as UPDATE_BEFORE kind
-        TimestampedValue<Row> updateBefore = deletesIterator.next();
-        out.outputWithTimestamp(updateBefore.getValue(), updateBefore.getTimestamp());
-        System.out.printf("[BIDIRECTIONAL] -- UpdateBefore%n%s%n", updateBefore);
-
-        // TODO: output as UPDATE_AFTER kind
-        TimestampedValue<Row> updateAfter = insertsIterator.next();
-        out.outputWithTimestamp(updateAfter.getValue(), updateAfter.getTimestamp());
-        System.out.printf("[BIDIRECTIONAL] -- UpdateAfter%n%s%n", updateAfter);
+      // - emit extra deletes as 'DELETE'
+      // - emit extra inserts as 'INSERT'
+
+      // First, loop through both deletes and inserts and deduplicate records.
+      // Duplicates can occur when an Iceberg writer uses CoW method:
+      // Deletes records by rewriting an entire DataFile except for the few records intended for
+      // deletion.
+      // From our perspective, all the records were deleted, and some were inserted back in.
+      // We must ignore these records and not mistake them for "updates".
+      for (int d = 0; d < deletes.size(); d++) {
+        TimestampedValue<Row> delete = deletes.get(d);
+
+        for (int i = 0; i < inserts.size(); i++) {
+          if (duplicateInserts[i]) {
+            continue;
+          }
+
+          TimestampedValue<Row> insert = inserts.get(i);
+          if (isDuplicate(pkFields, delete.getValue(), insert.getValue())) {
+            duplicateDeletes[d] = true;
+            duplicateInserts[i] = true;
+            System.out.printf("[DEDUPE] -- Ignored CoW record: %s%n", delete);
+            break;
+          }
+        }
+      }
+
+      // Second, loop through and output UPDATE pairs
+      int d = 0;
+      int i = 0;
+      while (d < deletes.size() && i < inserts.size()) {
+        // find next unique delete
+        while (d < deletes.size() && duplicateDeletes[d]) {
+          d++;
+        }
+        // find next unique insert
+        while (i < inserts.size() && duplicateInserts[i]) {
+          i++;
+        }
+
+        if (d < deletes.size() && i < inserts.size()) {
+          // UPDATE pair found. output as UpdateBefore/After
+          TimestampedValue<Row> updateBefore = deletes.get(d);
+          TimestampedValue<Row> updateAfter = inserts.get(i);
+
+          // TODO: output as UPDATE_BEFORE and UPDATE_AFTER kind
+          out.outputWithTimestamp(updateBefore.getValue(), updateBefore.getTimestamp());
+          out.outputWithTimestamp(updateAfter.getValue(), updateAfter.getTimestamp());
+          System.out.printf(
+              "[BIDIRECTIONAL] -- UpdateBefore:%n\t%s%n\tUpdateAfter%n\t%s%n",
+              updateBefore, updateAfter);
+
+          d++;
+          i++;
+        }
       }
-      while (insertsIterator.hasNext()) {
-        // TODO: output as UPDATE_AFTER kind
-        TimestampedValue<Row> insert = insertsIterator.next();
-        out.outputWithTimestamp(insert.getValue(), insert.getTimestamp());
-        System.out.printf("[BIDIRECTIONAL] -- Added(extra)%n%s%n", insert);
+
+      // Finally, output extra deletes or inserts
+      while (d < deletes.size()) {
+        // TODO: output as DELETE kind
+        if (!duplicateDeletes[d]) {
+          TimestampedValue<Row> delete = deletes.get(d);
+          out.outputWithTimestamp(delete.getValue(), delete.getTimestamp());
+          System.out.printf("[BIDIRECTIONAL] -- Deleted(extra)%n%s%n", delete);
        }
+        d++;
+      }
+      while (i < inserts.size()) {
+        // TODO: output as INSERT kind
+        if (!duplicateInserts[i]) {
+          TimestampedValue<Row> insert = inserts.get(i);
+          out.outputWithTimestamp(insert.getValue(), insert.getTimestamp());
+          System.out.printf("[BIDIRECTIONAL] -- Inserted(extra)%n%s%n", insert);
+        }
+        i++;
       }
     } else if (hasInserts) {
       // INSERT only
       for (TimestampedValue<Row> rec : inserts) {
-        System.out.printf("[UNIDIRECTIONAL] -- Added%n%s%n", rec);
+        System.out.printf("[BIDIRECTIONAL (only inserts)] -- Added%n%s%n", rec);
        out.outputWithTimestamp(rec.getValue(), rec.getTimestamp());
      }
     } else if (hasDeletes) {
       // DELETE only
       for (TimestampedValue<Row> rec : deletes) {
         // TODO: output as DELETE kind
-        System.out.printf("[UNIDIRECTIONAL] -- Deleted%n%s%n", rec);
+        System.out.printf("[BIDIRECTIONAL (only deletes)] -- Deleted%n%s%n", rec);
         out.outputWithTimestamp(rec.getValue(), rec.getTimestamp());
       }
     }
   }
+
+  /** Compares both records and checks whether they are duplicates or not. */
+  private static boolean isDuplicate(Set<String> pkFields, Row delete, Row insert) {
+    Schema schema = insert.getSchema();
+    for (String field : insert.getSchema().getFieldNames()) {
+      if (pkFields.contains(field)) {
+        // these records are grouped by Primary Key, so we already know PK values are equal
+        continue;
+      }
+      // return early if two values are not equal
+      if (!Row.Equals.deepEquals(
+          insert.getValue(field), delete.getValue(field), schema.getField(field).getType())) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
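
To make the copy-on-write case concrete: a CoW writer that deletes a few rows rewrites the whole data file, so the changelog reports every surviving row as a delete followed by an identical insert within the same snapshot. Comparing the non-key fields of each delete/insert pair is therefore enough to recognize and drop these no-op pairs, which is what isDuplicate does above. A standalone illustration with hypothetical flat records (maps of field name to value, not Beam Rows):

import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class CowDedupeSketch {
  // A delete/insert pair with equal non-key fields is a CoW rewrite artifact, not an update.
  static boolean isNoOpPair(
      Map<String, Object> delete, Map<String, Object> insert, Set<String> pkFields) {
    for (String field : insert.keySet()) {
      if (pkFields.contains(field)) {
        continue; // pairs are already grouped by primary key, so key fields match by construction
      }
      if (!Objects.deepEquals(delete.get(field), insert.get(field))) {
        return false; // a non-key value changed, so this really is an update
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Set<String> pk = Set.of("id");
    // CoW rewrite: row 7 was untouched, but shows up as a delete plus an identical insert.
    System.out.println(
        isNoOpPair(Map.of("id", 7, "name", "a"), Map.of("id", 7, "name", "a"), pk)); // true
    // Genuine update: the payload changed between the delete and the insert.
    System.out.println(
        isNoOpPair(Map.of("id", 8, "name", "a"), Map.of("id", 8, "name", "b"), pk)); // false
  }
}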
