Skip to content

Commit e902d31

Browse files
committed
oracle upsert mode
1 parent ebe21fb commit e902d31

File tree

8 files changed

+312
-206
lines changed

8 files changed

+312
-206
lines changed

mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MySQLDialect.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public String quoteIdentifier(String identifier) {
5656
* @return
5757
*/
5858
@Override
59-
public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
59+
public Optional<String> getUpsertStatement(String schema, String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
6060
return allReplace ? buildReplaceIntoStatement(tableName, fieldNames) : buildDuplicateUpsertStatement(tableName, fieldNames);
6161
}
6262

mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535

3636
public class MysqlSink extends RdbSink implements IStreamSinkGener<RdbSink> {
3737

38-
private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
39-
4038
public MysqlSink() {
4139
super(new MySQLDialect());
4240
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.oracle;
20+
21+
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
22+
import com.dtstack.flink.sql.util.DtStringUtil;
23+
import org.apache.commons.lang3.StringUtils;
24+
25+
import java.util.Arrays;
26+
import java.util.List;
27+
import java.util.Optional;
28+
import java.util.stream.Collectors;
29+
30+
/**
31+
* Date: 2020/1/3
32+
* Company: www.dtstack.com
33+
* @author maqi
34+
*/
35+
public class OracleDialect implements JDBCDialect {
36+
37+
@Override
38+
public boolean canHandle(String url) {
39+
return url.startsWith("jdbc:oracle:");
40+
}
41+
42+
@Override
43+
public Optional<String> defaultDriverName() {
44+
return Optional.of("oracle.jdbc.driver.OracleDriver");
45+
}
46+
47+
@Override
48+
public Optional<String> getUpsertStatement(String schema, String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
49+
tableName = DtStringUtil.getTableFullPath(schema, tableName);
50+
StringBuilder sb = new StringBuilder();
51+
sb.append("MERGE INTO " + tableName + " T1 USING "
52+
+ "(" + buildDualQueryStatement(fieldNames) + ") T2 ON ("
53+
+ buildConnectionConditions(uniqueKeyFields) + ") ");
54+
55+
String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, allReplace);
56+
57+
if (StringUtils.isNotEmpty(updateSql)) {
58+
sb.append(" WHEN MATCHED THEN UPDATE SET ");
59+
sb.append(updateSql);
60+
}
61+
62+
sb.append(" WHEN NOT MATCHED THEN "
63+
+ "INSERT (" + Arrays.stream(fieldNames).map(this::quoteIdentifier).collect(Collectors.joining(",")) + ") VALUES ("
64+
+ Arrays.stream(fieldNames).map(col -> "T2." + quoteIdentifier(col)).collect(Collectors.joining(",")) + ")");
65+
66+
return Optional.of(sb.toString());
67+
}
68+
69+
/**
70+
* build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A")
71+
* @param fieldNames
72+
* @param uniqueKeyFields
73+
* @param allReplace
74+
* @return
75+
*/
76+
private String buildUpdateConnection(String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
77+
List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
78+
return Arrays.stream(fieldNames).filter(col -> !uniqueKeyList.contains(col)).map(col -> {
79+
return allReplace ? quoteIdentifier("T1") + "." + quoteIdentifier(col) + " = " + quoteIdentifier("T2") + "." + quoteIdentifier(col) :
80+
quoteIdentifier("T1") + "." + quoteIdentifier(col) + " =nvl(" + quoteIdentifier("T2") + "." + quoteIdentifier(col) + ","
81+
+ quoteIdentifier("T1") + "." + quoteIdentifier(col) + ")";
82+
}).collect(Collectors.joining(","));
83+
}
84+
85+
86+
private String buildConnectionConditions(String[] uniqueKeyFields) {
87+
return Arrays.stream(uniqueKeyFields).map(col -> "T1." + quoteIdentifier(col) + "=T2." + quoteIdentifier(col)).collect(Collectors.joining(","));
88+
}
89+
90+
/**
91+
* build select sql , such as (SELECT ? "A",? "B" FROM DUAL)
92+
*
93+
* @param column destination column
94+
* @return
95+
*/
96+
public String buildDualQueryStatement(String[] column) {
97+
StringBuilder sb = new StringBuilder("SELECT ");
98+
String collect = Arrays.stream(column).map(col -> " ? " + quoteIdentifier(col)).collect(Collectors.joining(", "));
99+
sb.append(collect).append(" FROM DUAL");
100+
return sb.toString();
101+
}
102+
}

0 commit comments

Comments
 (0)