diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java index f5b4af245..4ba4e89c4 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.EnumSet; +import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -69,17 +70,22 @@ public Optional getUpsertStatement( Arrays.stream(uniqueKeyFields) .map(this::quoteIdentifier) .collect(Collectors.joining(", ")); + final Set uniqueKeyFieldsSet = new HashSet<>(Arrays.asList(uniqueKeyFields)); String updateClause = Arrays.stream(fieldNames) + .filter(f -> !uniqueKeyFieldsSet.contains(f)) .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f)) .collect(Collectors.joining(", ")); + String conflictAction = + updateClause.isEmpty() + ? " DO NOTHING" + : String.format(" DO UPDATE SET %s", updateClause); return Optional.of( getInsertIntoStatement(tableName, fieldNames) + " ON CONFLICT (" + uniqueColumns + ")" - + " DO UPDATE SET " - + updateClause); + + conflictAction); } @Override diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectTypeTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectTypeTest.java index ce1c4e8ef..289579b8d 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectTypeTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectTypeTest.java @@ -20,9 +20,13 @@ import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest; +import org.junit.jupiter.api.Test; + import java.util.Arrays; import java.util.List; +import static org.assertj.core.api.Assertions.assertThat; + /** The PostgresSql params for {@link JdbcDialectTypeTest}. */ public class PostgresDialectTypeTest extends JdbcDialectTypeTest { @@ -62,4 +66,24 @@ protected List testData() { "The precision of field 'f0' is out of the TIMESTAMP precision range [1, 6] supported by PostgreSQL dialect."), createTestItem("TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)")); } + + @Test + void testUpsertStatement() { + PostgresDialect dialect = new PostgresDialect(); + final String tableName = "tbl"; + final String[] fieldNames = { + "id", "name", "email", "ts", "field1", "field_2", "__field_3__" + }; + final String[] doUpdatekeyFields = {"id", "__field_3__"}; + final String[] doNothingkeyFields = { + "id", "name", "email", "ts", "field1", "field_2", "__field_3__" + }; + + assertThat(dialect.getUpsertStatement(tableName, fieldNames, doUpdatekeyFields).get()) + .isEqualTo( + "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__) ON CONFLICT (id, __field_3__) DO UPDATE SET name=EXCLUDED.name, email=EXCLUDED.email, ts=EXCLUDED.ts, field1=EXCLUDED.field1, field_2=EXCLUDED.field_2"); + assertThat(dialect.getUpsertStatement(tableName, fieldNames, doNothingkeyFields).get()) + .isEqualTo( + "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__) ON CONFLICT (id, name, email, ts, field1, field_2, __field_3__) DO NOTHING"); + } }