Skip to content

Commit 306c2f2

Browse files
committed
merge 1.8 dev
1 parent 3f89d1e commit 306c2f2

File tree

26 files changed

+260
-505
lines changed

26 files changed

+260
-505
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818

1919
package com.dtstack.flink.sql.side.cassandra;
2020

21+
import org.apache.flink.api.java.tuple.Tuple2;
2122
import org.apache.flink.api.java.typeutils.RowTypeInfo;
22-
import org.apache.flink.table.runtime.types.CRow;
23-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2423
import org.apache.flink.types.Row;
2524
import org.apache.flink.util.Collector;
2625

@@ -125,14 +124,14 @@ protected void reloadCache() {
125124

126125

127126
@Override
128-
public void flatMap(CRow input, Collector<CRow> out) throws Exception {
127+
public void flatMap(Tuple2<Boolean,Row> input, Collector<Tuple2<Boolean,Row>> out) throws Exception {
129128
List<Object> inputParams = Lists.newArrayList();
130129
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
131-
Object equalObj = input.row().getField(conValIndex);
130+
Object equalObj = input.f1.getField(conValIndex);
132131
if (equalObj == null) {
133-
if(sideInfo.getJoinType() == JoinType.LEFT){
134-
Row data = fillData(input.row(), null);
135-
out.collect(new CRow(data, input.change()));
132+
if (sideInfo.getJoinType() == JoinType.LEFT) {
133+
Row data = fillData(input.f1, null);
134+
out.collect(Tuple2.of(input.f0, data));
136135
}
137136
return;
138137
}
@@ -144,8 +143,8 @@ public void flatMap(CRow input, Collector<CRow> out) throws Exception {
144143
List<Map<String, Object>> cacheList = cacheRef.get().get(key);
145144
if (CollectionUtils.isEmpty(cacheList)) {
146145
if (sideInfo.getJoinType() == JoinType.LEFT) {
147-
Row row = fillData(input.row(), null);
148-
out.collect(new CRow(row, input.change()));
146+
Row row = fillData(input.f1, null);
147+
out.collect(Tuple2.of(input.f0, row));
149148
} else {
150149
return;
151150
}
@@ -154,7 +153,7 @@ public void flatMap(CRow input, Collector<CRow> out) throws Exception {
154153
}
155154

156155
for (Map<String, Object> one : cacheList) {
157-
out.collect(new CRow(fillData(input.row(), one), input.change()));
156+
out.collect(Tuple2.of(input.f0, fillData(input.f1, one)));
158157
}
159158

160159
}

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra;
2121

22+
import org.apache.flink.api.java.tuple.Tuple2;
2223
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2324
import org.apache.flink.configuration.Configuration;
2425
import org.apache.flink.streaming.api.functions.async.ResultFuture;
25-
import org.apache.flink.table.runtime.types.CRow;
26-
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2726
import org.apache.flink.types.Row;
2827

2928
import com.datastax.driver.core.Cluster;
@@ -162,15 +161,15 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
162161
}
163162

164163
@Override
165-
public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
166-
CRow inputCopy = new CRow(input.row(), input.change());
164+
public void asyncInvoke(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture) throws Exception {
165+
Tuple2<Boolean, Row> inputCopy = Tuple2.of(input.f0, input.f1);
167166
JsonArray inputParams = new JsonArray();
168167
StringBuffer stringBuffer = new StringBuffer();
169168
String sqlWhere = " where ";
170169

171170
for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
172171
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
173-
Object equalObj = inputCopy.row().getField(conValIndex);
172+
Object equalObj = inputCopy.f1.getField(conValIndex);
174173
if (equalObj == null) {
175174
dealMissKey(inputCopy, resultFuture);
176175
return;
@@ -199,10 +198,10 @@ public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exce
199198
dealMissKey(inputCopy, resultFuture);
200199
return;
201200
} else if (ECacheContentType.MultiLine == val.getType()) {
202-
List<CRow> rowList = Lists.newArrayList();
201+
List<Tuple2<Boolean,Row>> rowList = Lists.newArrayList();
203202
for (Object jsonArray : (List) val.getContent()) {
204-
Row row = fillData(inputCopy.row(), jsonArray);
205-
rowList.add(new CRow(row, inputCopy.change()));
203+
Row row = fillData(inputCopy.f1, jsonArray);
204+
rowList.add(Tuple2.of(inputCopy.f0, row));
206205
}
207206
resultFuture.complete(rowList);
208207
} else {
@@ -240,13 +239,13 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
240239
cluster.closeAsync();
241240
if (rows.size() > 0) {
242241
List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
243-
List<CRow> rowList = Lists.newArrayList();
242+
List<Tuple2<Boolean,Row>> rowList = Lists.newArrayList();
244243
for (com.datastax.driver.core.Row line : rows) {
245-
Row row = fillData(inputCopy.row(), line);
244+
Row row = fillData(inputCopy.f1, line);
246245
if (openCache()) {
247246
cacheContent.add(line);
248247
}
249-
rowList.add(new CRow(row,inputCopy.change()));
248+
rowList.add(Tuple2.of(inputCopy.f0,row));
250249
}
251250
resultFuture.complete(rowList);
252251
if (openCache()) {

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
import org.apache.flink.api.java.typeutils.RowTypeInfo;
6161
import org.apache.flink.streaming.api.datastream.DataStream;
6262
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
63-
import org.apache.flink.table.api.StreamQueryConfig;
6463
import org.apache.flink.table.api.Table;
6564
import org.apache.flink.table.api.TableEnvironment;
6665
import org.apache.flink.table.api.java.StreamTableEnvironment;
@@ -151,7 +150,6 @@ public static boolean checkRemoteSqlPluginPath(String remoteSqlPluginPath, Strin
151150
public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInfo) throws Exception {
152151
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode());
153152
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
154-
StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, paramsInfo.getConfProp());
155153

156154
SqlParser.setLocalSqlPluginRoot(paramsInfo.getLocalSqlPluginPath());
157155
SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql());
@@ -167,7 +165,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
167165
// cache classPathSets
168166
ExecuteProcessHelper.registerPluginUrlToCachedFile(env, classPathSets);
169167

170-
ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), tableEnv, sqlTree, sideTableMap, registerTableCache, streamQueryConfig);
168+
ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), tableEnv, sqlTree, sideTableMap, registerTableCache);
171169

172170
if (env instanceof MyLocalStreamEnvironment) {
173171
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
@@ -193,13 +191,12 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
193191
private static void sqlTranslation(String localSqlPluginPath,
194192
StreamTableEnvironment tableEnv,
195193
SqlTree sqlTree,Map<String, AbstractSideTableInfo> sideTableMap,
196-
Map<String, Table> registerTableCache,
197-
StreamQueryConfig queryConfig) throws Exception {
194+
Map<String, Table> registerTableCache) throws Exception {
198195

199196
SideSqlExec sideSqlExec = new SideSqlExec();
200197
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
201198
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
202-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result);
199+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, result);
203200
}
204201

205202
for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
@@ -216,7 +213,7 @@ private static void sqlTranslation(String localSqlPluginPath,
216213
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
217214
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
218215
tmp.setExecSql(tmpSql);
219-
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp);
216+
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, tmp);
220217
} else {
221218
for (String sourceTable : result.getSourceTableList()) {
222219
if (sideTableMap.containsKey(sourceTable)) {
@@ -226,12 +223,12 @@ private static void sqlTranslation(String localSqlPluginPath,
226223
}
227224
if (isSide) {
228225
//sql-dimensional table contains the dimension table of execution
229-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null);
226+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, null);
230227
} else {
231228
System.out.println("----------exec sql without dimension join-----------");
232229
System.out.println("----------real sql exec is--------------------------");
233230
System.out.println(result.getExecSql());
234-
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
231+
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
235232
if (LOG.isInfoEnabled()) {
236233
System.out.println();
237234
LOG.info("exec sql: " + result.getExecSql());

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,19 @@
4545

4646

4747
/**
48-
* @description: mapping by name when insert into sink table
48+
* @description: mapping by name when insert into sink table
4949
* @author: maqi
5050
* @create: 2019/08/15 11:09
5151
*/
5252
public class FlinkSQLExec {
5353
private static final Logger LOG = LoggerFactory.getLogger(FlinkSQLExec.class);
54+
5455
public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
5556
StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv);
56-
StreamPlanner streamPlanner = (StreamPlanner)tableEnvImpl.getPlanner();
57+
StreamPlanner streamPlanner = (StreamPlanner) tableEnvImpl.getPlanner();
5758
FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner();
5859

59-
RichSqlInsert insert = (RichSqlInsert)flinkPlanner.parse(stmt);
60+
RichSqlInsert insert = (RichSqlInsert) flinkPlanner.parse(stmt);
6061
TableImpl queryResult = extractQueryTableFromInsertCaluse(tableEnvImpl, flinkPlanner, insert);
6162

6263
String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
@@ -78,9 +79,9 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throw
7879
newTable = queryResult.select(String.join(",", sinkFieldNames));
7980
} catch (Exception e) {
8081
throw new ValidationException(
81-
"Field name of query result and registered TableSink "+targetTableName +" do not match.\n" +
82-
"Query result schema: " + String.join(",", queryFieldNames) + "\n" +
83-
"TableSink schema: " + String.join(",", sinkFieldNames));
82+
"Field name of query result and registered TableSink " + targetTableName + " do not match.\n" +
83+
"Query result schema: " + String.join(",", queryFieldNames) + "\n" +
84+
"TableSink schema: " + String.join(",", sinkFieldNames));
8485
}
8586

8687
try {

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.factory.DTThreadFactory;
24-
import org.apache.calcite.sql.JoinType;
24+
import org.apache.flink.api.common.functions.RichFlatMapFunction;
25+
import org.apache.flink.api.java.tuple.Tuple2;
2526
import org.apache.flink.configuration.Configuration;
26-
import org.apache.flink.table.runtime.types.CRow;
2727
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
2828
import org.apache.flink.types.Row;
2929

@@ -41,7 +41,7 @@
4141
* @author xuchao
4242
*/
4343

44-
public abstract class BaseAllReqRow extends RichFlatMapFunction<CRow, CRow> implements ISideReqRow {
44+
public abstract class BaseAllReqRow extends RichFlatMapFunction<Tuple2<Boolean,Row>, Tuple2<Boolean,Row>> implements ISideReqRow {
4545

4646
protected BaseSideInfo sideInfo;
4747

@@ -78,15 +78,6 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
7878
return obj;
7979
}
8080

81-
protected void sendOutputRow(CRow value, Object sideInput, Collector<CRow> out) {
82-
if (sideInput == null && sideInfo.getJoinType() != JoinType.LEFT) {
83-
return;
84-
}
85-
86-
Row row = fillData(value.row(), sideInput);
87-
out.collect(new CRow(row, value.change()));
88-
}
89-
9081
@Override
9182
public void close() throws Exception {
9283
if (null != es && !es.isShutdown()) {

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,18 @@
2626
import com.dtstack.flink.sql.side.cache.CacheObj;
2727
import com.dtstack.flink.sql.side.cache.LRUSideCache;
2828
import org.apache.calcite.sql.JoinType;
29+
import org.apache.flink.api.java.tuple.Tuple2;
2930
import org.apache.flink.configuration.Configuration;
3031
import org.apache.flink.metrics.Counter;
3132
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3233
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
33-
import org.apache.flink.table.runtime.types.CRow;
34-
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
3534
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3635
import org.apache.flink.types.Row;
3736
import org.slf4j.Logger;
3837
import org.slf4j.LoggerFactory;
3938

4039
import java.sql.Timestamp;
4140
import java.time.LocalDateTime;
42-
import java.util.Collection;
4341
import java.util.Collections;
4442

4543
/**
@@ -50,7 +48,7 @@
5048
* @author xuchao
5149
*/
5250

53-
public abstract class BaseAsyncReqRow extends RichAsyncFunction<CRow, CRow> implements ISideReqRow {
51+
public abstract class BaseAsyncReqRow extends RichAsyncFunction<Tuple2<Boolean,Row>, Tuple2<Boolean,Row>> implements ISideReqRow {
5452
private static final Logger LOG = LoggerFactory.getLogger(BaseAsyncReqRow.class);
5553
private static final long serialVersionUID = 2098635244857937717L;
5654

@@ -114,12 +112,12 @@ protected boolean openCache(){
114112
return sideInfo.getSideCache() != null;
115113
}
116114

117-
protected void dealMissKey(CRow input, ResultFuture<CRow> resultFuture){
115+
protected void dealMissKey(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture){
118116
if(sideInfo.getJoinType() == JoinType.LEFT){
119117
//Reserved left table data
120118
try {
121-
Row row = fillData(input.row(), null);
122-
resultFuture.complete(Collections.singleton(new CRow(row, input.change())));
119+
Row row = fillData(input.f1, null);
120+
resultFuture.complete(Collections.singleton(new Tuple2<>(input.f0, row)));
123121
} catch (Exception e) {
124122
dealFillDataError(resultFuture, e, input);
125123
}
@@ -135,7 +133,7 @@ protected void dealCacheData(String key, CacheObj missKeyObj) {
135133
}
136134

137135
@Override
138-
public void timeout(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
136+
public void timeout(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture) throws Exception {
139137

140138
if(timeOutNum % TIMEOUT_LOG_FLUSH_NUM == 0){
141139
LOG.info("Async function call has timed out. input:{}, timeOutNum:{}",input.toString(), timeOutNum);
@@ -150,7 +148,7 @@ public void timeout(CRow input, ResultFuture<CRow> resultFuture) throws Exceptio
150148
}
151149

152150

153-
protected void dealFillDataError(ResultFuture<CRow> resultFuture, Exception e, Object sourceData) {
151+
protected void dealFillDataError(ResultFuture<Tuple2<Boolean,Row>> resultFuture, Exception e, Object sourceData) {
154152
LOG.debug("source data {} join side table error ", sourceData);
155153
LOG.debug("async buid row error..{}", e);
156154
parseErrorRecords.inc();

0 commit comments

Comments
 (0)