
Commit 0142a4f

rdb sink add update mode
1 parent cb3253d commit 0142a4f

28 files changed: +1419 −873 lines

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseSink.java

Lines changed: 0 additions & 1 deletion
@@ -22,7 +22,6 @@
 
 import com.dtstack.flink.sql.sink.IStreamSinkGener;
 import com.dtstack.flink.sql.sink.rdb.RdbSink;
-import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
 
 import java.util.List;
 import java.util.Map;
Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.enums;
+
+/**
+ * Retract stream data processing modes
+ *
+ * Reason:
+ * Date: 2019/1/2
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public enum EUpdateMode {
+    // do not retract data, only emit incremental data
+    APPEND(0),
+    // delete the retracted data first, then update
+    UPSERT(1);
+
+    private int type;
+
+    EUpdateMode(int type) {
+        this.type = type;
+    }
+
+    public int getType() {
+        return this.type;
+    }
+}
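
The enum is the switch between the sink's two write paths: APPEND ignores retract messages and only emits incremental rows, while UPSERT removes the retracted row before writing the new one. A minimal sketch of resolving the mode from a user-supplied table property follows; the helper class, method name, and property handling are illustrative assumptions, not part of this commit.

import com.dtstack.flink.sql.enums.EUpdateMode;

// Illustrative helper, not part of this commit: map an optional "updateMode"
// table property onto the new enum, defaulting to APPEND when it is unset.
public class UpdateModeExample {
    static EUpdateMode resolveUpdateMode(String updateMode) {
        if (updateMode == null || updateMode.trim().isEmpty()) {
            return EUpdateMode.APPEND;
        }
        // valueOf throws IllegalArgumentException for anything but APPEND/UPSERT
        return EUpdateMode.valueOf(updateMode.trim().toUpperCase());
    }

    public static void main(String[] args) {
        System.out.println(resolveUpdateMode(null));      // APPEND
        System.out.println(resolveUpdateMode("upsert"));  // UPSERT
    }
}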

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 2 additions & 1 deletion
@@ -60,6 +60,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.sql.Timestamp;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -626,7 +627,7 @@ private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias,
 
     private List<String> convertPrimaryAlias(SideTableInfo sideTableInfo) {
         List<String> res = Lists.newArrayList();
-        sideTableInfo.getPrimaryKeys().forEach(field -> {
+        Arrays.stream(sideTableInfo.getPrimaryKeys()).forEach(field -> {
             res.add(sideTableInfo.getPhysicalFields().getOrDefault(field, field));
         });
         return res;

core/src/main/java/com/dtstack/flink/sql/sink/MetricOutputFormat.java

Lines changed: 7 additions & 8 deletions
@@ -24,22 +24,21 @@
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
+import org.apache.flink.types.Row;
 
 /**
  * Created by sishu.yss on 2018/11/28.
  */
-public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2>{
+public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2<Boolean, Row>> {
 
-    protected transient Counter outRecords;
+    public transient Counter outRecords;
+    public transient Counter outDirtyRecords;
+    public transient Meter outRecordsRate;
 
-    protected transient Counter outDirtyRecords;
-
-    protected transient Meter outRecordsRate;
-
-    public void initMetric() {
+    public void initMetric() {
         outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
         outDirtyRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_DIRTY_RECORDS_OUT);
         outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
-    }
+    }
 
 }
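
Typing the base class as RichOutputFormat<Tuple2<Boolean, Row>> makes the retract flag explicit: f0 = true carries an accumulate message, f0 = false a retract message. A small illustrative sketch of branching on that flag; the class and method names are invented and not part of this commit.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

// Illustrative only, not part of this commit.
public class RetractFlagExample {
    // f0 = true  -> accumulate message (insert/upsert the row)
    // f0 = false -> retract message (APPEND mode drops it, UPSERT mode deletes the old row)
    static String describe(Tuple2<Boolean, Row> record) {
        return (record.f0 ? "write: " : "retract: ") + record.f1;
    }

    public static void main(String[] args) {
        System.out.println(describe(Tuple2.of(true, Row.of(1, "foo"))));
        System.out.println(describe(Tuple2.of(false, Row.of(1, "foo"))));
    }
}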

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 1 addition & 2 deletions
@@ -115,8 +115,7 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
     public void dealPrimaryKey(Matcher matcher, TableInfo tableInfo){
         String primaryFields = matcher.group(1).trim();
         String[] splitArry = primaryFields.split(",");
-        List<String> primaryKes = Lists.newArrayList(splitArry);
-        tableInfo.setPrimaryKeys(primaryKes);
+        tableInfo.setPrimaryKeys(splitArry);
     }
 
     /**
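
The parser now hands the primary key fields to TableInfo as a plain String[] instead of wrapping them in a List. A tiny illustration of what the parse produces; the input value is made up.

public class PrimaryKeyParseExample {
    public static void main(String[] args) {
        // "PRIMARY KEY (id,name)" is captured by the matcher as "id,name"
        String primaryFields = "id,name".trim();            // matcher.group(1).trim()
        String[] splitArry = primaryFields.split(",");      // ["id", "name"]
        // the array is stored directly: tableInfo.setPrimaryKeys(splitArry)
        System.out.println(String.join(" | ", splitArry));  // prints: id | name
    }
}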

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 3 additions & 3 deletions
@@ -59,7 +59,7 @@ public abstract class TableInfo implements Serializable {
 
     private final List<FieldExtraInfo> fieldExtraInfoList = Lists.newArrayList();
 
-    private List<String> primaryKeys;
+    private String[] primaryKeys;
 
     private Integer parallelism = -1;
 
@@ -85,11 +85,11 @@ public Class<?>[] getFieldClasses() {
         return fieldClasses;
     }
 
-    public List<String> getPrimaryKeys() {
+    public String[] getPrimaryKeys() {
         return primaryKeys;
     }
 
-    public void setPrimaryKeys(List<String> primaryKeys) {
+    public void setPrimaryKeys(String[] primaryKeys) {
         this.primaryKeys = primaryKeys;
     }
 

db2/db2-sink/src/main/java/com/dtstack/flink/sql/sink/db/DbSink.java

Lines changed: 0 additions & 1 deletion
@@ -1,7 +1,6 @@
 package com.dtstack.flink.sql.sink.db;
 
 import com.dtstack.flink.sql.sink.rdb.RdbSink;
-import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
 
 import java.util.List;
 import java.util.Map;

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaOutputFormat.java

Lines changed: 0 additions & 1 deletion
@@ -20,7 +20,6 @@
 package com.dtstack.flink.sql.sink.impala;
 
 import com.dtstack.flink.sql.sink.impala.table.ImpalaTableInfo;
-import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
 import com.dtstack.flink.sql.util.JDBCUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaSink.java

Lines changed: 0 additions & 1 deletion
@@ -21,7 +21,6 @@
 import com.dtstack.flink.sql.sink.IStreamSinkGener;
 import com.dtstack.flink.sql.sink.impala.table.ImpalaTableInfo;
 import com.dtstack.flink.sql.sink.rdb.RdbSink;
-import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
 import com.dtstack.flink.sql.table.TargetTableInfo;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.sink.mysql;
+
+import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Date: 2019/12/31
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class MySQLDialect implements JDBCDialect {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public boolean canHandle(String url) {
+        return url.startsWith("jdbc:mysql:");
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("com.mysql.jdbc.Driver");
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return "`" + identifier + "`";
+    }
+
+    /**
+     * Based on the allReplace flag, choose between a REPLACE statement and an ON DUPLICATE KEY UPDATE statement.
+     * @param tableName
+     * @param fieldNames
+     * @param uniqueKeyFields
+     * @param allReplace
+     * @return
+     */
+    @Override
+    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
+        return allReplace ? buildReplaceIntoStatement(tableName, fieldNames) : buildDuplicateUpsertStatement(tableName, fieldNames);
+    }
+
+    public Optional<String> buildDuplicateUpsertStatement(String tableName, String[] fieldNames) {
+        String updateClause = Arrays.stream(fieldNames).map(f -> quoteIdentifier(f) + "=IFNULL(VALUES(" + quoteIdentifier(f) + ")," + quoteIdentifier(f) + ")")
+                .collect(Collectors.joining(", "));
+        return Optional.of(getInsertIntoStatement(tableName, fieldNames) +
+                " ON DUPLICATE KEY UPDATE " + updateClause
+        );
+    }
+
+    public Optional<String> buildReplaceIntoStatement(String tableName, String[] fieldNames) {
+        String columns = Arrays.stream(fieldNames)
+                .map(this::quoteIdentifier)
+                .collect(Collectors.joining(", "));
+        String placeholders = Arrays.stream(fieldNames)
+                .map(f -> "?")
+                .collect(Collectors.joining(", "));
+        return Optional.of("REPLACE INTO " + quoteIdentifier(tableName) +
+                "(" + columns + ")" + " VALUES (" + placeholders + ")");
+    }
+}
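
To make the difference concrete, here is a small hedged check of the two statements the dialect emits for a made-up table side_test(id, name); the exact INSERT prefix in the second case comes from JDBCDialect.getInsertIntoStatement, which is not shown in this diff.

import com.dtstack.flink.sql.sink.mysql.MySQLDialect;

// Illustrative only: table and field names are invented for the example.
public class MySQLDialectExample {
    public static void main(String[] args) {
        MySQLDialect dialect = new MySQLDialect();
        String[] fields = {"id", "name"};
        String[] uniqueKeys = {"id"};

        // allReplace = true: the whole row is replaced,
        // e.g. REPLACE INTO `side_test`(`id`, `name`) VALUES (?, ?)
        System.out.println(dialect.getUpsertStatement("side_test", fields, uniqueKeys, true).get());

        // allReplace = false: null incoming values keep the existing column value
        // via `col`=IFNULL(VALUES(`col`),`col`) in the ON DUPLICATE KEY UPDATE clause
        System.out.println(dialect.getUpsertStatement("side_test", fields, uniqueKeys, false).get());
    }
}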
