Skip to content

Commit cb3253d

Browse files
committed
build sink sql
1 parent b8d4811 commit cb3253d

File tree

12 files changed

+87
-46
lines changed

12 files changed

+87
-46
lines changed

mysql/mysql-side/mysql-all-side/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
<goal>shade</goal>
4141
</goals>
4242
<configuration>
43+
<createDependencyReducedPom>false</createDependencyReducedPom>
4344
<artifactSet>
4445
<excludes>
4546

mysql/mysql-side/mysql-async-side/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
<goal>shade</goal>
4242
</goals>
4343
<configuration>
44+
<createDependencyReducedPom>false</createDependencyReducedPom>
4445
<artifactSet>
4546
<excludes>
4647

mysql/mysql-sink/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
<goal>shade</goal>
4040
</goals>
4141
<configuration>
42+
<createDependencyReducedPom>false</createDependencyReducedPom>
4243
<artifactSet>
4344
<excludes>
4445

mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@
2525
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
2626
import com.dtstack.flink.sql.util.DtStringUtil;
2727

28+
import java.util.Arrays;
2829
import java.util.List;
2930
import java.util.Map;
31+
import java.util.Optional;
32+
import java.util.stream.Collectors;
3033

3134
/**
3235
* Date: 2017/2/27
@@ -49,36 +52,43 @@ public RetractJDBCOutputFormat getOutputFormat() {
4952

5053
@Override
5154
public void buildSql(String scheam, String tableName, List<String> fields) {
52-
buildInsertSql(tableName, fields);
55+
buildReplaceUpsertStatement(tableName, fields);
56+
buildDuplicateUpsertStatement(tableName, fields);
5357
}
5458

5559
@Override
5660
public String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
5761
return null;
5862
}
5963

60-
private void buildInsertSql(String tableName, List<String> fields) {
61-
String sqlTmp = "replace into " + tableName + " (${fields}) values (${placeholder})";
62-
String fieldsStr = "";
63-
String placeholder = "";
64-
65-
for (String fieldName : fields) {
66-
fieldsStr += ",`" + fieldName + "`";
67-
placeholder += ",?";
68-
}
64+
private void buildReplaceUpsertStatement(String tableName, List<String> fields) {
65+
this.sql = getUpsertIntoStatement("REPLACE ", tableName, fields);
66+
}
6967

70-
fieldsStr = fieldsStr.replaceFirst(",", "");
71-
placeholder = placeholder.replaceFirst(",", "");
68+
public void buildDuplicateUpsertStatement(String tableName, List<String> fields) {
69+
String updateClause = fields.stream().map(f -> quoteIdentifier(f) + "=IFNULL(VALUES(" + quoteIdentifier(f) + "),"+ quoteIdentifier(f) + ")")
70+
.collect(Collectors.joining(", "));
7271

73-
sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
74-
this.sql = sqlTmp;
72+
this.sql = getUpsertIntoStatement("INSERT",tableName, fields) +
73+
" ON DUPLICATE KEY UPDATE " + updateClause;
7574
}
7675

76+
public String getUpsertIntoStatement(String operator, String tableName, List<String> fields) {
77+
String columns = fields.stream()
78+
.map(this::quoteIdentifier)
79+
.collect(Collectors.joining(", "));
80+
81+
String placeholders = fields.stream()
82+
.map(f -> "?")
83+
.collect(Collectors.joining(", "));
84+
85+
return operator + " INTO " + quoteIdentifier(tableName) +
86+
"(" + columns + ")" + " VALUES (" + placeholders + ")";
87+
}
7788

7889
@Override
7990
public String getDriverName() {
8091
return MYSQL_DRIVER;
8192
}
8293

83-
8494
}

oracle/oracle-side/oracle-all-side/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
<goal>shade</goal>
4141
</goals>
4242
<configuration>
43+
<createDependencyReducedPom>false</createDependencyReducedPom>
4344
<artifactSet>
4445
<excludes>
4546

oracle/oracle-side/oracle-async-side/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
<goal>shade</goal>
4040
</goals>
4141
<configuration>
42+
<createDependencyReducedPom>false</createDependencyReducedPom>
4243
<artifactSet>
4344
<excludes>
4445

oracle/oracle-sink/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
<goal>shade</goal>
4040
</goals>
4141
<configuration>
42+
<createDependencyReducedPom>false</createDependencyReducedPom>
4243
<artifactSet>
4344
<excludes>
4445

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 50 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Iterator;
3030
import java.util.List;
3131
import java.util.Map;
32+
import java.util.stream.Collectors;
3233

3334
/**
3435
* Reason:
@@ -56,23 +57,15 @@ public void buildSql(String scheam, String tableName, List<String> fields) {
5657
}
5758

5859
private void buildInsertSql(String scheam, String tableName, List<String> fields) {
59-
60-
tableName = DtStringUtil.getTableFullPath(scheam,tableName);
61-
62-
String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
63-
64-
List<String> adaptFields = Lists.newArrayList();
65-
fields.forEach(field -> adaptFields.add(DtStringUtil.addQuoteForStr(field)));
66-
67-
String fieldsStr = StringUtils.join(adaptFields, ",");
68-
String placeholder = "";
69-
70-
for (String fieldName : fields) {
71-
placeholder += ",?";
72-
}
73-
placeholder = placeholder.replaceFirst(",", "");
74-
sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
75-
this.sql = sqlTmp;
60+
tableName = DtStringUtil.getTableFullPath(scheam, tableName);
61+
String columns = fields.stream()
62+
.map(this::quoteIdentifier)
63+
.collect(Collectors.joining(", "));
64+
65+
String placeholders = fields.stream()
66+
.map(f -> "?")
67+
.collect(Collectors.joining(", "));
68+
this.sql = "INSERT INTO " + tableName + "(" + columns + ")" + " VALUES (" + placeholders + ")";
7669
}
7770

7871
/**
@@ -94,7 +87,8 @@ public String buildUpdateSql(String scheam, String tableName, List<String> field
9487
+ updateKeySql(realIndexes) + ") ");
9588

9689

97-
String updateSql = getUpdateSql(fieldNames, fullField, "T1", "T2", keyColList(realIndexes));
90+
String updateSql1 = buildUpdateSqlForAllValue(fieldNames, fullField, "T1", "T2", keyColList(realIndexes));
91+
String updateSql = buildUpdateSqlForNotnullValue(fieldNames, fullField, "T1", "T2", keyColList(realIndexes));
9892

9993
if (StringUtils.isNotEmpty(updateSql)) {
10094
sb.append(" WHEN MATCHED THEN UPDATE SET ");
@@ -149,24 +143,47 @@ protected List<String> keyColList(Map<String, List<String>> realIndexes) {
149143
* @param indexCols index column
150144
* @return
151145
*/
152-
public String getUpdateSql(List<String> updateColumn, List<String> fullColumn, String leftTable, String rightTable, List<String> indexCols) {
146+
public String buildUpdateSqlForAllValue(List<String> updateColumn, List<String> fullColumn, String leftTable, String rightTable, List<String> indexCols) {
153147
String prefixLeft = StringUtils.isBlank(leftTable) ? "" : DtStringUtil.addQuoteForStr(leftTable) + ".";
154148
String prefixRight = StringUtils.isBlank(rightTable) ? "" : DtStringUtil.addQuoteForStr(rightTable) + ".";
155-
List<String> list = new ArrayList<>();
156-
for (String col : fullColumn) {
157-
// filter index column
158-
if (indexCols == null || indexCols.size() == 0 || containsIgnoreCase(indexCols,col)) {
159-
continue;
160-
}
161-
if (containsIgnoreCase(updateColumn,col)) {
162-
list.add(prefixLeft + DtStringUtil.addQuoteForStr(col) + "=" + prefixRight + DtStringUtil.addQuoteForStr(col));
149+
150+
String sql = fullColumn.stream().filter(col -> {
151+
return !(indexCols == null || indexCols.size() == 0 || containsIgnoreCase(indexCols, col));
152+
}).map(col -> {
153+
String leftCol = prefixLeft + DtStringUtil.addQuoteForStr(col);
154+
String rightCol = prefixRight + DtStringUtil.addQuoteForStr(col);
155+
156+
if (containsIgnoreCase(updateColumn, col)) {
157+
return (leftCol + "=" + rightCol);
163158
} else {
164-
list.add(prefixLeft + DtStringUtil.addQuoteForStr(col) + "=null");
159+
return (leftCol + "=null");
165160
}
166-
}
167-
return StringUtils.join(list, ",");
161+
}).collect(Collectors.joining(","));
162+
163+
return sql;
168164
}
169165

166+
public String buildUpdateSqlForNotnullValue(List<String> updateColumn, List<String> fullColumn, String leftTable, String rightTable, List<String> indexCols) {
167+
String prefixLeft = StringUtils.isBlank(leftTable) ? "" : DtStringUtil.addQuoteForStr(leftTable) + ".";
168+
String prefixRight = StringUtils.isBlank(rightTable) ? "" : DtStringUtil.addQuoteForStr(rightTable) + ".";
169+
170+
String sql = fullColumn.stream().filter(col -> {
171+
return !(indexCols == null || indexCols.size() == 0 || containsIgnoreCase(indexCols, col));
172+
}).map(col -> {
173+
String leftCol = prefixLeft + DtStringUtil.addQuoteForStr(col);
174+
String rightCol = prefixRight + DtStringUtil.addQuoteForStr(col);
175+
176+
if (containsIgnoreCase(updateColumn, col)) {
177+
return leftCol + "= nvl(" + rightCol + "," + leftCol + ")";
178+
}
179+
return "";
180+
}).collect(Collectors.joining(","));
181+
182+
return sql;
183+
}
184+
185+
186+
170187

171188
/**
172189
* build connect sql by index column, such as T1."A"=T2."A"
@@ -212,6 +229,8 @@ public boolean containsIgnoreCase(List<String> l, String s) {
212229
return false;
213230
}
214231

215-
232+
public String quoteIdentifier(String identifier) {
233+
return "\"" + identifier + "\"";
234+
}
216235

217236
}

postgresql/postgresql-side/postgresql-all-side/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
<goal>shade</goal>
4141
</goals>
4242
<configuration>
43+
<createDependencyReducedPom>false</createDependencyReducedPom>
4344
<artifactSet>
4445
<excludes>
4546

postgresql/postgresql-side/postgresql-async-side/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
<goal>shade</goal>
4242
</goals>
4343
<configuration>
44+
<createDependencyReducedPom>false</createDependencyReducedPom>
4445
<artifactSet>
4546
<excludes>
4647

0 commit comments

Comments
 (0)