Skip to content

Commit 6d32bdf

Browse files
committed
Merge remote-tracking branch 'origin/v1.8.0_dev' into v1.9.0_dev
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/Main.java # core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderManager.java # core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java # core/src/main/java/com/dtstack/flink/sql/function/FunctionManager.java # launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java # launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java # launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java
2 parents e9c2dc4 + 03afb78 commit 6d32bdf

File tree

31 files changed

+867
-35
lines changed

31 files changed

+867
-35
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@
4848
* Java: JDK8及以上
4949
* Flink集群: 1.4,1.5,1.8(单机模式不需要安装Flink集群)
5050
* 操作系统:理论上不限
51+
* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例:
52+
```
53+
fs.hdfs.hadoopconf: /Users/maqi/tmp/hadoopconf/hadoop_250
54+
security.kerberos.login.use-ticket-cache: true
55+
security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/yanxi.keytab
56+
security.kerberos.login.principal: yanxi@EXAMPLE.COM
57+
security.kerberos.login.contexts: Client,KafkaClient
58+
zookeeper.sasl.service-name: zookeeper
59+
zookeeper.sasl.login-context-name: Client
60+
61+
```
5162

5263
### 1.3 打包
5364

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
263263
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
264264
Thread.sleep(5 * 1000);
265265
} catch (InterruptedException e1) {
266-
e1.printStackTrace();
266+
LOG.error("", e1);
267267
}
268268
}
269269

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ public class ConfigConstrant {
5353
public static final String SQL_BUFFER_TIMEOUT_MILLIS = "sql.buffer.timeout.millis";
5454

5555
public static final String FLINK_TIME_CHARACTERISTIC_KEY = "time.characteristic";
56+
// default 200ms
57+
public static final String AUTO_WATERMARK_INTERVAL_KEY = "autoWatermarkInterval";
5658

5759
public static final String SQL_TTL_MINTIME = "sql.ttl.min";
58-
5960
public static final String SQL_TTL_MAXTIME = "sql.ttl.max";
6061

6162
public static final String STATE_BACKEND_KEY = "state.backend";

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.util;
2222

@@ -196,7 +196,7 @@ public static URL[] getPluginJarUrls(String pluginDir) throws MalformedURLExcept
196196
}
197197
return urlList.toArray(new URL[urlList.size()]);
198198
}
199-
199+
200200
public static String getCoreJarFileName (String path, String prefix) throws Exception {
201201
String coreJarFileName = null;
202202
File pluginDir = new File(path);

docs/impalaSink.md

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,80 @@ CREATE TABLE tableName(
4949

5050
## 5.样例:
5151
```
52+
CREATE TABLE MyTable(
53+
channel VARCHAR,
54+
pt int,
55+
xctime varchar,
56+
name varchar
57+
)WITH(
58+
type ='kafka11',
59+
bootstrapServers ='172.16.8.107:9092',
60+
zookeeperQuorum ='172.16.8.107:2181/kafka',
61+
offsetReset ='latest',
62+
topic ='mqTest03'
63+
);
64+
5265
CREATE TABLE MyResult(
53-
channel VARCHAR,
54-
pv VARCHAR
66+
a STRING,
67+
b STRING
68+
)WITH(
69+
type ='impala',
70+
url ='jdbc:impala://172.16.101.252:21050/hxbho_pub',
71+
userName ='root',
72+
password ='pwd',
73+
authMech ='3',
74+
tableName ='tb_result_4',
75+
parallelism ='1',
76+
-- 指定分区
77+
partitionFields = 'pt=1001,name="name1001" ',
78+
batchSize = '1000',
79+
parallelism ='2'
80+
);
81+
82+
CREATE TABLE MyResult1(
83+
a STRING,
84+
b STRING,
85+
pt int,
86+
name STRING
5587
)WITH(
5688
type ='impala',
57-
url ='jdbc:impala://localhost:21050/mytest',
58-
userName ='dtstack',
59-
password ='abc123',
60-
authMech = '3',
61-
tableName ='pv2',
62-
parallelism ='1'
63-
)
89+
url ='jdbc:impala://172.16.101.252:21050/hxbho_pub',
90+
userName ='root',
91+
password ='Wscabc123..@',
92+
authMech ='3',
93+
tableName ='tb_result_4',
94+
parallelism ='1',
95+
enablePartition ='true',
96+
-- 动态分区
97+
partitionFields = 'pt,name ',
98+
batchSize = '1000',
99+
parallelism ='2'
100+
);
101+
102+
103+
insert
104+
into
105+
MyResult1
106+
select
107+
xctime AS b,
108+
channel AS a,
109+
pt,
110+
name
111+
from
112+
MyTable;
113+
114+
115+
116+
insert
117+
into
118+
MyResult
119+
select
120+
xctime AS b,
121+
channel AS a
122+
from
123+
MyTable;
124+
125+
126+
127+
64128
```

docs/polardbSide.md

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE tableName(
4+
colName colType,
5+
...
6+
PRIMARY KEY(keyInfo),
7+
PERIOD FOR SYSTEM_TIME
8+
)WITH(
9+
type='polardb',
10+
url='jdbcUrl',
11+
userName='dbUserName',
12+
password='dbPwd',
13+
tableName='tableName',
14+
cache ='LRU',
15+
cacheSize ='10000',
16+
cacheTTLMs ='60000',
17+
parallelism ='1',
18+
partitionedJoin='false'
19+
);
20+
```
21+
22+
## 2.支持版本
23+
mysql-8.0.16
24+
25+
## 3.表结构定义
26+
27+
|参数名称|含义|
28+
|----|---|
29+
| tableName | polardb表名称|
30+
| colName | 列名称|
31+
| colType | 列类型 [colType支持的类型](colType.md)|
32+
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
33+
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
34+
35+
## 4.参数
36+
37+
|参数名称|含义|是否必填|默认值|
38+
|----|---|---|----|
39+
| type | 表明维表的类型 polardb |||
40+
| url | 连接polardb数据库 jdbcUrl |||
41+
| userName | polardb连接用户名 |||
42+
| password | polardb连接密码|||
43+
| tableName | polardb表名称|||
44+
| cache | 维表缓存策略(NONE/LRU)||NONE|
45+
| partitionedJoin | 是否在维表join之前先根据 设定的key 做一次keyby操作(可以减少维表的数据缓存量)||false|
46+
47+
----------
48+
> 缓存策略
49+
* NONE: 不做内存缓存
50+
* LRU:
51+
* cacheSize: 缓存的条目数量
52+
* cacheTTLMs:缓存的过期时间(ms)
53+
* cacheMode: (unordered|ordered)异步加载是有序还是无序,默认有序。
54+
* asyncCapacity:异步请求容量,默认1000
55+
* asyncTimeout:异步请求超时时间,默认10000毫秒
56+
57+
## 5.样例
58+
```
59+
create table sideTable(
60+
channel varchar,
61+
xccount int,
62+
PRIMARY KEY(channel),
63+
PERIOD FOR SYSTEM_TIME
64+
)WITH(
65+
type='polardb',
66+
url='jdbc:mysql://xxx.xxx.xxx:3306/test?charset=utf8',
67+
userName='dtstack',
68+
password='abc123',
69+
tableName='sidetest',
70+
cache ='LRU',
71+
cacheSize ='10000',
72+
cacheTTLMs ='60000',
73+
cacheMode='unordered',
74+
asyncCapacity='1000',
75+
asyncTimeout='10000',
76+
parallelism ='1',
77+
partitionedJoin='false'
78+
);
79+
```

docs/polardbSink.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
## 1.格式:
2+
```
3+
CREATE TABLE tableName(
4+
colName colType,
5+
...
6+
colNameX colType
7+
)WITH(
8+
type ='polardb',
9+
url ='jdbcUrl',
10+
userName ='userName',
11+
password ='pwd',
12+
tableName ='tableName',
13+
parallelism ='parllNum'
14+
);
15+
16+
```
17+
18+
## 2.支持版本
19+
mysql-8.0.16
20+
21+
## 3.表结构定义
22+
23+
|参数名称|含义|
24+
|----|---|
25+
| tableName| polardb表名称|
26+
| colName | 列名称|
27+
| colType | 列类型 [colType支持的类型](colType.md)|
28+
29+
## 4.参数:
30+
31+
|参数名称|含义|是否必填|默认值|
32+
|----|----|----|----|
33+
|type |表明 输出表类型 polardb|||
34+
|url | 连接polardb数据库 jdbcUrl |||
35+
|userName | polardb连接用户名 |||
36+
| password | polardb连接密码|||
37+
| tableName | polardb表名称|||
38+
| parallelism | 并行度设置||1|
39+
40+
## 5.样例:
41+
```
42+
CREATE TABLE MyResult(
43+
channel VARCHAR,
44+
pv VARCHAR
45+
)WITH(
46+
type ='polardb',
47+
url ='jdbc:mysql://xxx.xxx.xxx:3306/test?charset=utf8',
48+
userName ='dtstack',
49+
password ='abc123',
50+
tableName ='pv2',
51+
parallelism ='1'
52+
);
53+
```

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,14 +181,14 @@ private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLExcep
181181
tmpCache.put(new String(r.getRow()), kv);
182182
}
183183
} catch (IOException e) {
184-
e.printStackTrace();
184+
LOG.error("", e);
185185
} finally {
186186
try {
187187
conn.close();
188188
table.close();
189189
resultScanner.close();
190190
} catch (IOException e) {
191-
e.printStackTrace();
191+
LOG.error("", e);
192192
}
193193
}
194194
}

impala/impala-side/impala-side-core/src/main/java/com/dtstack/flink/sql/side/impala/table/ImpalaSideParser.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.dtstack.flink.sql.table.TableInfo;
2323
import com.dtstack.flink.sql.util.MathUtil;
2424
import com.fasterxml.jackson.databind.ObjectMapper;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2527

2628
import java.math.BigDecimal;
2729
import java.sql.Date;
@@ -41,6 +43,7 @@
4143
*/
4244

4345
public class ImpalaSideParser extends RdbSideParser {
46+
private static final Logger LOG = LoggerFactory.getLogger(ImpalaSideParser.class);
4447

4548
private static final String CURR_TYPE = "impala";
4649

@@ -113,7 +116,7 @@ public Map setPartitionFieldValues(String partitionfieldValuesStr){
113116
}
114117
return fieldValues;
115118
} catch (Exception e) {
116-
e.printStackTrace();
119+
LOG.error("",e);
117120
throw new RuntimeException(e);
118121
}
119122
}

impala/impala-sink/src/main/java/com/dtstack/flink/sql/sink/impala/ImpalaSink.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.sink.rdb.RdbSink;
2424
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
2525
import com.dtstack.flink.sql.table.TargetTableInfo;
26+
import org.apache.commons.lang.StringUtils;
2627
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
2728
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
2829

@@ -89,6 +90,8 @@ public void buildInsertSql(String tableName, List<String> fields) {
8990
boolean enablePartition = impalaTableInfo.isEnablePartition();
9091
if (enablePartition) {
9192
String partitionFieldsStr = impalaTableInfo.getPartitionFields();
93+
partitionFieldsStr = !StringUtils.isEmpty(partitionFieldsStr) ? partitionFieldsStr.replaceAll("\"", "'") : partitionFieldsStr;
94+
9295
List<String> partitionFields = Arrays.asList(partitionFieldsStr.split(","));
9396
List<String> newFields = new ArrayList<>();
9497
for (String field : fields) {

0 commit comments

Comments
 (0)