Skip to content

Commit 4f2e9a9

Browse files
committed
Add ClickHouse sink plugin and per-dialect field type parsing
1 parent 22a59be commit 4f2e9a9

File tree

11 files changed

+236
-36
lines changed

11 files changed

+236
-36
lines changed

clickhouse/clickhouse-side/clickhouse-all-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAllReqRow.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.side.SideTableInfo;
2424
import com.dtstack.flink.sql.side.rdb.all.RdbAllReqRow;
2525
import com.dtstack.flink.sql.util.DtStringUtil;
26+
import com.dtstack.flink.sql.util.JDBCUtils;
2627
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2728
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2829
import org.slf4j.Logger;
@@ -33,7 +34,6 @@
3334
import java.util.List;
3435
import java.util.Map;
3536

36-
3737
public class ClickhouseAllReqRow extends RdbAllReqRow {
3838

3939
private static final Logger LOG = LoggerFactory.getLogger(ClickhouseAllReqRow.class);
@@ -45,22 +45,21 @@ public ClickhouseAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Fiel
4545
}
4646

4747
@Override
48-
public Connection getConn(String dbURL, String userName, String password) {
48+
public Connection getConn(String dbURL, String userName, String passWord) {
4949
try {
50-
Class.forName(CLICKHOUSE_DRIVER);
51-
//add param useCursorFetch=true
52-
Map<String, String> addParams = Maps.newHashMap();
53-
addParams.put("useCursorFetch", "true");
54-
String targetDbUrl = DtStringUtil.addJdbcParam(dbURL, addParams, true);
55-
return DriverManager.getConnection(targetDbUrl, userName, password);
50+
Connection connection ;
51+
JDBCUtils.forName(CLICKHOUSE_DRIVER, getClass().getClassLoader());
52+
// ClickHouseProperties contains all properties
53+
if (userName == null) {
54+
connection = DriverManager.getConnection(dbURL);
55+
} else {
56+
connection = DriverManager.getConnection(dbURL, userName, passWord);
57+
}
58+
return connection;
5659
} catch (Exception e) {
5760
LOG.error("", e);
5861
throw new RuntimeException("", e);
5962
}
6063
}
6164

62-
@Override
63-
public int getFetchSize() {
64-
return Integer.MIN_VALUE;
65-
}
6665
}

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ public ClickhouseAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Fi
4141
super(new ClickhouseAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
4242
}
4343

44-
4544
@Override
4645
public void open(Configuration parameters) throws Exception {
4746
super.open(parameters);
@@ -52,13 +51,8 @@ public void open(Configuration parameters) throws Exception {
5251
.put("max_pool_size", DEFAULT_MAX_DB_CONN_POOL_SIZE)
5352
.put("user", rdbSideTableInfo.getUserName())
5453
.put("password", rdbSideTableInfo.getPassword())
55-
.put("provider_class", DT_PROVIDER_CLASS)
56-
.put("preferred_test_query", PREFERRED_TEST_QUERY_SQL)
57-
.put("idle_connection_test_period", DEFAULT_IDLE_CONNECTION_TEST_PEROID)
58-
.put("test_connection_on_checkin", DEFAULT_TEST_CONNECTION_ON_CHECKIN);
59-
54+
.put("provider_class", DT_PROVIDER_CLASS);
6055
System.setProperty("vertx.disableFileCPResolving", "true");
61-
6256
VertxOptions vo = new VertxOptions();
6357
vo.setEventLoopPoolSize(DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE);
6458
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);

clickhouse/clickhouse-side/clickhouse-side-core/src/main/java/com/dtstack/flink/sql/side/clickhouse/table/ClickhouseSideParser.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.dtstack.flink.sql.side.rdb.table.RdbSideParser;
2323
import com.dtstack.flink.sql.table.TableInfo;
24+
import ru.yandex.clickhouse.domain.ClickHouseDataType;
2425

2526
import java.util.Map;
2627

@@ -42,4 +43,10 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4243
clickhouseTableInfo.setType(CURR_TYPE);
4344
return clickhouseTableInfo;
4445
}
46+
47+
@Override
48+
public Class dbTypeConvertToJavaType(String fieldType) {
49+
return ClickHouseDataType.fromTypeString(fieldType).getJavaClass();
50+
}
51+
4552
}

clickhouse/clickhouse-sink/pom.xml

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,81 @@
1010
<modelVersion>4.0.0</modelVersion>
1111

