
Commit f38341b

[FLINK-36541][pipeline-connector][paimon] Pass checkpointId to StoreSinkWrite#prepareCommit correctly
This closes #3652.
1 parent 8b8330e commit f38341b

File tree

3 files changed: +37 -5 lines

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java

Lines changed: 6 additions & 1 deletion

@@ -21,6 +21,7 @@
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
 import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;

@@ -61,7 +62,11 @@ public PaimonSink(

     @Override
     public PaimonWriter<InputT> createWriter(InitContext context) {
-        return new PaimonWriter<>(catalogOptions, context.metricGroup(), commitUser, serializer);
+        long lastCheckpointId =
+                context.getRestoredCheckpointId()
+                        .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+        return new PaimonWriter<>(
+                catalogOptions, context.metricGroup(), commitUser, serializer, lastCheckpointId);
     }

     @Override

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java

Lines changed: 18 additions & 4 deletions

@@ -24,6 +24,7 @@
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.streaming.api.operators.StreamOperator;

 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;

@@ -68,11 +69,15 @@ public class PaimonWriter<InputT>
     private final MetricGroup metricGroup;
     private final List<MultiTableCommittable> committables;

+    /** A workaround variable to trace the checkpointId in {@link StreamOperator#snapshotState}. */
+    private long lastCheckpointId;
+
     public PaimonWriter(
             Options catalogOptions,
             MetricGroup metricGroup,
             String commitUser,
-            PaimonRecordSerializer<InputT> serializer) {
+            PaimonRecordSerializer<InputT> serializer,
+            long lastCheckpointId) {
         catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
         this.metricGroup = metricGroup;
         this.commitUser = commitUser;

@@ -85,12 +90,14 @@ public PaimonWriter(
                 new ExecutorThreadFactory(
                         Thread.currentThread().getName() + "-CdcMultiWrite-Compaction"));
         this.serializer = serializer;
+        this.lastCheckpointId = lastCheckpointId;
     }

     @Override
     public Collection<MultiTableCommittable> prepareCommit() {
         Collection<MultiTableCommittable> allCommittables = new ArrayList<>(committables);
         committables.clear();
+        lastCheckpointId++;
         return allCommittables;
     }

@@ -156,6 +163,9 @@ private FileStoreTable getTable(Identifier tableId) {
      * Called on checkpoint or end of input so that the writer flushes all pending data for
      * at-least-once.
      *
+     * <p>Execution order: flush(boolean endOfInput) => prepareCommit() =>
+     * snapshotState(long checkpointId).
+     *
      * <p>This method will also be called when receiving {@link FlushEvent}, but we don't need to
      * commit the MultiTableCommittables immediately in this case, because {@link PaimonCommitter}
      * supports committing data of different schemas.

@@ -166,10 +176,14 @@ public void flush(boolean endOfInput) throws IOException {
             Identifier key = entry.getKey();
             StoreSinkWrite write = entry.getValue();
             boolean waitCompaction = false;
-            // checkpointId will be updated correctly by PreCommitOperator.
-            long checkpointId = 1L;
             committables.addAll(
-                    write.prepareCommit(waitCompaction, checkpointId).stream()
+                    // Here we pass lastCheckpointId + 1 to avoid calling
+                    // prepareCommit with the same checkpointId as the first
+                    // round. One thing to note is that during schema evolution,
+                    // flush and checkpoint are consistent, but as long as there
+                    // is data coming in, this will not trigger any conflict
+                    // issues.
+                    write.prepareCommit(waitCompaction, lastCheckpointId + 1).stream()
                             .map(
                                     committable ->
                                             MultiTableCommittable.fromCommittable(key, committable))
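To make the tracking easier to follow, here is a minimal sketch of how lastCheckpointId advances across the flush(boolean endOfInput) => prepareCommit() => snapshotState(long checkpointId) cycle documented above; the class is a hypothetical simplification, not the actual PaimonWriter.

// Sketch only: the per-checkpoint lifecycle of lastCheckpointId.
class CheckpointIdTrackingSketch {

    private long lastCheckpointId;

    CheckpointIdTrackingSketch(long lastCheckpointId) {
        // 0 on a fresh start, or the restored checkpoint id after recovery.
        this.lastCheckpointId = lastCheckpointId;
    }

    // Called first: pending data is stamped with the id of the checkpoint that
    // is about to complete (lastCheckpointId + 1), instead of a hard-coded 1L.
    void flush(boolean endOfInput) {
        long upcomingCheckpointId = lastCheckpointId + 1;
        // write.prepareCommit(waitCompaction, upcomingCheckpointId) runs here.
    }

    // Called second: committables are handed off and the tracked id advances,
    // so the next round cannot reuse the same checkpoint id.
    void prepareCommit() {
        lastCheckpointId++;
    }
}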

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java

Lines changed: 13 additions & 0 deletions

@@ -258,6 +258,19 @@ public void testSinkWithDataChange(String metastore)
                 .forEachRemaining(result::add);
         Assertions.assertEquals(
                 Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "x")), result);
+
+        result = new ArrayList<>();
+        tEnv.sqlQuery("select max_sequence_number from paimon_catalog.test.`table1$files`")
+                .execute()
+                .collect()
+                .forEachRemaining(result::add);
+        // Each commit will generate one sequence number (equal to the checkpointId).
+        Assertions.assertEquals(
+                Arrays.asList(
+                        Row.ofKind(RowKind.INSERT, 1L),
+                        Row.ofKind(RowKind.INSERT, 2L),
+                        Row.ofKind(RowKind.INSERT, 3L)),
+                result);
     }

     @ParameterizedTest
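As a usage note, the new assertion reads Paimon's $files system table. A minimal standalone sketch of the same check with the Flink Table API follows; the warehouse path is hypothetical, and the catalog/table names match the test.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch only: inspecting per-file sequence numbers via Paimon's `$files` table.
public class InspectSequenceNumbers {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Hypothetical warehouse path; any filesystem path with Paimon tables works.
        tEnv.executeSql(
                "CREATE CATALOG paimon_catalog WITH ("
                        + "'type' = 'paimon', 'warehouse' = 'file:///tmp/paimon')");
        // One row per data file; max_sequence_number equals the checkpointId of
        // the commit that wrote the file, so three commits yield 1, 2, 3.
        tEnv.executeSql(
                        "SELECT max_sequence_number "
                                + "FROM paimon_catalog.test.`table1$files`")
                .print();
    }
}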
