Skip to content

Commit ef99a45

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_4.0.x_32086' into 1.8_release_4.0.x
2 parents c52042f + 01dd7d4 commit ef99a45

File tree

20 files changed

+1023
-4
lines changed

20 files changed

+1023
-4
lines changed

flinkx-core/src/main/java/com/dtstack/flinkx/config/ReaderConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public void setColumn(List column) {
9595
public class ConnectionConfig extends AbstractConfig {
9696

9797
public static final String KEY_TABLE_LIST = "table";
98+
public static final String KEY_SCHEMA = "schema";
9899
public static final String KEY_JDBC_URL_LIST = "jdbcUrl";
99100
public static final String KEY_JDBC_USERNAME = "username";
100101
public static final String KEY_JDBC_PASSWORD = "password";
@@ -111,6 +112,14 @@ public void setTable(List<String> table) {
111112
setVal(KEY_TABLE_LIST, table);
112113
}
113114

115+
public String getSchema(){
116+
return (String) getVal(KEY_SCHEMA);
117+
}
118+
119+
public void setSchema(String schema){
120+
setVal(KEY_SCHEMA, schema);
121+
}
122+
114123
public List<String> getJdbcUrl() {
115124
return (List<String>) getVal(KEY_JDBC_URL_LIST);
116125
}

flinkx-core/src/main/java/com/dtstack/flinkx/config/WriterConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,11 @@ public void setConnection(List<ConnectionConfig> connection) {
9696
public class ConnectionConfig extends AbstractConfig {
9797
private static final String KEY_JDBC_URL = "jdbcUrl";
9898
private static final String KEY_TABLE_LIST = "table";
99+
public static final String KEY_SCHEMA = "schema";
99100

100101
private String jdbcUrl;
101102
private List<String> table;
103+
private String schema;
102104

103105
public ConnectionConfig(Map<String, Object> map) {
104106
super(map);
@@ -109,6 +111,7 @@ public ConnectionConfig(Map<String, Object> map) {
109111
jdbcUrl = ((List) jdbcUrlObj).get(0).toString();
110112
}
111113
table = (List<String>) getVal(KEY_TABLE_LIST);
114+
schema = (String) getVal(KEY_SCHEMA);
112115
}
113116

114117
public String getJdbcUrl() {
@@ -126,6 +129,10 @@ public List<String> getTable() {
126129
public void setTable(List<String> table) {
127130
this.table = table;
128131
}
132+
133+
public String getSchema(){return schema;}
134+
135+
public void setSchema(String schema){this.schema = schema;}
129136
}
130137

131138
}

flinkx-core/src/main/java/com/dtstack/flinkx/enums/EDatabaseType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,6 @@ public enum EDatabaseType {
5454
polarDB,
5555
Phoenix,
5656
dm,
57-
SapHana
57+
SapHana,
58+
KingBase
5859
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>flinkx-kingbase</artifactId>
7+
<groupId>com.dtstack.flinkx</groupId>
8+
<version>1.6</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>flinkx-kingbase-core</artifactId>
13+
14+
<dependencies>
15+
<dependency>
16+
<groupId>com.kingbase8</groupId>
17+
<artifactId>kingbase8</artifactId>
18+
<version>8.2.0</version>
19+
</dependency>
20+
21+
</dependencies>
22+
23+
24+
</project>
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flinkx.kingbase.constants;
20+
21+
/**
22+
* kingbase常量
23+
*
24+
* Company: www.dtstack.com
25+
26+
*/
27+
28+
public class KingbaseCons {
29+
30+
public static final String DRIVER = "com.kingbase8.Driver";
31+
/**
32+
* kingbase 主键索引名后缀
33+
*/
34+
public static final String KEY_UPDATE_KEY = "key";
35+
36+
public static final String KEY_PRIMARY_SUFFIX = "_PKEY";
37+
38+
public static final String INSERT_SQL_MODE_TYPE = "copy";
39+
40+
/**
41+
* copy语法分隔符
42+
*/
43+
public static final String DEFAULT_FIELD_DELIM = "\001";
44+
public static final String DEFAULT_NULL_DELIM = "\002";
45+
public static final String LINE_DELIMITER = "\n";
46+
47+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flinkx.kingbase.util;
20+
21+
import com.dtstack.flinkx.enums.EDatabaseType;
22+
import com.dtstack.flinkx.rdb.BaseDatabaseMeta;
23+
import org.apache.commons.lang3.StringUtils;
24+
25+
import java.util.ArrayList;
26+
import java.util.LinkedList;
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
import static com.dtstack.flinkx.constants.ConstantValue.COMMA_SYMBOL;
31+
import static com.dtstack.flinkx.constants.ConstantValue.LEFT_PARENTHESIS_SYMBOL;
32+
import static com.dtstack.flinkx.constants.ConstantValue.RIGHT_PARENTHESIS_SYMBOL;
33+
import static com.dtstack.flinkx.kingbase.constants.KingbaseCons.DRIVER;
34+
import static com.dtstack.flinkx.kingbase.constants.KingbaseCons.KEY_PRIMARY_SUFFIX;
35+
import static com.dtstack.flinkx.kingbase.constants.KingbaseCons.KEY_UPDATE_KEY;
36+
37+
/**
38+
* The class of KingBase database prototype
39+
*
40+
* Company: www.dtstack.com
41+
42+
*/
43+
44+
public class KingBaseDatabaseMeta extends BaseDatabaseMeta {
45+
46+
@Override
47+
protected String makeValues(List<String> column) {
48+
StringBuilder sb = new StringBuilder();
49+
sb.append(LEFT_PARENTHESIS_SYMBOL);
50+
for(int i = 0; i < column.size(); ++i) {
51+
if(i != 0) {
52+
sb.append(COMMA_SYMBOL);
53+
}
54+
sb.append(quoteColumn(column.get(i)));
55+
}
56+
sb.append(RIGHT_PARENTHESIS_SYMBOL);
57+
return sb.toString();
58+
}
59+
60+
@Override
61+
public EDatabaseType getDatabaseType() {
62+
return EDatabaseType.KingBase;
63+
}
64+
65+
@Override
66+
public String getDriverClass() {
67+
return DRIVER;
68+
}
69+
70+
@Override
71+
public String getSqlQueryFields(String tableName) {
72+
return "SELECT * FROM " + tableName + " LIMIT 0";
73+
}
74+
75+
@Override
76+
public String getSqlQueryColumnFields(List<String> column, String table) {
77+
return "SELECT " + quoteColumns(column) + " FROM " + quoteTable(table) + " LIMIT 0";
78+
}
79+
80+
/**
81+
* Kingbase 的主键索引名为TABLE_PKEY格式
82+
* @param column column名
83+
* @param table 表名
84+
* @param updateKey 索引
85+
* @return updateSql
86+
*/
87+
@Override
88+
public String getUpsertStatement(List<String> column, String table, Map<String,List<String>> updateKey) {
89+
List<String> columnList = new LinkedList<>();
90+
updateKey.forEach((key, value) -> {
91+
// 兼顾查询主键索引名或者填入key map的情况
92+
if (StringUtils.endsWith(key, KEY_PRIMARY_SUFFIX) || StringUtils.equals(key, KEY_UPDATE_KEY)) {
93+
columnList.addAll(value);
94+
}
95+
});
96+
return "INSERT INTO " + quoteTable(table)
97+
+ " (" + quoteColumns(column) + ") VALUES "
98+
+ makeValues(column.size())
99+
+ " ON CONFLICT " +makeValues(columnList) + " DO UPDATE SET "
100+
+ makeUpdatePart(column);
101+
}
102+
103+
private String makeValues(int nCols) {
104+
return LEFT_PARENTHESIS_SYMBOL + StringUtils.repeat("?", ",", nCols) + RIGHT_PARENTHESIS_SYMBOL;
105+
}
106+
107+
private String makeUpdatePart (List<String> column) {
108+
List<String> updateList = new ArrayList<>();
109+
for(String col : column) {
110+
String quotedCol = quoteColumn(col);
111+
updateList.add(quotedCol + "=EXCLUDED." + quotedCol);
112+
}
113+
return StringUtils.join(updateList, COMMA_SYMBOL);
114+
}
115+
116+
@Override
117+
public String quoteValue(String value, String column) {
118+
return String.format("\"%s\" as %s",value,column);
119+
}
120+
121+
@Override
122+
public String getSplitFilter(String columnName) {
123+
return String.format("mod(%s, ${N}) = ${M}", getStartQuote() + columnName + getEndQuote());
124+
}
125+
126+
@Override
127+
public String getSplitFilterWithTmpTable(String tmpTable, String columnName) {
128+
return String.format("mod(%s.%s, ${N}) = ${M}", tmpTable, getStartQuote() + columnName + getEndQuote());
129+
}
130+
131+
@Override
132+
public String getStartQuote() {
133+
return "";
134+
}
135+
136+
@Override
137+
public String getEndQuote() {
138+
return "";
139+
}
140+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.dtstack.flinkx.kingbase.util;
2+
3+
import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
4+
import org.apache.commons.lang3.StringUtils;
5+
6+
import java.util.Arrays;
7+
import java.util.Collections;
8+
import java.util.List;
9+
10+
/**
11+
* kingbase中不常用类型转换为java类型
12+
* @Company: www.dtstack.com
13+
14+
*/
15+
public class KingBaseTypeConverter implements TypeConverterInterface {
16+
17+
private List<String> stringTypes = Arrays.asList("uuid", "xml", "cidr", "inet", "macaddr", "text", "character", "character varying");
18+
19+
private List<String> byteTypes = Arrays.asList("bytea","bit varying");
20+
21+
private List<String> bitTypes = Collections.singletonList("bit");
22+
23+
private List<String> doubleTypes = Arrays.asList("double precision","double","float8","money");
24+
25+
private List<String> intTypes = Arrays.asList("int","int2","int4","int8","integer","bigint","bigserial","smallint");
26+
27+
@Override
28+
public Object convert(Object data,String typeName) {
29+
if (data == null){
30+
return null;
31+
}
32+
String dataValue = data.toString();
33+
if(stringTypes.contains(typeName)){
34+
return dataValue;
35+
}
36+
if(StringUtils.isBlank(dataValue)){
37+
return null;
38+
}
39+
if(doubleTypes.contains(typeName)){
40+
if(StringUtils.startsWith(dataValue, "$")){
41+
dataValue = StringUtils.substring(dataValue, 1);
42+
}
43+
data = Double.parseDouble(dataValue);
44+
} else if(bitTypes.contains(typeName)){
45+
//
46+
}else if(byteTypes.contains(typeName)){
47+
data = Byte.valueOf(dataValue);
48+
} else if(intTypes.contains(typeName)){
49+
data = Integer.parseInt(dataValue);
50+
}
51+
52+
return data;
53+
}
54+
}

0 commit comments

Comments
 (0)