Skip to content

Commit 0466c19

Browse files
committed
address comment.
1 parent 29c064a commit 0466c19

File tree

1 file changed

+2
-32
lines changed

1 file changed

+2
-32
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
package org.apache.flink.cdc.connectors.paimon.sink.v2;
1919

20-
import org.apache.flink.api.common.state.OperatorStateStore;
2120
import org.apache.flink.api.connector.sink2.Committer;
22-
import org.apache.flink.metrics.groups.OperatorMetricGroup;
2321

2422
import org.apache.paimon.flink.FlinkCatalogFactory;
2523
import org.apache.paimon.flink.sink.MultiTableCommittable;
@@ -29,8 +27,6 @@
2927
import org.slf4j.Logger;
3028
import org.slf4j.LoggerFactory;
3129

32-
import javax.annotation.Nullable;
33-
3430
import java.util.Collection;
3531
import java.util.Collections;
3632
import java.util.List;
@@ -48,34 +44,8 @@ public PaimonCommitter(Options catalogOptions, String commitUser) {
4844
storeMultiCommitter =
4945
new StoreMultiCommitter(
5046
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
51-
new org.apache.paimon.flink.sink.Committer.Context() {
52-
53-
@Override
54-
public String commitUser() {
55-
return commitUser;
56-
}
57-
58-
@Nullable
59-
@Override
60-
public OperatorMetricGroup metricGroup() {
61-
return null;
62-
}
63-
64-
@Override
65-
public boolean streamingCheckpointEnabled() {
66-
return false;
67-
}
68-
69-
@Override
70-
public boolean isRestored() {
71-
return false;
72-
}
73-
74-
@Override
75-
public OperatorStateStore stateStore() {
76-
return null;
77-
}
78-
});
47+
org.apache.paimon.flink.sink.Committer.createContext(
48+
commitUser, null, true, false, null));
7949
}
8050

8151
@Override

0 commit comments

Comments
 (0)