
Commit 4e81f78

naming changes; key on snapshot ID instead of ordinal ID
1 parent eb23828 commit 4e81f78

4 files changed (+92, -65 lines)


sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScanner.java

Lines changed: 15 additions & 12 deletions

@@ -61,9 +61,9 @@ public class ChangelogScanner
   private static final Counter numDeletedDataFileScanTasks =
       Metrics.counter(ChangelogScanner.class, "numDeletedDataFileScanTasks");
   public static final TupleTag<KV<ChangelogDescriptor, List<SerializableChangelogTask>>>
-      UNIFORM_CHANGES = new TupleTag<>();
+      UNIDIRECTIONAL_CHANGES = new TupleTag<>();
   public static final TupleTag<KV<ChangelogDescriptor, List<SerializableChangelogTask>>>
-      MIXED_CHANGES = new TupleTag<>();
+      BIDIRECTIONAL_CHANGES = new TupleTag<>();
   public static final KvCoder<ChangelogDescriptor, List<SerializableChangelogTask>> OUTPUT_CODER =
       KvCoder.of(ChangelogDescriptor.coder(), ListCoder.of(SerializableChangelogTask.coder()));
   private final IcebergScanConfig scanConfig;
@@ -117,7 +117,7 @@ private void createAndOutputReadTasks(
    Map<Long, Long> cachedSnapshotTimestamps = new HashMap<>();
    // Maintain the same scan task groupings produced by Iceberg's binpacking, for
    // better work load distribution among readers.
-   // Also allows the user to control by setting a `read.split.target-size`:
+   // This allows the user to control load per worker by tuning `read.split.target-size`:
    // https://iceberg.apache.org/docs/latest/configuration/#read-properties
    Map<Integer, List<List<SerializableChangelogTask>>> changelogScanTaskGroups = new HashMap<>();

@@ -126,7 +126,7 @@ private void createAndOutputReadTasks(

    try (CloseableIterable<ScanTaskGroup<ChangelogScanTask>> scanTaskGroups = scan.planTasks()) {
      for (ScanTaskGroup<ChangelogScanTask> scanTaskGroup : scanTaskGroups) {
-       Map<Integer, List<SerializableChangelogTask>> ordinalGroups = new HashMap<>();
+       Map<Integer, List<SerializableChangelogTask>> ordinalTaskGroup = new HashMap<>();

        for (ChangelogScanTask changelogScanTask : scanTaskGroup.tasks()) {
          long snapshotId = changelogScanTask.commitSnapshotId();
@@ -137,7 +137,7 @@ private void createAndOutputReadTasks(

          SerializableChangelogTask task =
              SerializableChangelogTask.from(changelogScanTask, timestampMillis);
-         ordinalGroups.computeIfAbsent(ordinal, (unused) -> new ArrayList<>()).add(task);
+         ordinalTaskGroup.computeIfAbsent(ordinal, (o) -> new ArrayList<>()).add(task);

          changeTypesPerOrdinal
              .computeIfAbsent(ordinal, (o) -> new HashSet<>())
@@ -158,7 +158,7 @@ private void createAndOutputReadTasks(
        }

        for (Map.Entry<Integer, List<SerializableChangelogTask>> ordinalGroup :
-           ordinalGroups.entrySet()) {
+           ordinalTaskGroup.entrySet()) {
          changelogScanTaskGroups
              .computeIfAbsent(ordinalGroup.getKey(), (unused) -> new ArrayList<>())
              .add(ordinalGroup.getValue());
@@ -198,17 +198,20 @@ private void createAndOutputReadTasks(
          KV.of(descriptor, subgroup);

      // Determine where each ordinal's tasks will go, based on the type of changes:
-     // 1. If an ordinal's changes are uniform (i.e. all inserts or all deletes), they should be
+     // 1. If an ordinal's changes are unidirectional (i.e. only inserts or only deletes), they
+     //    should be
      //    processed directly in the fast path.
-     // 2. If an ordinal's changes are mixed (i.e. some inserts and some deletes), they will need
+     // 2. If an ordinal's changes are bidirectional (i.e. both inserts and deletes), they will
+     //    need
      //    more careful processing to determine if any updates have occurred.
      Set<SerializableChangelogTask.Type> changeTypes =
          checkStateNotNull(changeTypesPerOrdinal.get(ordinal));
      TupleTag<KV<ChangelogDescriptor, List<SerializableChangelogTask>>> outputTag;
-     if (changeTypes.contains(ADDED_ROWS) && changeTypes.size() > 1) { // added and deleted rows
-       outputTag = MIXED_CHANGES;
-     } else { // all added or all deleted rows
-       outputTag = UNIFORM_CHANGES;
+     if (changeTypes.contains(ADDED_ROWS)
+         && changeTypes.size() > 1) { // both added and deleted rows
+       outputTag = BIDIRECTIONAL_CHANGES;
+     } else { // only added or only deleted rows
+       outputTag = UNIDIRECTIONAL_CHANGES;
      }

      multiOutputReceiver.get(outputTag).outputWithTimestamp(output, timestamp);
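
Note for reviewers: the rename does not change the shape of the routing rule, only its vocabulary. An ordinal whose change set contains ADDED_ROWS together with any delete type goes to the slow path; everything else goes to the fast path. Below is a minimal, standalone sketch of that rule; the ChangeType enum and its DELETED_ROWS/DELETED_FILE values are simplified stand-ins for SerializableChangelogTask.Type, not identifiers taken from this commit.

import java.util.EnumSet;
import java.util.Set;

public class ChangeRoutingSketch {
  // Simplified stand-in for SerializableChangelogTask.Type.
  enum ChangeType { ADDED_ROWS, DELETED_ROWS, DELETED_FILE }

  // An ordinal whose change set contains ADDED_ROWS plus at least one delete type is
  // "bidirectional" (slow path); otherwise it is "unidirectional" (fast path).
  static String routeFor(Set<ChangeType> changeTypes) {
    if (changeTypes.contains(ChangeType.ADDED_ROWS) && changeTypes.size() > 1) {
      return "BIDIRECTIONAL_CHANGES"; // both inserts and deletes, may contain updates
    }
    return "UNIDIRECTIONAL_CHANGES"; // only inserts or only deletes
  }

  public static void main(String[] args) {
    System.out.println(routeFor(EnumSet.of(ChangeType.ADDED_ROWS)));                            // unidirectional
    System.out.println(routeFor(EnumSet.of(ChangeType.DELETED_ROWS, ChangeType.DELETED_FILE))); // unidirectional
    System.out.println(routeFor(EnumSet.of(ChangeType.ADDED_ROWS, ChangeType.DELETED_ROWS)));   // bidirectional
  }
}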

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/IncrementalChangelogSource.java

Lines changed: 33 additions & 24 deletions

@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.io.iceberg.cdc;

-import static org.apache.beam.sdk.io.iceberg.cdc.ChangelogScanner.MIXED_CHANGES;
-import static org.apache.beam.sdk.io.iceberg.cdc.ChangelogScanner.UNIFORM_CHANGES;
+import static org.apache.beam.sdk.io.iceberg.cdc.ChangelogScanner.BIDIRECTIONAL_CHANGES;
+import static org.apache.beam.sdk.io.iceberg.cdc.ChangelogScanner.UNIDIRECTIONAL_CHANGES;
 import static org.apache.beam.sdk.io.iceberg.cdc.ReadFromChangelogs.KEYED_DELETES;
 import static org.apache.beam.sdk.io.iceberg.cdc.ReadFromChangelogs.KEYED_INSERTS;
 import static org.apache.beam.sdk.io.iceberg.cdc.ReadFromChangelogs.UNIFORM_ROWS;
@@ -77,25 +77,28 @@ public PCollection<Row> expand(PBegin input) {
        .apply(
            "Create Changelog Tasks",
            ParDo.of(new ChangelogScanner(scanConfig))
-               .withOutputTags(UNIFORM_CHANGES, TupleTagList.of(MIXED_CHANGES)));
+               .withOutputTags(
+                   UNIDIRECTIONAL_CHANGES, TupleTagList.of(BIDIRECTIONAL_CHANGES)));

    // for changelog ordinal groups that have UNIFORM changes (i.e. all deletes, or all inserts),
    // take the fast approach of just reading and emitting CDC records.
-   PCollection<Row> fastPathCdcRows =
-       processUniformChanges(
-           changelogTasks.get(UNIFORM_CHANGES).setCoder(ChangelogScanner.OUTPUT_CODER));
+   PCollection<Row> uniDirectionalCdcRows =
+       processUniDirectionalChanges(
+           changelogTasks.get(UNIDIRECTIONAL_CHANGES).setCoder(ChangelogScanner.OUTPUT_CODER));

-   // changelog ordinal groups that have MIXED changes (i.e. some deletes and some inserts)
-   // will need extra processing to identify any updates
-   PCollection<Row> slowPathCdcRows =
-       processMixedChanges(
-           changelogTasks.get(MIXED_CHANGES).setCoder(ChangelogScanner.OUTPUT_CODER));
+   // changelog ordinal groups that have BIDIRECTIONAL changes (i.e. both deletes and inserts)
+   // will need extra processing (including a shuffle) to identify any updates
+   PCollection<Row> largeBiDirectionalCdcRows =
+       processLargeBiDirectionalChanges(
+           changelogTasks.get(BIDIRECTIONAL_CHANGES).setCoder(ChangelogScanner.OUTPUT_CODER));

-   // Merge UNIFORM and MIXED outputs
-   return PCollectionList.of(fastPathCdcRows).and(slowPathCdcRows).apply(Flatten.pCollections());
+   // Merge UNIDIRECTIONAL and BIDIRECTIONAL outputs
+   return PCollectionList.of(uniDirectionalCdcRows)
+       .and(largeBiDirectionalCdcRows)
+       .apply(Flatten.pCollections());
  }

- private PCollection<Row> processUniformChanges(
+ private PCollection<Row> processUniDirectionalChanges(
      PCollection<KV<ChangelogDescriptor, List<SerializableChangelogTask>>> uniformChangelogs) {
    return uniformChangelogs
        .apply(Redistribute.arbitrarily())
@@ -107,41 +110,47 @@ private PCollection<Row> processUniformChanges(
        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema()));
  }

- private PCollection<Row> processMixedChanges(
-     PCollection<KV<ChangelogDescriptor, List<SerializableChangelogTask>>> mixedChangelogs) {
-   PCollectionTuple mixedCdcKeyedRows =
-       mixedChangelogs
+ private PCollection<Row> processLargeBiDirectionalChanges(
+     PCollection<KV<ChangelogDescriptor, List<SerializableChangelogTask>>>
+         biDirectionalChangelogs) {
+   PCollectionTuple biDirectionalKeyedRows =
+       biDirectionalChangelogs
           .apply(Redistribute.arbitrarily())
           .apply(
-              "Read Mixed Changes",
+              "Read Large BiDirectional Changes",
               ParDo.of(ReadFromChangelogs.withKeyedOutput(scanConfig))
                   .withOutputTags(KEYED_INSERTS, TupleTagList.of(KEYED_DELETES)));

   // prior to CoGBK, set a windowing strategy to maintain the earliest timestamp in the window
+  // this allows us to emit records downstream that may have larger reified timestamps
   Window<KV<Row, TimestampedValue<Row>>> windowingStrategy =
       Window.<KV<Row, TimestampedValue<Row>>>into(new GlobalWindows())
           .withTimestampCombiner(TimestampCombiner.EARLIEST);

   // preserve the element's timestamp by moving it into the value
+  // in the normal case, this will be a no-op because all CDC rows in an ordinal have the same
+  // commit timestamp.
+  // but this will matter if we add custom watermarking, where record timestamps are
+  // derived from a specified column
   KvCoder<Row, Row> keyedOutputCoder = ReadFromChangelogs.keyedOutputCoder(scanConfig);
   PCollection<KV<Row, TimestampedValue<Row>>> keyedInsertsWithTimestamps =
-      mixedCdcKeyedRows
+      biDirectionalKeyedRows
          .get(KEYED_INSERTS)
          .setCoder(keyedOutputCoder)
-         .apply(Reify.timestampsInValue())
+         .apply("Reify INSERT Timestamps", Reify.timestampsInValue())
          .apply(windowingStrategy);
   PCollection<KV<Row, TimestampedValue<Row>>> keyedDeletesWithTimestamps =
-      mixedCdcKeyedRows
+      biDirectionalKeyedRows
          .get(KEYED_DELETES)
          .setCoder(keyedOutputCoder)
-         .apply(Reify.timestampsInValue())
+         .apply("Reify DELETE Timestamps", Reify.timestampsInValue())
          .apply(windowingStrategy);

   // CoGroup by record ID and emit any (DELETE + INSERT) pairs as updates: (UPDATE_BEFORE,
   // UPDATE_AFTER)
   return KeyedPCollectionTuple.of(INSERTS, keyedInsertsWithTimestamps)
       .and(DELETES, keyedDeletesWithTimestamps)
-      .apply(CoGroupByKey.create())
+      .apply("CoGroupBy Row ID", CoGroupByKey.create())
       .apply("Reconcile Inserts and Deletes", ParDo.of(new ReconcileChanges()))
       .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema()));
  }
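
Note for reviewers: the slow path above is essentially one Beam pattern, namely windowing the keyed insert and delete streams into the global window with an EARLIEST timestamp combiner and then co-grouping them by row ID. The sketch below isolates just that wiring, with String keys and values standing in for the Beam Rows so it stays self-contained; the class and method names here are illustrative and not taken from the commit.

import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

class CoGroupSketch {
  static final TupleTag<String> INSERTS = new TupleTag<>();
  static final TupleTag<String> DELETES = new TupleTag<>();

  // Co-groups keyed inserts and deletes so a downstream DoFn can pair them up per key.
  static PCollection<KV<String, CoGbkResult>> coGroupByRowId(
      PCollection<KV<String, String>> keyedInserts,
      PCollection<KV<String, String>> keyedDeletes) {
    // Global window whose output timestamp is the earliest input timestamp, mirroring the
    // windowing strategy applied before the CoGBK in processLargeBiDirectionalChanges.
    Window<KV<String, String>> keepEarliestTimestamp =
        Window.<KV<String, String>>into(new GlobalWindows())
            .withTimestampCombiner(TimestampCombiner.EARLIEST);

    return KeyedPCollectionTuple.of(INSERTS, keyedInserts.apply("Window Inserts", keepEarliestTimestamp))
        .and(DELETES, keyedDeletes.apply("Window Deletes", keepEarliestTimestamp))
        .apply("CoGroupBy Row ID", CoGroupByKey.create());
  }
}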

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ReadFromChangelogs.java

Lines changed: 43 additions & 29 deletions

@@ -66,8 +66,8 @@ public class ReadFromChangelogs<OutT>
  private transient StructProjection recordIdProjection;
  private transient org.apache.iceberg.Schema recordIdSchema;
  private final Schema beamRowSchema;
- private final Schema rowIdWithOrdinalBeamSchema;
- private static final String ORDINAL_FIELD = "__beam__changelog__ordinal__";
+ private final Schema rowAndSnapshotIDBeamSchema;
+ private static final String SNAPSHOT_FIELD = "__beam__changelog__snapshot__id__";

  private ReadFromChangelogs(IcebergScanConfig scanConfig, boolean keyedOutput) {
    this.scanConfig = scanConfig;
@@ -78,13 +78,7 @@ private ReadFromChangelogs(IcebergScanConfig scanConfig, boolean keyedOutput) {
    this.recordIdSchema = recordSchema.select(recordSchema.identifierFieldNames());
    this.recordIdProjection = StructProjection.create(recordSchema, recordIdSchema);

-   Schema rowIdBeamSchema = icebergSchemaToBeamSchema(recordIdSchema);
-   List<Schema.Field> fields =
-       ImmutableList.<Schema.Field>builder()
-           .add(Schema.Field.of(ORDINAL_FIELD, Schema.FieldType.INT32))
-           .addAll(rowIdBeamSchema.getFields())
-           .build();
-   this.rowIdWithOrdinalBeamSchema = new Schema(fields);
+   this.rowAndSnapshotIDBeamSchema = rowAndSnapshotIDBeamSchema(scanConfig);
  }

  static ReadFromChangelogs<Row> of(IcebergScanConfig scanConfig) {
@@ -100,19 +94,24 @@ static ReadFromChangelogs<KV<Row, Row>> withKeyedOutput(IcebergScanCo
   * schema's identifier fields.
   */
  static KvCoder<Row, Row> keyedOutputCoder(IcebergScanConfig scanConfig) {
+   org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
+   Schema rowAndSnapshotIDBeamSchema = rowAndSnapshotIDBeamSchema(scanConfig);
+   return KvCoder.of(
+       SchemaCoder.of(rowAndSnapshotIDBeamSchema),
+       SchemaCoder.of(icebergSchemaToBeamSchema(recordSchema)));
+ }
+
+ private static Schema rowAndSnapshotIDBeamSchema(IcebergScanConfig scanConfig) {
    org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema();
    org.apache.iceberg.Schema recordIdSchema =
-       recordSchema.select(recordSchema.identifierFieldNames());
+       recordSchema.select(recordSchema.identifierFieldNames());
    Schema rowIdBeamSchema = icebergSchemaToBeamSchema(recordIdSchema);
    List<Schema.Field> fields =
-       ImmutableList.<Schema.Field>builder()
-           .add(Schema.Field.of(ORDINAL_FIELD, Schema.FieldType.INT32))
-           .addAll(rowIdBeamSchema.getFields())
-           .build();
-   Schema rowIdWithOrdinalBeamSchema = new Schema(fields);
-   return KvCoder.of(
-       SchemaCoder.of(rowIdWithOrdinalBeamSchema),
-       SchemaCoder.of(icebergSchemaToBeamSchema(recordSchema)));
+       ImmutableList.<Schema.Field>builder()
+           .add(Schema.Field.of(SNAPSHOT_FIELD, Schema.FieldType.INT64))
+           .addAll(rowIdBeamSchema.getFields())
+           .build();
+   return new Schema(fields);
  }

  @Setup
@@ -164,12 +163,16 @@ private void processAddedRowsTask(
    try (CloseableIterable<Record> fullIterable = ReadUtils.createReader(task, table, scanConfig)) {
      DeleteFilter<Record> deleteFilter =
          ReadUtils.genericDeleteFilter(
-             table, scanConfig, task.getDataFile().getPath(), task.getExistingDeletes());
+             table, scanConfig, task.getDataFile().getPath(), task.getAddedDeletes());
      CloseableIterable<Record> filtered = deleteFilter.filter(fullIterable);

      for (Record rec : filtered) {
        outputRecord(
-           rec, outputReceiver, task.getOrdinal(), task.getTimestampMillis(), KEYED_INSERTS);
+           rec,
+           outputReceiver,
+           task.getCommitSnapshotId(),
+           task.getTimestampMillis(),
+           KEYED_INSERTS);
      }
    }
    numAddedRowsScanTasksCompleted.inc();
@@ -192,7 +195,11 @@ private void processDeletedRowsTask(
      for (Record rec : newlyDeletedRecords) {
        // TODO: output with DELETE kind
        outputRecord(
-           rec, outputReceiver, task.getOrdinal(), task.getTimestampMillis(), KEYED_DELETES);
+           rec,
+           outputReceiver,
+           task.getCommitSnapshotId(),
+           task.getTimestampMillis(),
+           KEYED_DELETES);
      }
    }
    numDeletedRowsScanTasksCompleted.inc();
@@ -209,7 +216,11 @@ private void processDeletedFileTask(
      for (Record rec : filtered) {
        // TODO: output with DELETE kind
        outputRecord(
-           rec, outputReceiver, task.getOrdinal(), task.getTimestampMillis(), KEYED_DELETES);
+           rec,
+           outputReceiver,
+           task.getCommitSnapshotId(),
+           task.getTimestampMillis(),
+           KEYED_DELETES);
      }
    }
    numDeletedDataFileScanTasksCompleted.inc();
@@ -218,35 +229,38 @@ private void processDeletedFileTask(
  private void outputRecord(
      Record rec,
      MultiOutputReceiver outputReceiver,
-     int ordinal,
+     long snapshotId,
      long timestampMillis,
      TupleTag<KV<Row, Row>> keyedTag) {
    Row row = IcebergUtils.icebergRecordToBeamRow(beamRowSchema, rec);
    Instant timestamp = Instant.ofEpochMilli(timestampMillis);
    if (keyedOutput) { // slow path
      StructProjection recId = recordIdProjection.wrap(rec);
-     // Create a Row ID consisting of record ID columns and the changelog task's ordinal #
-     Row id = structToBeamRow(ordinal, recId, recordIdSchema, rowIdWithOrdinalBeamSchema);
+     // Create a Row ID consisting of:
+     // 1. the task's commit snapshot ID
+     // 2. the record ID column values
+     // This is needed to sufficiently distinguish a record change
+     Row id = structToBeamRow(snapshotId, recId, recordIdSchema, rowAndSnapshotIDBeamSchema);
      outputReceiver.get(keyedTag).outputWithTimestamp(KV.of(id, row), timestamp);
    } else { // fast path
-     System.out.printf("[UNIFORM] -- Output(%s, %s)\n%s%n", ordinal, timestamp, row);
+     System.out.printf("[UNIFORM] -- Output(%s, %s)\n%s%n", snapshotId, timestamp, row);
      outputReceiver.get(UNIFORM_ROWS).outputWithTimestamp(row, timestamp);
    }
  }

  public static Row structToBeamRow(
-     int ordinal, StructLike struct, org.apache.iceberg.Schema schema, Schema beamSchema) {
+     long snapshotId, StructLike struct, org.apache.iceberg.Schema schema, Schema beamSchema) {
    ImmutableMap.Builder<String, Object> values = ImmutableMap.builder();
    List<Types.NestedField> columns = schema.columns();
    for (Types.NestedField column : columns) {
      String name = column.name();
      Object value = schema.accessorForField(column.fieldId()).get(struct);
      values.put(name, value);
    }
-   // Include ordinal as part of the row ID.
+   // Include snapshot ID as part of the row ID.
    // This is essential to ensure that the downstream ReconcileChanges compares rows
    // within the same operation.
-   values.put(ORDINAL_FIELD, ordinal);
+   values.put(SNAPSHOT_FIELD, snapshotId);
    return Row.withSchema(beamSchema).withFieldValues(values.build()).build();
  }

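
Note for reviewers: the net effect of the schema change in this file is that the grouping key is now an INT64 snapshot-ID column prepended to the record's identifier fields. Below is a small sketch of what such a composite key Row looks like, assuming a hypothetical table whose only identifier field is order_id; the field name and the literal snapshot IDs are made up for illustration.

import java.util.Arrays;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

class RowKeySketch {
  private static final String SNAPSHOT_FIELD = "__beam__changelog__snapshot__id__";

  // Builds the composite key: snapshot ID first, then the table's identifier column(s).
  static Row keyFor(long snapshotId, long orderId) {
    Schema keySchema =
        new Schema(
            Arrays.asList(
                // INT64 now, since Iceberg snapshot IDs are longs (the old ordinal was INT32)
                Schema.Field.of(SNAPSHOT_FIELD, Schema.FieldType.INT64),
                Schema.Field.of("order_id", Schema.FieldType.INT64)));
    return Row.withSchema(keySchema).addValues(snapshotId, orderId).build();
  }

  public static void main(String[] args) {
    // Two changes to the same row in different snapshots no longer share a key,
    // so the downstream reconciliation only pairs deletes and inserts from the same commit.
    System.out.println(keyFor(1111L, 42L));
    System.out.println(keyFor(2222L, 42L));
  }
}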

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ReconcileChanges.java

Lines changed: 1 addition & 0 deletions

@@ -36,6 +36,7 @@ public void processElement(
      @Timestamp Instant timestamp,
      OutputReceiver<Row> out) {
    CoGbkResult result = element.getValue();
+   System.out.println("xxx [MIXED] Process timestamp: " + timestamp);

    // iterables are lazy-loaded from the shuffle service
    Iterable<TimestampedValue<Row>> deletes = result.getAll(DELETES);
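
Note for reviewers: ReconcileChanges itself only gains a debug print in this commit, but the "CoGroup by record ID and emit any (DELETE + INSERT) pairs as updates: (UPDATE_BEFORE, UPDATE_AFTER)" comment in IncrementalChangelogSource describes the contract it implements. The sketch below is an assumed, simplified rendering of that pairing rule on plain String rows; it is not the committed implementation and ignores timestamps and Beam plumbing.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ReconcileSketch {
  enum Kind { INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER }

  record Change(Kind kind, String row) {}

  // For one co-grouped key: each (delete, insert) pair becomes an update;
  // unmatched changes pass through as plain deletes or inserts.
  static List<Change> reconcile(Iterable<String> deletes, Iterable<String> inserts) {
    List<Change> out = new ArrayList<>();
    Iterator<String> del = deletes.iterator();
    Iterator<String> ins = inserts.iterator();
    while (del.hasNext() && ins.hasNext()) {
      out.add(new Change(Kind.UPDATE_BEFORE, del.next()));
      out.add(new Change(Kind.UPDATE_AFTER, ins.next()));
    }
    del.forEachRemaining(r -> out.add(new Change(Kind.DELETE, r)));
    ins.forEachRemaining(r -> out.add(new Change(Kind.INSERT, r)));
    return out;
  }
}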
