Commit 450bfa5

Author: toutian (committed)
Merge branch '1.5_v3.6.0_oralce_table_column_case_bug' into '1.5_v3.6.0'
Modify Oracle SQL building to add \"\" around tables and columns. (1) Modify Oracle SQL building to add \"\" around tables and columns. (2) Modify JSON parsing so that an undefined key no longer throws an exception and returns null instead. See merge request !7
2 parents 3a7fae9 + d5c5bf1 commit 450bfa5

File tree: 4 files changed, +22 -1 lines changed


kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 4 additions & 0 deletions
@@ -150,6 +150,10 @@ public void setFailOnMissingField(boolean failOnMissingField) {
     private JsonNode getIgnoreCase(String key) {
         String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
         JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
+        if(node == null){
+            return null;
+        }
+
         JsonNodeType nodeType = node.getNodeType();
 
         if (nodeType == JsonNodeType.ARRAY){

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 5 additions & 0 deletions
@@ -145,6 +145,11 @@ public Row deserialize(byte[] message) throws IOException {
     public JsonNode getIgnoreCase(String key) {
         String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
         JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
+
+        if(node == null){
+            return null;
+        }
+
         JsonNodeType nodeType = node.getNodeType();
 
         if (nodeType==JsonNodeType.ARRAY){

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 4 additions & 0 deletions
@@ -148,6 +148,10 @@ public Row deserialize(byte[] message) throws IOException {
     public JsonNode getIgnoreCase(String key) {
         String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
         JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
+        if(node == null){
+            return null;
+        }
+
         JsonNodeType nodeType = node.getNodeType();
 
         if (nodeType==JsonNodeType.ARRAY){
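
All three kafka source modules (kafka09, kafka10, kafka11) add the same guard to getIgnoreCase: previously an undefined key left node as null and the subsequent node.getNodeType() call threw a NullPointerException, whereas now the method returns null and the caller can treat the field as absent. Below is a minimal standalone sketch of that lookup pattern, assuming plain Jackson rather than Flink's shaded copy; the wrapping class and the sample data are hypothetical and not part of the commit.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class IgnoreCaseLookupSketch {

    // Mirrors the two maps used by CustomerJsonDeserialization in the diff.
    private final Map<String, String> rowAndFieldMapping = new HashMap<>();
    private final Map<String, JsonNode> nodeAndJsonNodeMapping = new HashMap<>();

    public JsonNode getIgnoreCase(String key) {
        String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
        JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
        // Added guard: an unknown key now yields null instead of a
        // NullPointerException when the node is dereferenced below.
        if (node == null) {
            return null;
        }
        // The real method goes on to inspect node.getNodeType() (e.g. ARRAY);
        // that handling is elided in this sketch.
        return node;
    }

    public static void main(String[] args) throws Exception {
        IgnoreCaseLookupSketch sketch = new IgnoreCaseLookupSketch();
        JsonNode root = new ObjectMapper().readTree("{\"id\": 1}");
        sketch.nodeAndJsonNodeMapping.put("id", root.get("id"));

        System.out.println(sketch.getIgnoreCase("id"));      // 1
        System.out.println(sketch.getIgnoreCase("missing")); // null, no exception
    }
}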

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 9 additions & 1 deletion
@@ -22,6 +22,7 @@
 import com.dtstack.flink.sql.sink.rdb.format.ExtendOutputFormat;
 import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -54,8 +55,14 @@ public void buildSql(String tableName, List<String> fields) {
     }
 
     private void buildInsertSql(String tableName, List<String> fields) {
+
+        tableName = quoteTable(tableName);
         String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
-        String fieldsStr = StringUtils.join(fields, ",");
+
+        List<String> adaptFields = Lists.newArrayList();
+        fields.forEach(field -> adaptFields.add(quoteColumn(field)));
+
+        String fieldsStr = StringUtils.join(adaptFields, ",");
         String placeholder = "";
 
         for (String fieldName : fields) {
@@ -68,6 +75,7 @@ private void buildInsertSql(String tableName, List<String> fields) {
 
     @Override
     public String buildUpdateSql(String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
+        tableName = quoteTable(tableName);
         return "MERGE INTO " + tableName + " T1 USING "
                 + "(" + makeValues(fieldNames) + ") T2 ON ("
                 + updateKeySql(realIndexes) + ") WHEN MATCHED THEN UPDATE SET "
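
OracleSink now runs the table name through quoteTable and each column through quoteColumn before assembling the INSERT and MERGE statements, so identifiers keep their case (Oracle folds unquoted identifiers to upper case, which is the case bug named in the branch). The two helpers themselves are not part of this diff; the sketch below is a hypothetical rendering of what they might look like, with only the method names taken from the diff and the schema-qualified handling assumed.

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.stream.Collectors;

public class OracleQuoteSketch {

    // Wrap a single identifier in double quotes so Oracle preserves its case:
    // my_col -> "my_col"
    public static String quoteColumn(String column) {
        return "\"" + column + "\"";
    }

    // Quote a possibly schema-qualified table name part by part:
    // my_schema.my_table -> "my_schema"."my_table"
    public static String quoteTable(String table) {
        return Arrays.stream(StringUtils.split(table, '.'))
                .map(OracleQuoteSketch::quoteColumn)
                .collect(Collectors.joining("."));
    }

    public static void main(String[] args) {
        // With the quoting in place, buildInsertSql produces statements like:
        // insert into "my_schema"."my_table" ("id","name") values (?,?)
        System.out.println(quoteTable("my_schema.my_table"));
        System.out.println(quoteColumn("name"));
    }
}

Note that quoted identifiers are case sensitive on the Oracle side, so the sink must be given table and column names in exactly the case used when the table was created.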
