Skip to content

Commit 01dd7d4

Browse files
committed
feat: 解决update模式下查找主键失败的问题
1 parent 0bffca7 commit 01dd7d4

File tree

4 files changed

+89
-16
lines changed

4 files changed

+89
-16
lines changed

flinkx-kingbase/flinkx-kingbase-core/src/main/java/com/dtstack/flinkx/kingbase/constants/KingbaseCons.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@
2828
public class KingbaseCons {
2929

3030
public static final String DRIVER = "com.kingbase8.Driver";
31+
/**
32+
* kingbase 主键索引名后缀
33+
*/
3134
public static final String KEY_UPDATE_KEY = "key";
35+
36+
public static final String KEY_PRIMARY_SUFFIX = "_PKEY";
37+
3238
public static final String INSERT_SQL_MODE_TYPE = "copy";
3339

3440
/**

flinkx-kingbase/flinkx-kingbase-core/src/main/java/com/dtstack/flinkx/kingbase/util/KingBaseDatabaseMeta.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@
2323
import org.apache.commons.lang3.StringUtils;
2424

2525
import java.util.ArrayList;
26+
import java.util.LinkedList;
2627
import java.util.List;
2728
import java.util.Map;
2829

2930
import static com.dtstack.flinkx.constants.ConstantValue.COMMA_SYMBOL;
3031
import static com.dtstack.flinkx.constants.ConstantValue.LEFT_PARENTHESIS_SYMBOL;
3132
import static com.dtstack.flinkx.constants.ConstantValue.RIGHT_PARENTHESIS_SYMBOL;
3233
import static com.dtstack.flinkx.kingbase.constants.KingbaseCons.DRIVER;
34+
import static com.dtstack.flinkx.kingbase.constants.KingbaseCons.KEY_PRIMARY_SUFFIX;
3335
import static com.dtstack.flinkx.kingbase.constants.KingbaseCons.KEY_UPDATE_KEY;
3436

3537
/**
@@ -75,12 +77,26 @@ public String getSqlQueryColumnFields(List<String> column, String table) {
7577
return "SELECT " + quoteColumns(column) + " FROM " + quoteTable(table) + " LIMIT 0";
7678
}
7779

80+
/**
81+
* Kingbase 的主键索引名为TABLE_PKEY格式
82+
* @param column column名
83+
* @param table 表名
84+
* @param updateKey 索引
85+
* @return updateSql
86+
*/
7887
@Override
7988
public String getUpsertStatement(List<String> column, String table, Map<String,List<String>> updateKey) {
89+
List<String> columnList = new LinkedList<>();
90+
updateKey.forEach((key, value) -> {
91+
// 兼顾查询主键索引名或者填入key map的情况
92+
if (StringUtils.endsWith(key, KEY_PRIMARY_SUFFIX) || StringUtils.equals(key, KEY_UPDATE_KEY)) {
93+
columnList.addAll(value);
94+
}
95+
});
8096
return "INSERT INTO " + quoteTable(table)
8197
+ " (" + quoteColumns(column) + ") VALUES "
8298
+ makeValues(column.size())
83-
+ " ON CONFLICT " +makeValues(updateKey.get(KEY_UPDATE_KEY)) + " DO UPDATE SET "
99+
+ " ON CONFLICT " +makeValues(columnList) + " DO UPDATE SET "
84100
+ makeUpdatePart(column);
85101
}
86102

flinkx-kingbase/flinkx-kingbase-writer/src/main/java/com/dtstack/flinkx/kingbase/format/KingbaseOutputFormat.java

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
package com.dtstack.flinkx.kingbase.format;
2020

21+
import com.dtstack.flinkx.constants.ConstantValue;
2122
import com.dtstack.flinkx.enums.EWriteMode;
2223
import com.dtstack.flinkx.exception.WriteRecordException;
2324
import com.dtstack.flinkx.rdb.outputformat.JdbcOutputFormat;
25+
import com.dtstack.flinkx.rdb.util.DbUtil;
26+
import com.dtstack.flinkx.util.ClassUtil;
2427
import com.dtstack.flinkx.util.ExceptionUtil;
25-
import com.dtstack.flinkx.util.StringUtil;
2628
import com.kingbase8.copy.CopyManager;
2729
import com.kingbase8.core.BaseConnection;
2830
import org.apache.commons.collections.CollectionUtils;
@@ -54,6 +56,61 @@ public class KingbaseOutputFormat extends JdbcOutputFormat {
5456

5557
private CopyManager copyManager;
5658

59+
/**
60+
* schema名
61+
*/
62+
public String schema;
63+
64+
@Override
65+
protected void openInternal(int taskNumber, int numTasks){
66+
try {
67+
ClassUtil.forName(driverName, getClass().getClassLoader());
68+
dbConn = DbUtil.getConnection(dbUrl, username, password);
69+
70+
if (restoreConfig.isRestore()){
71+
dbConn.setAutoCommit(false);
72+
}
73+
// 查询主键时用table格式
74+
if(CollectionUtils.isEmpty(fullColumn)) {
75+
fullColumn = probeFullColumns(table, dbConn);
76+
}
77+
78+
if (!EWriteMode.INSERT.name().equalsIgnoreCase(mode)){
79+
if(updateKey == null || updateKey.size() == 0) {
80+
updateKey = probePrimaryKeys(table, dbConn);
81+
}
82+
}
83+
// 其他情况,使用schema.table作为表名
84+
table = schema + ConstantValue.POINT_SYMBOL + table;
85+
if(fullColumnType == null) {
86+
fullColumnType = analyzeTable();
87+
}
88+
89+
for(String col : column) {
90+
for (int i = 0; i < fullColumn.size(); i++) {
91+
if (col.equalsIgnoreCase(fullColumn.get(i))){
92+
columnType.add(fullColumnType.get(i));
93+
break;
94+
}
95+
}
96+
}
97+
98+
preparedStatement = prepareTemplates();
99+
readyCheckpoint = false;
100+
101+
LOG.info("subTask[{}}] wait finished", taskNumber);
102+
} catch (SQLException sqe) {
103+
throw new IllegalArgumentException("open() failed.", sqe);
104+
}
105+
try {
106+
if (batchInterval > 1) {
107+
dbConn.setAutoCommit(false);
108+
}
109+
} catch (Exception e) {
110+
LOG.warn(ExceptionUtil.getErrorMessage(e));
111+
}
112+
}
113+
57114

58115
@Override
59116
protected PreparedStatement prepareTemplates() throws SQLException {
@@ -71,18 +128,6 @@ protected PreparedStatement prepareTemplates() throws SQLException {
71128
return super.prepareTemplates();
72129
}
73130

74-
@Override
75-
protected void openInternal(int taskNumber, int numTasks){
76-
super.openInternal(taskNumber, numTasks);
77-
try {
78-
if (batchInterval > 1) {
79-
dbConn.setAutoCommit(false);
80-
}
81-
} catch (Exception e) {
82-
LOG.warn(ExceptionUtil.getErrorMessage(e));
83-
}
84-
}
85-
86131
@Override
87132
protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
88133
if(!checkIsCopyMode(insertSqlMode)){
@@ -195,4 +240,8 @@ private boolean checkIsCopyMode(String insertMode){
195240
return true;
196241
}
197242

243+
public void setSchema(String schema){
244+
this.schema = schema;
245+
}
246+
198247
}

flinkx-kingbase/flinkx-kingbase-writer/src/main/java/com/dtstack/flinkx/kingbase/writer/KingbaseWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,19 @@
4141

4242
public class KingbaseWriter extends JdbcDataWriter {
4343

44+
public String schema;
45+
4446
public KingbaseWriter(DataTransferConfig config) {
4547
super(config);
46-
String schema = config.getJob().getContent().get(0).getWriter().getParameter().getConnection().get(0).getSchema();
47-
table = schema + ConstantValue.POINT_SYMBOL + table;
48+
schema = config.getJob().getContent().get(0).getWriter().getParameter().getConnection().get(0).getSchema();
4849
setDatabaseInterface(new KingBaseDatabaseMeta());
4950
setTypeConverterInterface(new KingBaseTypeConverter());
5051
}
5152

5253
@Override
5354
public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
5455
KingbaseOutputFormat kingBaseOutputFormat = new KingbaseOutputFormat();
56+
kingBaseOutputFormat.setSchema(schema);
5557
JdbcOutputFormatBuilder builder = new JdbcOutputFormatBuilder(kingBaseOutputFormat);
5658
builder.setDriverName(databaseInterface.getDriverClass());
5759
builder.setDbUrl(dbUrl);

0 commit comments

Comments
 (0)