Skip to content

Commit 544275c

Browse files
roseduanroseduan
andauthored
[minor][improvement][connector/jdbc] Optimize the Postgresql upsert query to avoid unnecessary execution effort
This closes #155 Co-authored-by: roseduan <[email protected]>
1 parent 81e6a4a commit 544275c

File tree

2 files changed

+32
-2
lines changed

2 files changed

+32
-2
lines changed

flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialect.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.util.Arrays;
2727
import java.util.EnumSet;
28+
import java.util.HashSet;
2829
import java.util.Optional;
2930
import java.util.Set;
3031
import java.util.stream.Collectors;
@@ -73,17 +74,22 @@ public Optional<String> getUpsertStatement(
7374
Arrays.stream(uniqueKeyFields)
7475
.map(this::quoteIdentifier)
7576
.collect(Collectors.joining(", "));
77+
final Set<String> uniqueKeyFieldsSet = new HashSet<>(Arrays.asList(uniqueKeyFields));
7678
String updateClause =
7779
Arrays.stream(fieldNames)
80+
.filter(f -> !uniqueKeyFieldsSet.contains(f))
7881
.map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
7982
.collect(Collectors.joining(", "));
83+
String conflictAction =
84+
updateClause.isEmpty()
85+
? " DO NOTHING"
86+
: String.format(" DO UPDATE SET %s", updateClause);
8087
return Optional.of(
8188
getInsertIntoStatement(tableName, fieldNames)
8289
+ " ON CONFLICT ("
8390
+ uniqueColumns
8491
+ ")"
85-
+ " DO UPDATE SET "
86-
+ updateClause);
92+
+ conflictAction);
8793
}
8894

8995
@Override

flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@
2121
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectTest;
2222
import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
2323

24+
import org.junit.jupiter.api.Test;
25+
2426
import java.util.Arrays;
2527
import java.util.List;
2628

29+
import static org.assertj.core.api.Assertions.assertThat;
30+
2731
/** The PostgresSql params for {@link JdbcDialectTest}. */
2832
class PostgresDialectTest extends JdbcDialectTest implements PostgresTestBase {
2933

@@ -58,4 +62,24 @@ protected List<TestItem> testData() {
5862
"The precision of field 'f0' is out of the TIMESTAMP precision range [1, 6] supported by PostgreSQL dialect."),
5963
createTestItem("TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)"));
6064
}
65+
66+
@Test
67+
void testUpsertStatement() {
68+
PostgresDialect dialect = new PostgresDialect();
69+
final String tableName = "tbl";
70+
final String[] fieldNames = {
71+
"id", "name", "email", "ts", "field1", "field_2", "__field_3__"
72+
};
73+
final String[] doUpdatekeyFields = {"id", "__field_3__"};
74+
final String[] doNothingkeyFields = {
75+
"id", "name", "email", "ts", "field1", "field_2", "__field_3__"
76+
};
77+
78+
assertThat(dialect.getUpsertStatement(tableName, fieldNames, doUpdatekeyFields).get())
79+
.isEqualTo(
80+
"INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__) ON CONFLICT (id, __field_3__) DO UPDATE SET name=EXCLUDED.name, email=EXCLUDED.email, ts=EXCLUDED.ts, field1=EXCLUDED.field1, field_2=EXCLUDED.field_2");
81+
assertThat(dialect.getUpsertStatement(tableName, fieldNames, doNothingkeyFields).get())
82+
.isEqualTo(
83+
"INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__) ON CONFLICT (id, name, email, ts, field1, field_2, __field_3__) DO NOTHING");
84+
}
6185
}

0 commit comments

Comments
 (0)