From 7a8dbf14252a01545aca4a584c1d3ade4b3f4757 Mon Sep 17 00:00:00 2001 From: wulin Date: Thu, 21 Mar 2024 14:50:19 +0800 Subject: [PATCH 1/2] [FLINK-34901][connectors/jdbc] Improve the performance of the update Postgres database --- .../jdbc/dialect/AbstractPostgresCompatibleDialect.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java index 0ca425f30..c2dd06e7a 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java @@ -56,6 +56,7 @@ public Optional getUpsertStatement( .collect(Collectors.joining(", ")); String updateClause = Arrays.stream(fieldNames) + .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f)) .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f)) .collect(Collectors.joining(", ")); return Optional.of( From 8be4bd42f6d3677f903c4c87374be7cb8ae6f86b Mon Sep 17 00:00:00 2001 From: ouyangwulink Date: Thu, 23 May 2024 21:54:57 +0800 Subject: [PATCH 2/2] [FLINK-34901][connectors/jdbc]update clause must EXCLUDED unique index. --- .../jdbc/dialect/AbstractPostgresCompatibleDialect.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java index c2dd06e7a..b90b343df 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java @@ -50,13 +50,14 @@ public String getLimitClause(long limit) { @Override public Optional getUpsertStatement( String tableName, String[] fieldNames, String[] uniqueKeyFields) { + Set uniqueKeyFieldsSet = Arrays.stream(uniqueKeyFields).collect(Collectors.toSet()); String uniqueColumns = - Arrays.stream(uniqueKeyFields) + uniqueKeyFieldsSet.stream() .map(this::quoteIdentifier) .collect(Collectors.joining(", ")); String updateClause = Arrays.stream(fieldNames) - .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f)) + .filter(f -> !uniqueKeyFieldsSet.contains(f)) .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f)) .collect(Collectors.joining(", ")); return Optional.of(