Skip to content

Commit 40460cb

Browse files
committed
rdb scan
1 parent 86d05ee commit 40460cb

File tree

41 files changed

+296
-706
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+296
-706
lines changed

clickhouse/clickhouse-side/clickhouse-all-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAllReqRow.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,17 @@
2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.SideTableInfo;
24-
import com.dtstack.flink.sql.side.rdb.all.RdbAllReqRow;
25-
import com.dtstack.flink.sql.util.DtStringUtil;
24+
import com.dtstack.flink.sql.side.rdb.all.AbstractRdbAllReqRow;
2625
import com.dtstack.flink.sql.util.JDBCUtils;
2726
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28-
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2927
import org.slf4j.Logger;
3028
import org.slf4j.LoggerFactory;
3129

3230
import java.sql.Connection;
3331
import java.sql.DriverManager;
3432
import java.util.List;
35-
import java.util.Map;
3633

37-
public class ClickhouseAllReqRow extends RdbAllReqRow {
34+
public class ClickhouseAllReqRow extends AbstractRdbAllReqRow {
3835

3936
private static final Logger LOG = LoggerFactory.getLogger(ClickhouseAllReqRow.class);
4037

clickhouse/clickhouse-side/clickhouse-async-side/src/main/java/com/dtstack/flink/sql/side/clickhouse/ClickhouseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void open(Configuration parameters) throws Exception {
5858
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
5959
vo.setFileResolverCachingEnabled(false);
6060
Vertx vertx = Vertx.vertx(vo);
61-
setRdbSQLClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
61+
setRdbSqlClient(JDBCClient.createNonShared(vertx, clickhouseClientConfig));
6262
}
6363

6464
}

clickhouse/clickhouse-sink/src/main/java/com/dtstack/flink/sql/sink/clickhouse/ClickhouseSink.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,19 @@
2222

2323
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2424
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
25-
import com.dtstack.flink.sql.sink.rdb.RdbSink;
25+
import com.dtstack.flink.sql.sink.rdb.AbstractRdbSink;
2626
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
2727

28-
import java.util.List;
29-
import java.util.Map;
3028

31-
32-
public class ClickhouseSink extends RdbSink implements IStreamSinkGener<RdbSink> {
29+
public class ClickhouseSink extends AbstractRdbSink implements IStreamSinkGener<AbstractRdbSink> {
3330
public ClickhouseSink() {
3431
super(new ClickhouseDialect());
3532
}
3633

3734
@Override
3835
public JDBCUpsertOutputFormat getOutputFormat() {
3936
JDBCOptions jdbcOptions = JDBCOptions.builder()
40-
.setDBUrl(dbURL)
37+
.setDbUrl(dbUrl)
4138
.setDialect(jdbcDialect)
4239
.setUsername(userName)
4340
.setPassword(password)

db2/db2-side/db2-all-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AllReqRow.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,14 @@
2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.SideTableInfo;
24-
import com.dtstack.flink.sql.side.rdb.all.RdbAllReqRow;
25-
import com.dtstack.flink.sql.util.DtStringUtil;
26-
import com.google.common.collect.Maps;
24+
import com.dtstack.flink.sql.side.rdb.all.AbstractRdbAllReqRow;
2725
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2826
import org.slf4j.Logger;
2927
import org.slf4j.LoggerFactory;
3028

3129
import java.sql.Connection;
3230
import java.sql.DriverManager;
3331
import java.util.List;
34-
import java.util.Map;
3532

3633
/**
3734
* Date: 2019/11/20
@@ -40,7 +37,7 @@
4037
* @author xiuzhu
4138
*/
4239

43-
public class Db2AllReqRow extends RdbAllReqRow {
40+
public class Db2AllReqRow extends AbstractRdbAllReqRow {
4441

4542
private static final Logger LOG = LoggerFactory.getLogger(Db2AllReqRow.class);
4643

db2/db2-side/db2-async-side/src/main/java/com/dtstack/flink/sql/side/db2/Db2AsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public void open(Configuration parameters) throws Exception {
7474
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
7575
vo.setFileResolverCachingEnabled(false);
7676
Vertx vertx = Vertx.vertx(vo);
77-
setRdbSQLClient(JDBCClient.createNonShared(vertx, db2lientConfig));
77+
setRdbSqlClient(JDBCClient.createNonShared(vertx, db2lientConfig));
7878
}
7979

8080
}

db2/db2-sink/src/main/java/com/dtstack/flink/sql/sink/db/DbSink.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
11
package com.dtstack.flink.sql.sink.db;
22

33
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
4-
import com.dtstack.flink.sql.sink.rdb.RdbSink;
4+
import com.dtstack.flink.sql.sink.rdb.AbstractRdbSink;
55
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
66

7-
import java.util.List;
8-
import java.util.Map;
9-
10-
public class DbSink extends RdbSink {
7+
public class DbSink extends AbstractRdbSink {
118

129
public DbSink() {
1310
super(new DbDialect());
1411
}
1512
@Override
1613
public JDBCUpsertOutputFormat getOutputFormat() {
1714
JDBCOptions jdbcOptions = JDBCOptions.builder()
18-
.setDBUrl(dbURL)
15+
.setDbUrl(dbUrl)
1916
.setDialect(jdbcDialect)
2017
.setUsername(userName)
2118
.setPassword(password)

impala/impala-side/impala-all-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAllReqRow.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.SideTableInfo;
2424
import com.dtstack.flink.sql.side.impala.table.ImpalaSideTableInfo;
25-
import com.dtstack.flink.sql.side.rdb.all.RdbAllReqRow;
26-
import com.dtstack.flink.sql.util.DtStringUtil;
25+
import com.dtstack.flink.sql.side.rdb.all.AbstractRdbAllReqRow;
2726
import com.dtstack.flink.sql.util.JDBCUtils;
28-
import com.google.common.collect.Maps;
2927
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3028
import org.apache.hadoop.conf.Configuration;
3129
import org.apache.hadoop.security.UserGroupInformation;
@@ -36,7 +34,6 @@
3634
import java.sql.Connection;
3735
import java.sql.DriverManager;
3836
import java.util.List;
39-
import java.util.Map;
4037

4138
/**
4239
* side operator with cache for all(period reload)
@@ -46,7 +43,7 @@
4643
* @author xiuzhu
4744
*/
4845

49-
public class ImpalaAllReqRow extends RdbAllReqRow {
46+
public class ImpalaAllReqRow extends AbstractRdbAllReqRow {
5047

5148
private static final long serialVersionUID = 2098635140857937717L;
5249

impala/impala-side/impala-async-side/src/main/java/com/dtstack/flink/sql/side/impala/ImpalaAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void open(Configuration parameters) throws Exception {
6666
vo.setWorkerPoolSize(DEFAULT_VERTX_WORKER_POOL_SIZE);
6767
vo.setFileResolverCachingEnabled(false);
6868
Vertx vertx = Vertx.vertx(vo);
69-
setRdbSQLClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
69+
setRdbSqlClient(JDBCClient.createNonShared(vertx, impalaClientConfig));
7070
}
7171

7272
public JsonObject getClientConfig() {

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaSink.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2222
import com.dtstack.flink.sql.sink.impala.table.ImpalaTableInfo;
2323
import com.dtstack.flink.sql.sink.rdb.JDBCOptions;
24-
import com.dtstack.flink.sql.sink.rdb.RdbSink;
24+
import com.dtstack.flink.sql.sink.rdb.AbstractRdbSink;
2525
import com.dtstack.flink.sql.sink.rdb.format.JDBCUpsertOutputFormat;
2626
import com.dtstack.flink.sql.table.TargetTableInfo;
2727
import org.apache.hadoop.conf.Configuration;
@@ -36,7 +36,7 @@
3636
* @author xiuzhu
3737
*/
3838

39-
public class ImpalaSink extends RdbSink implements IStreamSinkGener<RdbSink> {
39+
public class ImpalaSink extends AbstractRdbSink implements IStreamSinkGener<AbstractRdbSink> {
4040

4141
private ImpalaTableInfo impalaTableInfo;
4242

@@ -47,7 +47,7 @@ public ImpalaSink() {
4747
@Override
4848
public JDBCUpsertOutputFormat getOutputFormat() {
4949
JDBCOptions jdbcOptions = JDBCOptions.builder()
50-
.setDBUrl(getImpalaJdbcUrl())
50+
.setDbUrl(getImpalaJdbcUrl())
5151
.setDialect(jdbcDialect)
5252
.setUsername(userName)
5353
.setPassword(password)
@@ -70,8 +70,8 @@ public JDBCUpsertOutputFormat getOutputFormat() {
7070

7171
public String getImpalaJdbcUrl() {
7272
Integer authMech = impalaTableInfo.getAuthMech();
73-
String newUrl = dbURL;
74-
StringBuffer urlBuffer = new StringBuffer(dbURL);
73+
String newUrl = dbUrl;
74+
StringBuffer urlBuffer = new StringBuffer(dbUrl);
7575
if (authMech == EAuthMech.NoAuthentication.getType()) {
7676
return newUrl;
7777
} else if (authMech == EAuthMech.Kerberos.getType()) {
@@ -121,7 +121,7 @@ public String getImpalaJdbcUrl() {
121121
}
122122

123123
@Override
124-
public RdbSink genStreamSink(TargetTableInfo targetTableInfo) {
124+
public AbstractRdbSink genStreamSink(TargetTableInfo targetTableInfo) {
125125
super.genStreamSink(targetTableInfo);
126126
this.impalaTableInfo = (ImpalaTableInfo) targetTableInfo;
127127
return this;

mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
2323
import com.dtstack.flink.sql.side.SideTableInfo;
24-
import com.dtstack.flink.sql.side.rdb.all.RdbAllReqRow;
24+
import com.dtstack.flink.sql.side.rdb.all.AbstractRdbAllReqRow;
2525
import com.dtstack.flink.sql.util.DtStringUtil;
2626
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2727
import com.google.common.collect.Maps;
@@ -41,7 +41,7 @@
4141
* @author xuchao
4242
*/
4343

44-
public class MysqlAllReqRow extends RdbAllReqRow {
44+
public class MysqlAllReqRow extends AbstractRdbAllReqRow {
4545

4646
private static final long serialVersionUID = 2098635140857937717L;
4747

@@ -54,13 +54,13 @@ public MysqlAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
5454
}
5555

5656
@Override
57-
public Connection getConn(String dbURL, String userName, String password) {
57+
public Connection getConn(String dbUrl, String userName, String password) {
5858
try {
5959
Class.forName(MYSQL_DRIVER);
6060
//add param useCursorFetch=true
6161
Map<String, String> addParams = Maps.newHashMap();
6262
addParams.put("useCursorFetch", "true");
63-
String targetDbUrl = DtStringUtil.addJdbcParam(dbURL, addParams, true);
63+
String targetDbUrl = DtStringUtil.addJdbcParam(dbUrl, addParams, true);
6464
return DriverManager.getConnection(targetDbUrl, userName, password);
6565
} catch (Exception e) {
6666
LOG.error("", e);

0 commit comments

Comments
 (0)