Skip to content

Commit 977a20b

Browse files
committed
sqlserver sink
1 parent d66acef commit 977a20b

File tree

18 files changed

+325
-269
lines changed

18 files changed

+325
-269
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.clickhouse;
20+
21+
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
22+
23+
import java.util.Optional;
24+
25+
/**
26+
* Date: 2020/1/15
27+
* Company: www.dtstack.com
28+
* @author maqi
29+
*/
30+
public class ClickhouseDialect implements JDBCDialect {
31+
32+
@Override
33+
public boolean canHandle(String url) {
34+
return url.startsWith("jdbc:clickhouse:");
35+
}
36+
37+
@Override
38+
public Optional<String> defaultDriverName() {
39+
return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
40+
}
41+
42+
@Override
43+
public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
44+
throw new RuntimeException("Clickhouse does not support update sql, please remove primary key or use append mode");
45+
}
46+
}

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseSink.java

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121

2222

2323
import com.dtstack.flink.sql.sink.IStreamSinkGener;
24+
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
2425
import com.dtstack.flink.sql.sink.rdb.RdbSink;
26+
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
2527

2628
import java.util.List;
2729
import java.util.Map;
@@ -32,46 +34,25 @@ public class ClickhouseSink extends RdbSink implements IStreamSinkGener<RdbSink>
3234
private static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
3335

3436
public ClickhouseSink() {
37+
super(new ClickhouseDialect());
3538
}
3639

