Skip to content

Commit 3f89d1e

Browse files
committed
Merge branch 'v1.8.0_dev' into feat_1.9Merge1.8dev
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/Main.java # core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderManager.java # core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java # core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java # core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java # core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java # hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java # kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java # kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java # kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java # kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java # kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka11JsonTableSink.java # kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java # kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java # launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java # mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java # rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java
2 parents b259aa5 + d124305 commit 3f89d1e

File tree

380 files changed

+11854
-9619
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

380 files changed

+11854
-9619
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
149149
* taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1)
150150
* savePointPath:任务恢复点的路径(默认无)
151151
* allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false)
152+
* restore.enable:是否失败重启(默认是true)
153+
* failure.interval:衡量失败率的时间段,单位分钟(默认6m)
154+
* delay.interval:连续两次重启尝试间的间隔,单位是秒(默认10s)
155+
* logLevel: 日志级别动态配置(默认info)
152156
* [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例
153157

154158

@@ -181,6 +185,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
181185
* 描述:yarn session 模式下指定的运行的一些参数,[可参考](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html),目前只支持指定yid
182186
* 必选:否
183187
* 默认值:false
188+
184189

185190
## 2 结构
186191
### 2.1 源表插件

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818

1919
package com.dtstack.flink.sql.side.cassandra;
2020

21+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
22+
import org.apache.flink.table.runtime.types.CRow;
23+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
24+
import org.apache.flink.types.Row;
25+
import org.apache.flink.util.Collector;
26+
2127
import com.datastax.driver.core.Cluster;
2228
import com.datastax.driver.core.ConsistencyLevel;
2329
import com.datastax.driver.core.HostDistance;
@@ -28,19 +34,16 @@
2834
import com.datastax.driver.core.SocketOptions;
2935
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
3036
import com.datastax.driver.core.policies.RetryPolicy;
31-
import com.dtstack.flink.sql.side.AllReqRow;
37+
import com.dtstack.flink.sql.side.BaseAllReqRow;
3238
import com.dtstack.flink.sql.side.FieldInfo;
3339
import com.dtstack.flink.sql.side.JoinInfo;
34-
import com.dtstack.flink.sql.side.SideTableInfo;
40+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
3541
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
36-
import org.apache.calcite.sql.JoinType;
37-
import org.apache.commons.collections.CollectionUtils;
38-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3942
import com.google.common.collect.Lists;
4043
import com.google.common.collect.Maps;
41-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
42-
import org.apache.flink.types.Row;
43-
import org.apache.flink.util.Collector;
44+
import org.apache.calcite.sql.JoinType;
45+
import org.apache.commons.collections.CollectionUtils;
46+
import org.apache.commons.lang3.StringUtils;
4447
import org.slf4j.Logger;
4548
import org.slf4j.LoggerFactory;
4649

@@ -59,14 +62,12 @@
5962
*
6063
* @author xuqianjin
6164
*/
62-
public class CassandraAllReqRow extends AllReqRow {
65+
public class CassandraAllReqRow extends BaseAllReqRow {
6366

6467
private static final long serialVersionUID = 54015343561288219L;
6568

6669
private static final Logger LOG = LoggerFactory.getLogger(CassandraAllReqRow.class);
6770

68-
private static final String cassandra_DRIVER = "com.cassandra.jdbc.Driver";
69-
7071
private static final int CONN_RETRY_NUM = 3;
7172

7273
private static final int FETCH_SIZE = 1000;
@@ -76,7 +77,7 @@ public class CassandraAllReqRow extends AllReqRow {
7677

7778
private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
7879

79-
public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
80+
public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
8081
super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
8182
}
8283

@@ -124,14 +125,14 @@ protected void reloadCache() {
124125

125126

126127
@Override
127-
public void flatMap(Row value, Collector<Row> out) throws Exception {
128+
public void flatMap(CRow input, Collector<CRow> out) throws Exception {
128129
List<Object> inputParams = Lists.newArrayList();
129130
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
130-
Object equalObj = value.getField(conValIndex);
131+
Object equalObj = input.row().getField(conValIndex);
131132
if (equalObj == null) {
132133
if(sideInfo.getJoinType() == JoinType.LEFT){
133-
Row data = fillData(value, null);
134-
out.collect(data);
134+
Row data = fillData(input.row(), null);
135+
out.collect(new CRow(data, input.change()));
135136
}
136137
return;
137138
}
@@ -143,8 +144,8 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
143144
List<Map<String, Object>> cacheList = cacheRef.get().get(key);
144145
if (CollectionUtils.isEmpty(cacheList)) {
145146
if (sideInfo.getJoinType() == JoinType.LEFT) {
146-
Row row = fillData(value, null);
147-
out.collect(row);
147+
Row row = fillData(input.row(), null);
148+
out.collect(new CRow(row, input.change()));
148149
} else {
149150
return;
150151
}
@@ -153,7 +154,7 @@ public void flatMap(Row value, Collector<Row> out) throws Exception {
153154
}
154155

155156
for (Map<String, Object> one : cacheList) {
156-
out.collect(fillData(value, one));
157+
out.collect(new CRow(fillData(input.row(), one), input.change()));
157158
}
158159

159160
}
@@ -216,9 +217,9 @@ private Session getConn(CassandraSideTableInfo tableInfo) {
216217
//重试策略
217218
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
218219

219-
for (String server : address.split(",")) {
220-
cassandraPort = Integer.parseInt(server.split(":")[1]);
221-
serversList.add(InetAddress.getByName(server.split(":")[0]));
220+
for (String server : StringUtils.split(address, ",")) {
221+
cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
222+
serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
222223
}
223224

224225
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -272,7 +273,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
272273
//load data from table
273274
String sql = sideInfo.getSqlCondition() + " limit " + FETCH_SIZE;
274275
ResultSet resultSet = session.execute(sql);
275-
String[] sideFieldNames = sideInfo.getSideSelectFields().split(",");
276+
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ",");
276277
for (com.datastax.driver.core.Row row : resultSet) {
277278
Map<String, Object> oneRow = Maps.newHashMap();
278279
for (String fieldName : sideFieldNames) {

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
23-
import com.dtstack.flink.sql.side.SideInfo;
24-
import com.dtstack.flink.sql.side.SideTableInfo;
23+
import com.dtstack.flink.sql.side.BaseSideInfo;
24+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2525
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
2626
import com.dtstack.flink.sql.util.ParseUtils;
2727
import org.apache.calcite.sql.SqlNode;
@@ -37,16 +37,16 @@
3737
*
3838
* @author xuqianjin
3939
*/
40-
public class CassandraAllSideInfo extends SideInfo {
40+
public class CassandraAllSideInfo extends BaseSideInfo {
4141

4242
private static final long serialVersionUID = -8690814317653033557L;
4343

44-
public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
44+
public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4545
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4646
}
4747

4848
@Override
49-
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
49+
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
5050
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;
5151

5252
sqlCondition = "select ${selectField} from ${tableName} ";

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra;
2121

22+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
23+
import org.apache.flink.configuration.Configuration;
24+
import org.apache.flink.streaming.api.functions.async.ResultFuture;
25+
import org.apache.flink.table.runtime.types.CRow;
26+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
27+
import org.apache.flink.types.Row;
28+
2229
import com.datastax.driver.core.Cluster;
2330
import com.datastax.driver.core.ConsistencyLevel;
2431
import com.datastax.driver.core.HostDistance;
@@ -30,32 +37,27 @@
3037
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
3138
import com.datastax.driver.core.policies.RetryPolicy;
3239
import com.dtstack.flink.sql.enums.ECacheContentType;
33-
import com.dtstack.flink.sql.side.AsyncReqRow;
40+
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
3441
import com.dtstack.flink.sql.side.CacheMissVal;
3542
import com.dtstack.flink.sql.side.FieldInfo;
3643
import com.dtstack.flink.sql.side.JoinInfo;
37-
import com.dtstack.flink.sql.side.SideTableInfo;
44+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
3845
import com.dtstack.flink.sql.side.cache.CacheObj;
3946
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
4047
import com.google.common.base.Function;
48+
import com.google.common.collect.Lists;
4149
import com.google.common.util.concurrent.AsyncFunction;
4250
import com.google.common.util.concurrent.FutureCallback;
4351
import com.google.common.util.concurrent.Futures;
4452
import com.google.common.util.concurrent.ListenableFuture;
4553
import io.vertx.core.json.JsonArray;
46-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
47-
import org.apache.flink.configuration.Configuration;
48-
import com.google.common.collect.Lists;
49-
import org.apache.flink.streaming.api.functions.async.ResultFuture;
50-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
51-
import org.apache.flink.types.Row;
54+
import org.apache.commons.lang3.StringUtils;
5255
import org.slf4j.Logger;
5356
import org.slf4j.LoggerFactory;
5457

5558
import java.net.InetAddress;
5659
import java.sql.Timestamp;
5760
import java.util.ArrayList;
58-
import java.util.Collections;
5961
import java.util.List;
6062
import java.util.Map;
6163

@@ -65,7 +67,7 @@
6567
*
6668
* @author xuqianjin
6769
*/
68-
public class CassandraAsyncReqRow extends AsyncReqRow {
70+
public class CassandraAsyncReqRow extends BaseAsyncReqRow {
6971

7072
private static final long serialVersionUID = 6631584128079864735L;
7173

@@ -81,7 +83,7 @@ public class CassandraAsyncReqRow extends AsyncReqRow {
8183
private transient ListenableFuture session;
8284
private transient CassandraSideTableInfo cassandraSideTableInfo;
8385

84-
public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
86+
public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
8587
super(new com.dtstack.flink.sql.side.cassandra.CassandraAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
8688
}
8789

@@ -133,9 +135,9 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
133135
//重试策略
134136
RetryPolicy retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
135137

136-
for (String server : address.split(",")) {
137-
cassandraPort = Integer.parseInt(server.split(":")[1]);
138-
serversList.add(InetAddress.getByName(server.split(":")[0]));
138+
for (String server : StringUtils.split(address, ",")) {
139+
cassandraPort = Integer.parseInt(StringUtils.split(server, ":")[1]);
140+
serversList.add(InetAddress.getByName(StringUtils.split(server, ":")[0]));
139141
}
140142

141143
if (userName == null || userName.isEmpty() || password == null || password.isEmpty()) {
@@ -160,17 +162,17 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
160162
}
161163

162164
@Override
163-
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
164-
Row inputRow = Row.copy(input);
165+
public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
166+
CRow inputCopy = new CRow(input.row(), input.change());
165167
JsonArray inputParams = new JsonArray();
166168
StringBuffer stringBuffer = new StringBuffer();
167169
String sqlWhere = " where ";
168170

169171
for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
170172
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
171-
Object equalObj = inputRow.getField(conValIndex);
173+
Object equalObj = inputCopy.row().getField(conValIndex);
172174
if (equalObj == null) {
173-
dealMissKey(inputRow, resultFuture);
175+
dealMissKey(inputCopy, resultFuture);
174176
return;
175177
}
176178
inputParams.add(equalObj);
@@ -194,13 +196,13 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
194196
if (val != null) {
195197

196198
if (ECacheContentType.MissVal == val.getType()) {
197-
dealMissKey(inputRow, resultFuture);
199+
dealMissKey(inputCopy, resultFuture);
198200
return;
199201
} else if (ECacheContentType.MultiLine == val.getType()) {
200-
List<Row> rowList = Lists.newArrayList();
202+
List<CRow> rowList = Lists.newArrayList();
201203
for (Object jsonArray : (List) val.getContent()) {
202-
Row row = fillData(inputRow, jsonArray);
203-
rowList.add(row);
204+
Row row = fillData(inputCopy.row(), jsonArray);
205+
rowList.add(new CRow(row, inputCopy.change()));
204206
}
205207
resultFuture.complete(rowList);
206208
} else {
@@ -238,20 +240,20 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
238240
cluster.closeAsync();
239241
if (rows.size() > 0) {
240242
List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
241-
List<Row> rowList = Lists.newArrayList();
243+
List<CRow> rowList = Lists.newArrayList();
242244
for (com.datastax.driver.core.Row line : rows) {
243-
Row row = fillData(inputRow, line);
245+
Row row = fillData(inputCopy.row(), line);
244246
if (openCache()) {
245247
cacheContent.add(line);
246248
}
247-
rowList.add(row);
249+
rowList.add(new CRow(row,inputCopy.change()));
248250
}
249251
resultFuture.complete(rowList);
250252
if (openCache()) {
251253
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
252254
}
253255
} else {
254-
dealMissKey(inputRow, resultFuture);
256+
dealMissKey(inputCopy, resultFuture);
255257
if (openCache()) {
256258
putCache(key, CacheMissVal.getMissKeyObj());
257259
}

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import com.dtstack.flink.sql.side.FieldInfo;
2222
import com.dtstack.flink.sql.side.JoinInfo;
23-
import com.dtstack.flink.sql.side.SideInfo;
24-
import com.dtstack.flink.sql.side.SideTableInfo;
23+
import com.dtstack.flink.sql.side.BaseSideInfo;
24+
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
2525
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
2626
import com.dtstack.flink.sql.util.ParseUtils;
2727
import org.apache.calcite.sql.SqlBasicCall;
@@ -39,16 +39,16 @@
3939
*
4040
* @author xuqianjin
4141
*/
42-
public class CassandraAsyncSideInfo extends SideInfo {
42+
public class CassandraAsyncSideInfo extends BaseSideInfo {
4343

4444
private static final long serialVersionUID = -4403313049809013362L;
4545

46-
public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
46+
public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
4747
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
4848
}
4949

5050
@Override
51-
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
51+
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
5252
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;
5353

5454
String sideTableName = joinInfo.getSideTableName();

0 commit comments

Comments
 (0)