Skip to content

Commit c1b3239

Browse files
committed
Merge pull request #155 from apache/main
Optimize the Postgresql upsert query to avoid unnecessary execution effort
1 parent cb26a9c commit c1b3239

File tree

2 files changed

+32
-2
lines changed

2 files changed

+32
-2
lines changed

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.util.Arrays;
2828
import java.util.EnumSet;
29+
import java.util.HashSet;
2930
import java.util.Optional;
3031
import java.util.Set;
3132
import java.util.stream.Collectors;
@@ -69,17 +70,22 @@ public Optional<String> getUpsertStatement(
6970
Arrays.stream(uniqueKeyFields)
7071
.map(this::quoteIdentifier)
7172
.collect(Collectors.joining(", "));
73+
final Set<String> uniqueKeyFieldsSet = new HashSet<>(Arrays.asList(uniqueKeyFields));
7274
String updateClause =
7375
Arrays.stream(fieldNames)
76+
.filter(f -> !uniqueKeyFieldsSet.contains(f))
7477
.map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
7578
.collect(Collectors.joining(", "));
79+
String conflictAction =
80+
updateClause.isEmpty()
81+
? " DO NOTHING"
82+
: String.format(" DO UPDATE SET %s", updateClause);
7683
return Optional.of(
7784
getInsertIntoStatement(tableName, fieldNames)
7885
+ " ON CONFLICT ("
7986
+ uniqueColumns
8087
+ ")"
81-
+ " DO UPDATE SET "
82-
+ updateClause);
88+
+ conflictAction);
8389
}
8490

8591
@Override

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialectTypeTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020

2121
import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest;
2222

23+
import org.junit.jupiter.api.Test;
24+
2325
import java.util.Arrays;
2426
import java.util.List;
2527

28+
import static org.assertj.core.api.Assertions.assertThat;
29+
2630
/** The PostgresSql params for {@link JdbcDialectTypeTest}. */
2731
public class PostgresDialectTypeTest extends JdbcDialectTypeTest {
2832

@@ -62,4 +66,24 @@ protected List<TestItem> testData() {
6266
"The precision of field 'f0' is out of the TIMESTAMP precision range [1, 6] supported by PostgreSQL dialect."),
6367
createTestItem("TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)"));
6468
}
69+
70+
@Test
71+
void testUpsertStatement() {
72+
PostgresDialect dialect = new PostgresDialect();
73+
final String tableName = "tbl";
74+
final String[] fieldNames = {
75+
"id", "name", "email", "ts", "field1", "field_2", "__field_3__"
76+
};
77+
final String[] doUpdatekeyFields = {"id", "__field_3__"};
78+
final String[] doNothingkeyFields = {
79+
"id", "name", "email", "ts", "field1", "field_2", "__field_3__"
80+
};
81+
82+
assertThat(dialect.getUpsertStatement(tableName, fieldNames, doUpdatekeyFields).get())
83+
.isEqualTo(
84+
"INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__) ON CONFLICT (id, __field_3__) DO UPDATE SET name=EXCLUDED.name, email=EXCLUDED.email, ts=EXCLUDED.ts, field1=EXCLUDED.field1, field_2=EXCLUDED.field_2");
85+
assertThat(dialect.getUpsertStatement(tableName, fieldNames, doNothingkeyFields).get())
86+
.isEqualTo(
87+
"INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__) ON CONFLICT (id, name, email, ts, field1, field_2, __field_3__) DO NOTHING");
88+
}
6589
}

0 commit comments

Comments
 (0)