Skip to content

Commit ffd2bdf

Browse files
[FLINK-34014][jdbc] Avoid executeBatch when buffer is empty
1 parent 92e2b5f commit ffd2bdf

File tree

2 files changed

+17
-13
lines changed

2 files changed

+17
-13
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,19 @@ private boolean changeFlag(RowKind rowKind) {
8686

8787
@Override
8888
public void executeBatch() throws SQLException {
89-
for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
90-
if (entry.getValue().f0) {
91-
upsertExecutor.addToBatch(entry.getValue().f1);
92-
} else {
93-
// delete by key
94-
deleteExecutor.addToBatch(entry.getKey());
89+
if (!reduceBuffer.isEmpty()) {
90+
for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
91+
if (entry.getValue().f0) {
92+
upsertExecutor.addToBatch(entry.getValue().f1);
93+
} else {
94+
// delete by key
95+
deleteExecutor.addToBatch(entry.getKey());
96+
}
9597
}
98+
upsertExecutor.executeBatch();
99+
deleteExecutor.executeBatch();
100+
reduceBuffer.clear();
96101
}
97-
upsertExecutor.executeBatch();
98-
deleteExecutor.executeBatch();
99-
reduceBuffer.clear();
100102
}
101103

102104
@Override

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,13 @@ public void addToBatch(RowData record) throws SQLException {
5252

5353
@Override
5454
public void executeBatch() throws SQLException {
55-
for (RowData value : buffer) {
56-
statementExecutor.addToBatch(value);
55+
if (!buffer.isEmpty()) {
56+
for (RowData value : buffer) {
57+
statementExecutor.addToBatch(value);
58+
}
59+
statementExecutor.executeBatch();
60+
buffer.clear();
5761
}
58-
statementExecutor.executeBatch();
59-
buffer.clear();
6062
}
6163

6264
@Override

0 commit comments

Comments
 (0)