1212
<artifactId>sql.sink.clickhouse</artifactId>
13+
<name>clickhouse-sink</name>
14+
<url>http://maven.apache.org</url>
1315

16+
<properties>
17+
<sql.sink.rdb.version>1.0-SNAPSHOT</sql.sink.rdb.version>
18+
</properties>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>com.dtstack.flink</groupId>
23+
<artifactId>sql.sink.rdb</artifactId>
24+
<version>${sql.sink.rdb.version}</version>
25+
</dependency>
26+
</dependencies>
27+
28+
<build>
29+
<plugins>
30+
<plugin>
31+
<groupId>org.apache.maven.plugins</groupId>
32+
<artifactId>maven-shade-plugin</artifactId>
33+
<version>1.4</version>
34+
<executions>
35+
<execution>
36+
<phase>package</phase>
37+
<goals>
38+
<goal>shade</goal>
39+
</goals>
40+
<configuration>
41+
<artifactSet>
42+
<excludes>
43+
44+
</excludes>
45+
</artifactSet>
46+
<filters>
47+
<filter>
48+
<artifact>*:*</artifact>
49+
<excludes>
50+
<exclude>META-INF/*.SF</exclude>
51+
<exclude>META-INF/*.DSA</exclude>
52+
<exclude>META-INF/*.RSA</exclude>
53+
</excludes>
54+
</filter>
55+
</filters>
56+
</configuration>
57+
</execution>
58+
</executions>
59+
</plugin>
60+
61+
<plugin>
62+
<artifactId>maven-antrun-plugin</artifactId>
63+
<version>1.2</version>
64+
<executions>
65+
<execution>
66+
<id>copy-resources</id>
67+
<!-- here the phase you need -->
68+
<phase>package</phase>
69+
<goals>
70+
<goal>run</goal>
71+
</goals>
72+
<configuration>
73+
<tasks>
74+
<copy todir="${basedir}/../../plugins/clickhousesink">
75+
<fileset dir="target/">
76+
<include name="${project.artifactId}-${project.version}.jar"/>
77+
</fileset>
78+
</copy>
79+
80+
<move file="${basedir}/../../plugins/clickhousesink/${project.artifactId}-${project.version}.jar"
81+
tofile="${basedir}/../../plugins/clickhousesink/${project.name}-${git.branch}.jar"/>
82+
</tasks>
83+
</configuration>
84+
</execution>
85+
</executions>
86+
</plugin>
87+
</plugins>
88+
</build>
1489

1590
</project>
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.sink.clickhouse;
21+
22+
23+
import com.dtstack.flink.sql.sink.IStreamSinkGener;
24+
import com.dtstack.flink.sql.sink.rdb.RdbSink;
25+
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
26+
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
31+
public class ClickhouseSink extends RdbSink implements IStreamSinkGener<RdbSink> {
32+
33+
private static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
34+
35+
public ClickhouseSink() {
36+
}
37+
38+
@Override
39+
public RetractJDBCOutputFormat getOutputFormat() {
40+
return new RetractJDBCOutputFormat();
41+
}
42+
43+
@Override
44+
public void buildSql(String scheam, String tableName, List<String> fields) {
45+
buildInsertSql(tableName, fields);
46+
}
47+
48+
@Override
49+
public String buildUpdateSql(String schema, String tableName, List<String> fieldNames, Map<String, List<String>> realIndexes, List<String> fullField) {
50+
return null;
51+
}
52+
53+
private void buildInsertSql(String tableName, List<String> fields) {
54+
String sqlTmp = "insert into " + tableName + " (${fields}) values (${placeholder})";
55+
String fieldsStr = "";
56+
String placeholder = "";
57+
58+
for (String fieldName : fields) {
59+
fieldsStr += ",`" + fieldName + "`";
60+
placeholder += ",?";
61+
}
62+
63+
fieldsStr = fieldsStr.replaceFirst(",", "");
64+
placeholder = placeholder.replaceFirst(",", "");
65+
66+
sqlTmp = sqlTmp.replace("${fields}", fieldsStr).replace("${placeholder}", placeholder);
67+
this.sql = sqlTmp;
68+
System.out.println("---insert sql----");
69+
System.out.println(sql);
70+
}
71+
72+
73+
@Override
74+
public String getDriverName() {
75+
return CLICKHOUSE_DRIVER;
76+
}
77+
78+
79+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.sink.clickhouse.table;
21+
22+
import com.dtstack.flink.sql.sink.rdb.table.RdbSinkParser;
23+
import com.dtstack.flink.sql.table.TableInfo;
24+
import ru.yandex.clickhouse.domain.ClickHouseDataType;
25+
26+
import java.util.Map;
27+
28+
29+
public class ClickhouseSinkParser extends RdbSinkParser {
30+
private static final String CURR_TYPE = "clickhouse";
31+
32+
@Override
33+
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
34+
TableInfo clickhouseTableInfo = super.getTableInfo(tableName, fieldsInfo, props);
35+
clickhouseTableInfo.setType(CURR_TYPE);
36+
return clickhouseTableInfo;
37+
}
38+
39+
@Override
40+
public Class dbTypeConvertToJavaType(String fieldType) {
41+
return ClickHouseDataType.fromTypeString(fieldType).getJavaClass();
42+
}
43+
44+
}

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
9999
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
100100
String fieldName = String.join(" ", filedNameArr);
101101
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
102-
Class fieldClass = ClassUtil.stringConvertClass(fieldType);
102+
Class fieldClass = dbTypeConvertToJavaType(fieldType);
103103

104104
tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
105105
tableInfo.addField(fieldName);
@@ -117,4 +117,9 @@ public static void dealPrimaryKey(Matcher matcher, TableInfo tableInfo){
117117
List<String> primaryKes = Lists.newArrayList(splitArry);
118118
tableInfo.setPrimaryKeys(primaryKes);
119119
}
120+
121+
public Class dbTypeConvertToJavaType(String fieldType) {
122+
return ClassUtil.stringConvertClass(fieldType);
123+
}
124+
120125
}

pom.xml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,22 @@
1010
<url>http://maven.apache.org</url>
1111
<modules>
1212
<module>core</module>
13-
<!--<module>kafka09</module>-->
14-
<!--<module>kafka10</module>-->
15-
<!--<module>kafka11</module>-->
16-
<!--<module>mysql</module>-->
17-
<!--<module>hbase</module>-->
18-
<!--<module>elasticsearch5</module>-->
19-
<!--<module>mongo</module>-->
20-
<!--<module>redis5</module>-->
13+
<module>kafka09</module>
14+
<module>kafka10</module>
15+
<module>kafka11</module>
16+
<module>mysql</module>
17+
<module>hbase</module>
18+
<module>elasticsearch5</module>
19+
<module>mongo</module>
20+
<module>redis5</module>
2121
<module>launcher</module>
2222
<module>rdb</module>
23-
<!--<module>sqlserver</module>-->
24-
<!--<module>oracle</module>-->
25-
<!--<module>cassandra</module>-->
23+
<module>sqlserver</module>
24+
<module>oracle</module>
25+
<module>cassandra</module>
2626
<!--<module>kafka08</module>-->
2727
<module>serversocket</module>
28-
<!--<module>console</module>-->
28+
<module>console</module>
2929
<module>clickhouse</module>
3030
</modules>
3131

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,13 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
174174

175175
try {
176176
for (int i = 0; i < CONN_RETRY_NUM; i++) {
177-
178177
try {
179178
connection = getConn(tableInfo.getUrl(), tableInfo.getUserName(), tableInfo.getPassword());
180179
break;
181180
} catch (Exception e) {
182181
if (i == CONN_RETRY_NUM - 1) {
183182
throw new RuntimeException("", e);
184183
}
185-
186184
try {
187185
String connInfo = "url:" + tableInfo.getUrl() + ";userName:" + tableInfo.getUserName() + ",pwd:" + tableInfo.getPassword();
188186
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideParser.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
4747
rdbTableInfo.setTableName(MathUtil.getString(props.get(RdbSideTableInfo.TABLE_NAME_KEY.toLowerCase())));
4848
rdbTableInfo.setUserName(MathUtil.getString(props.get(RdbSideTableInfo.USER_NAME_KEY.toLowerCase())));
4949
rdbTableInfo.setPassword(MathUtil.getString(props.get(RdbSideTableInfo.PASSWORD_KEY.toLowerCase())));
50-
5150
rdbTableInfo.setSchema(MathUtil.getString(props.get(RdbSideTableInfo.SCHEMA_KEY.toLowerCase())));
5251

5352
rdbTableInfo.check();

0 commit comments

Comments
 (0)