diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java index 0ca425f30..b90b343df 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java @@ -50,12 +50,14 @@ public String getLimitClause(long limit) { @Override public Optional getUpsertStatement( String tableName, String[] fieldNames, String[] uniqueKeyFields) { + Set uniqueKeyFieldsSet = Arrays.stream(uniqueKeyFields).collect(Collectors.toSet()); String uniqueColumns = - Arrays.stream(uniqueKeyFields) + uniqueKeyFieldsSet.stream() .map(this::quoteIdentifier) .collect(Collectors.joining(", ")); String updateClause = Arrays.stream(fieldNames) + .filter(f -> !uniqueKeyFieldsSet.contains(f)) .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f)) .collect(Collectors.joining(", ")); return Optional.of(