Skip to content

Commit bd6c5e0

Browse files
committed
fix
1 parent f2db0c2 commit bd6c5e0

File tree

3 files changed

+6
-6
lines changed

3 files changed

+6
-6
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketRowWriteOperator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ public void processElement(StreamRecord<Tuple2<InternalRow, Integer>> element)
6060
@Override
6161
protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
6262
throws IOException {
63-
return super.prepareCommit(waitCompaction, checkpointId);
63+
List<Committable> committables = super.prepareCommit(waitCompaction, checkpointId);
64+
tryRefreshWrite();
65+
return committables;
6466
}
6567

6668
/** {@link StreamOperatorFactory} of {@link DynamicBucketRowWriteOperator}. */

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ protected List<Committable> prepareCommit(boolean waitCompaction, long checkpoin
220220
Committable.Kind.LOG_OFFSET,
221221
new LogOffsetCommittable(k, v))));
222222
}
223-
223+
tryRefreshWrite();
224224
return committables;
225225
}
226226

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,17 +159,15 @@ public void close() throws Exception {
159159
@Override
160160
protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
161161
throws IOException {
162-
List<Committable> committables = write.prepareCommit(waitCompaction, checkpointId);
163-
tryRefreshWrite();
164-
return committables;
162+
return write.prepareCommit(waitCompaction, checkpointId);
165163
}
166164

167165
@VisibleForTesting
168166
public StoreSinkWrite getWrite() {
169167
return write;
170168
}
171169

172-
private void tryRefreshWrite() {
170+
protected void tryRefreshWrite() {
173171
if (writeRefresher != null) {
174172
writeRefresher.tryRefresh();
175173
table = writeRefresher.updatedTable();

0 commit comments

Comments
 (0)