3740
@Override
38-
public RetractJDBCOutputFormat getOutputFormat() {
39-
return new RetractJDBCOutputFormat();
40-
}
41-
42-
@Override
43-
public void buildSql(String scheam, String tableName, List<String> fields) {
44-
buildInsertSql(tableName, fields);
45-
}
46-
47-
@Override
48-
public String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
49-
return null;
50-
}
51-
52-
private void buildInsertSql(String tableName, List<String> fields) {
53-
String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
54-
String fieldsStr = "";
55-
String placeholder = "";
56-
57-
for (String fieldName : fields) {
58-
fieldsStr += ",`" + fieldName + "`";
59-
placeholder += ",?";
60-
}
61-
62-
fieldsStr = fieldsStr.replaceFirst(",", "");
63-
placeholder = placeholder.replaceFirst(",", "");
64-
65-
sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
66-
this.sql = sqlTmp;
67-
System.out.println("---insert sql----");
68-
System.out.println(sql);
69-
}
70-
71-
72-
@Override
73-
public String getDriverName() {
74-
return CLICKHOUSE_DRIVER;
41+
public JDBCUpsertOutputFormat getOutputFormat() {
42+
JDBCOptions jdbcOptions = JDBCOptions.builder()
43+
.setDBUrl(dbURL).setDialect(jdbcDialect)
44+
.setUsername(userName).setPassword(password)
45+
.setTableName(tableName).build();
46+
47+
return JDBCUpsertOutputFormat.builder()
48+
.setOptions(jdbcOptions)
49+
.setFieldNames(fieldNames)
50+
.setFlushMaxSize(batchNum)
51+
.setFlushIntervalMills(batchWaitInterval)
52+
.setFieldTypes(sqlTypes)
53+
.setKeyFields(primaryKeys)
54+
.setAllReplace(allReplace)
55+
.setUpdateMode(updateMode).build();
7556
}
7657

7758

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias,
627627

628628
private List<String> convertPrimaryAlias(SideTableInfo sideTableInfo) {
629629
List<String> res = Lists.newArrayList();
630-
Arrays.stream(sideTableInfo.getPrimaryKeys()).forEach(field -> {
630+
sideTableInfo.getPrimaryKeys().forEach(field -> {
631631
res.add(sideTableInfo.getPhysicalFields().getOrDefault(field, field));
632632
});
633633
return res;

core/src/main/java/com/dtstack/flink/sql/sink/MetricOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
/**
3030
* Created by sishu.yss on 2018/11/28.
3131
*/
32-
public abstract class MetricOutputFormat extends RichOutputFormat<Tuple2<Boolean, Row>> {
32+
public abstract class MetricOutputFormat<T> extends RichOutputFormat<T> {
3333

3434
public transient Counter outRecords;
3535
public transient Counter outDirtyRecords;

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
115115
public void dealPrimaryKey(Matcher matcher, TableInfo tableInfo){
116116
String primaryFields = matcher.group(1).trim();
117117
String[] splitArry = primaryFields.split(",");
118-
tableInfo.setPrimaryKeys(splitArry);
118+
List<String> primaryKes = Lists.newArrayList(splitArry);
119+
tableInfo.setPrimaryKeys(primaryKes);
119120
}
120121

121122
/**

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public abstract class TableInfo implements Serializable {
5959

6060
private final List<FieldExtraInfo> fieldExtraInfoList = Lists.newArrayList();
6161

62-
private String[] primaryKeys;
62+
private List<String> primaryKeys;
6363

6464
private Integer parallelism = -1;
6565

@@ -85,11 +85,11 @@ public Class<?>[] getFieldClasses() {
8585
return fieldClasses;
8686
}
8787

88-
public String[] getPrimaryKeys() {
88+
public List<String> getPrimaryKeys() {
8989
return primaryKeys;
9090
}
9191

92-
public void setPrimaryKeys(String[] primaryKeys) {
92+
public void setPrimaryKeys(List<String> primaryKeys) {
9393
this.primaryKeys = primaryKeys;
9494
}
9595

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.polardb;
20+
21+
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
22+
23+
import java.util.Arrays;
24+
import java.util.Optional;
25+
import java.util.stream.Collectors;
26+
27+
/**
28+
* Date: 2020/1/15
29+
* Company: www.dtstack.com
30+
* @author maqi
31+
*/
32+
public class PolardbDialect implements JDBCDialect {
33+
34+
@Override
35+
public boolean canHandle(String url) {
36+
return url.startsWith("jdbc:mysql:");
37+
}
38+
39+
@Override
40+
public Optional<String> defaultDriverName() {
41+
return Optional.of("com.mysql.cj.jdbc.Driver");
42+
}
43+
44+
@Override
45+
public String quoteIdentifier(String identifier) {
46+
return "`" + identifier + "`";
47+
}
48+
49+
/**
50+
* 根据ALLReplace参数,选择使用replace语句还是ON DUPLICATE KEY UPDATE 语句
51+
* @param tableName
52+
* @param fieldNames
53+
* @param uniqueKeyFields
54+
* @param allReplace
55+
* @return
56+
*/
57+
@Override
58+
public Optional<String> getUpsertStatement(String schema, String tableName, String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
59+
return allReplace ? buildReplaceIntoStatement(tableName, fieldNames) : buildDuplicateUpsertStatement(tableName, fieldNames);
60+
}
61+
62+
public Optional<String> buildDuplicateUpsertStatement(String tableName, String[] fieldNames) {
63+
String updateClause = Arrays.stream(fieldNames).map(f -> quoteIdentifier(f) + "=IFNULL(VALUES(" + quoteIdentifier(f) + ")," + quoteIdentifier(f) + ")")
64+
.collect(Collectors.joining(", "));
65+
return Optional.of(getInsertIntoStatement("", tableName, fieldNames) +
66+
" ON DUPLICATE KEY UPDATE " + updateClause
67+
);
68+
}
69+
70+
public Optional<String> buildReplaceIntoStatement(String tableName, String[] fieldNames) {
71+
String columns = Arrays.stream(fieldNames)
72+
.map(this::quoteIdentifier)
73+
.collect(Collectors.joining(", "));
74+
String placeholders = Arrays.stream(fieldNames)
75+
.map(f -> "?")
76+
.collect(Collectors.joining(", "));
77+
return Optional.of("REPLACE INTO " + quoteIdentifier(tableName) +
78+
"(" + columns + ")" + " VALUES (" + placeholders + ")");
79+
}
80+
}
Lines changed: 19 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,33 @@
11
package com.dtstack.flink.sql.sink.polardb;
22

3-
import com.dtstack.flink.sql.sink.IStreamSinkGener;
3+
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
44
import com.dtstack.flink.sql.sink.rdb.RdbSink;
5+
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
56

6-
import java.util.List;
7-
import java.util.Map;
87

9-
public class PolardbSink extends RdbSink implements IStreamSinkGener<RdbSink> {
8+
public class PolardbSink extends RdbSink {
109

1110
private static final String POLARDB_DRIVER = "com.mysql.cj.jdbc.Driver";
1211

1312
public PolardbSink() {
13+
super(new PolardbDialect());
1414
}
1515

1616
@Override
17-
public RetractJDBCOutputFormat getOutputFormat() {
18-
return new RetractJDBCOutputFormat();
19-
}
20-
21-
@Override
22-
public void buildSql(String scheam, String tableName, List<String> fields) {
23-
buildInsertSql(tableName, fields);
24-
}
25-
26-
@Override
27-
public String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
28-
return null;
29-
}
30-
31-
private void buildInsertSql(String tableName, List<String> fields) {
32-
String sqlTmp = "replace into " + tableName + " (${fields}) values (${placeholder})";
33-
String fieldsStr = "";
34-
String placeholder = "";
35-
36-
for (String fieldName : fields) {
37-
fieldsStr += ",`" + fieldName + "`";
38-
placeholder += ",?";
39-
}
40-
41-
fieldsStr = fieldsStr.replaceFirst(",", "");
42-
placeholder = placeholder.replaceFirst(",", "");
43-
44-
sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
45-
this.sql = sqlTmp;
46-
}
47-
48-
49-
@Override
50-
public String getDriverName() {
51-
return POLARDB_DRIVER;
17+
public JDBCUpsertOutputFormat getOutputFormat() {
18+
JDBCOptions jdbcOptions = JDBCOptions.builder()
19+
.setDBUrl(dbURL).setDialect(jdbcDialect)
20+
.setUsername(userName).setPassword(password)
21+
.setTableName(tableName).build();
22+
23+
return JDBCUpsertOutputFormat.builder()
24+
.setOptions(jdbcOptions)
25+
.setFieldNames(fieldNames)
26+
.setFlushMaxSize(batchNum)
27+
.setFlushIntervalMills(batchWaitInterval)
28+
.setFieldTypes(sqlTypes)
29+
.setKeyFields(primaryKeys)
30+
.setAllReplace(allReplace)
31+
.setUpdateMode(updateMode).build();
5232
}
5333
}

polardb/polardb-sink/src/main/java/com/dtstack/flink/sql/sink/polardb/table/PolardbSinkParser.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ public class PolardbSinkParser extends RdbSinkParser {
1010

1111
@Override
1212
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
13-
TableInfo mysqlTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
14-
mysqlTableInfo.setType(CURR_TYPE);
15-
return mysqlTableInfo;
13+
TableInfo polardbTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
14+
polardbTableInfo.setType(CURR_TYPE);
15+
return polardbTableInfo;
1616
}
1717
}

pom.xml

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,29 +10,29 @@
1010
<url>http://maven.apache.org</url>
1111
<modules>
1212
<module>core</module>
13-
<!--<module>kafka09</module>-->
14-
<!--<module>kafka10</module>-->
15-
<!--<module>kafka11</module>-->
16-
<!--<module>kafka</module>-->
17-
<!--<module>mysql</module>-->
18-
<!--<module>hbase</module>-->
19-
<!--<module>elasticsearch5</module>-->
20-
<!--<module>mongo</module>-->
21-
<!--<module>redis5</module>-->
22-
<!--<module>launcher</module>-->
13+
<module>kafka09</module>
14+
<module>kafka10</module>
15+
<module>kafka11</module>
16+
<module>kafka</module>
17+
<module>mysql</module>
18+
<module>hbase</module>
19+
<module>elasticsearch5</module>
20+
<module>mongo</module>
21+
<module>redis5</module>
22+
<module>launcher</module>
2323
<module>rdb</module>
24-
<!--<module>sqlserver</module>-->
24+
<module>sqlserver</module>
2525
<module>oracle</module>
26-
<!--<module>cassandra</module>-->
27-
<!--<module>kudu</module>-->
26+
<module>cassandra</module>
27+
<module>kudu</module>
2828
<module>postgresql</module>
29-
<!--<module>kafka08</module>-->
30-
<!--<module>serversocket</module>-->
31-
<!--<module>console</module>-->
32-
<!--<module>clickhouse</module>-->
33-
<!--<module>impala</module>-->
34-
<!--<module>db2</module>-->
35-
<!--<module>polardb</module>-->
29+
<module>kafka08</module>
30+
<module>serversocket</module>
31+
<module>console</module>
32+
<module>clickhouse</module>
33+
<module>impala</module>
34+
<module>db2</module>
35+
<module>polardb</module>
3636

3737
</modules>
3838

0 commit comments

Comments
 (0)