Skip to content

Commit f3447e5

Browse files
committed
[flink] not getting commitUser from state
1 parent c01c4c3 commit f3447e5

File tree

2 files changed

+15
-0
lines changed

2 files changed

+15
-0
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ protected StoreSinkWriteState createState(
5353
return new NoopStoreSinkWriteState(subtaskId);
5454
}
5555

56+
@Override
57+
protected String getCommitUser(StateInitializationContext context) throws Exception {
58+
// No conflicts will occur in append only unaware bucket writer, so
59+
// commitUser does not matter.
60+
return commitUser == null ? initialCommitUser : commitUser;
61+
}
62+
5663
@Override
5764
public void processElement(StreamRecord<CdcRecord> element) throws Exception {
5865
// only accepts INSERT record

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,14 @@ protected StoreSinkWriteState createState(
9393
// is needed.
9494
return new NoopStoreSinkWriteState(subtaskId);
9595
}
96+
97+
@Override
98+
protected String getCommitUser(StateInitializationContext context)
99+
throws Exception {
100+
// No conflicts will occur in append only unaware bucket writer, so
101+
// commitUser does not matter.
102+
return commitUser == null ? initialCommitUser : commitUser;
103+
}
96104
};
97105
}
98106
};

0 commit comments

Comments
 (0)