
Commit a40a6b0

Author: gituser (committed)

Merge branch 'v1.5.0_dev' into v1.5_v3.6.0_beta_1.0

2 parents: b730a0f + c098bd9

File tree: 27 files changed, +210 / -107 lines


cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllSideInfo.java

Lines changed: 2 additions & 5 deletions
@@ -23,6 +23,7 @@
 import com.dtstack.flink.sql.side.SideInfo;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
+import com.dtstack.flink.sql.util.ParseUtils;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -86,11 +87,7 @@ public void parseSelectFields(JoinInfo joinInfo) {
         SqlNode conditionNode = joinInfo.getCondition();
 
         List<SqlNode> sqlNodeList = Lists.newArrayList();
-        if (conditionNode.getKind() == SqlKind.AND) {
-            sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall) conditionNode).getOperands()));
-        } else {
-            sqlNodeList.add(conditionNode);
-        }
+        ParseUtils.parseAnd(conditionNode, sqlNodeList);
 
         for (SqlNode sqlNode : sqlNodeList) {
             dealOneEqualCon(sqlNode, sideTableName);

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 2 additions & 5 deletions
@@ -23,6 +23,7 @@
 import com.dtstack.flink.sql.side.SideInfo;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
+import com.dtstack.flink.sql.util.ParseUtils;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
@@ -55,11 +56,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
         SqlNode conditionNode = joinInfo.getCondition();
 
         List<SqlNode> sqlNodeList = Lists.newArrayList();
-        if (conditionNode.getKind() == SqlKind.AND) {
-            sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall) conditionNode).getOperands()));
-        } else {
-            sqlNodeList.add(conditionNode);
-        }
+        ParseUtils.parseAnd(conditionNode, sqlNodeList);
 
         for (SqlNode sqlNode : sqlNodeList) {
             dealOneEqualCon(sqlNode, sideTableName);

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 11 additions & 15 deletions
@@ -46,6 +46,7 @@
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
 import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
@@ -139,15 +140,8 @@ public static void main(String[] args) throws Exception {
         }
 
         ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
-        DtClassLoader dtClassLoader = new DtClassLoader(new URL[]{}, threadClassLoader);
-        Thread.currentThread().setContextClassLoader(dtClassLoader);
-
-        URLClassLoader parentClassloader;
-        if(!ClusterMode.local.name().equals(deployMode)){
-            parentClassloader = (URLClassLoader) threadClassLoader.getParent();
-        }else{
-            parentClassloader = dtClassLoader;
-        }
+        DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
+        Thread.currentThread().setContextClassLoader(parentClassloader);
 
         confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
         Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
@@ -218,7 +212,7 @@ public static void main(String[] args) throws Exception {
 
         if(env instanceof MyLocalStreamEnvironment) {
             List<URL> urlList = new ArrayList<>();
-            urlList.addAll(Arrays.asList(dtClassLoader.getURLs()));
+            urlList.addAll(Arrays.asList(parentClassloader.getURLs()));
             ((MyLocalStreamEnvironment) env).setClasspaths(urlList);
         }
 
@@ -254,7 +248,6 @@ private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLo
             if (classLoader == null) {
                 classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
             }
-            classLoader.loadClass(funcInfo.getClassName());
             FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
                     tableEnv, classLoader);
         }
@@ -279,7 +272,10 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
             Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
 
             RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
-            DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
+            DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
+                    .map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
+                    .returns(typeInfo);
+
             String fields = String.join(",", typeInfo.getFieldNames());
 
             if(waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)){
@@ -292,18 +288,18 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
             Table regTable = tableEnv.fromDataStream(adaptStream, fields);
             tableEnv.registerTable(tableInfo.getName(), regTable);
             registerTableCache.put(tableInfo.getName(), regTable);
-            classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath));
+            classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
         } else if (tableInfo instanceof TargetTableInfo) {
 
             TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
             TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
             tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
-            classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
+            classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
         } else if(tableInfo instanceof SideTableInfo){
 
             String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
             sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
-            classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
+            classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
         }else {
             throw new RuntimeException("not support table type:" + tableInfo.getType());
         }

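Context for the registerTable change above: toRetractStream emits Tuple2<Boolean, Row>, where the Boolean flag is true for an insert and false for a retraction, and unlike toAppendStream it also accepts queries whose results can be updated. The map((Tuple2<Boolean, Row> f0) -> f0.f1) step keeps only the Row payload, so the rest of the pipeline still sees plain rows. A self-contained sketch of the same pattern against the pre-1.9 table API (class, table, and field names are illustrative, not from this repository):

    import java.util.Arrays;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class RetractToAppendSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            // A toy two-column table registered from a bounded stream.
            RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            DataStream<Row> input = env.fromCollection(
                    Arrays.asList(Row.of(1, "a"), Row.of(2, "b")), typeInfo);
            tableEnv.registerDataStream("demo", input, "id, name");
            Table table = tableEnv.sqlQuery("SELECT id, name FROM demo");

            // The pattern from the commit: take the retract stream and keep only the Row
            // payload; the Boolean insert/retract flag is dropped by the map.
            DataStream<Row> adaptStream = tableEnv.toRetractStream(table, typeInfo)
                    .map((Tuple2<Boolean, Row> f0) -> f0.f1)
                    .returns(typeInfo);

            adaptStream.print();
            env.execute("retract-to-append sketch");
        }
    }
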
core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 10 additions & 2 deletions
@@ -159,8 +159,16 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
         JoinInfo tableInfo = new JoinInfo();
         tableInfo.setLeftTableName(leftTbName);
         tableInfo.setRightTableName(rightTableName);
-        tableInfo.setLeftTableAlias(leftTbAlias);
-        tableInfo.setRightTableAlias(rightTableAlias);
+        if (leftTbAlias.equals("")){
+            tableInfo.setLeftTableAlias(leftTbName);
+        } else {
+            tableInfo.setLeftTableAlias(leftTbAlias);
+        }
+        if (leftTbAlias.equals("")){
+            tableInfo.setRightTableAlias(rightTableName);
+        } else {
+            tableInfo.setRightTableAlias(rightTableAlias);
+        }
         tableInfo.setLeftIsSideTable(leftIsSide);
         tableInfo.setRightIsSideTable(rightIsSide);
         tableInfo.setLeftNode(leftNode);

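For context on the hunk above: dealJoinNode previously copied the alias fields verbatim, so a join table written without an AS alias ended up with an empty alias string. The new branches fall back to the raw table name in that case; note that, as committed, both fallback checks test leftTbAlias.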
core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 10 additions & 9 deletions
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-
+
 
 package com.dtstack.flink.sql.side;
 
@@ -25,6 +25,7 @@
 import com.dtstack.flink.sql.side.operator.SideAsyncOperator;
 import com.dtstack.flink.sql.side.operator.SideWithAllCacheOperator;
 import com.dtstack.flink.sql.util.ClassUtil;
+import com.dtstack.flink.sql.util.ParseUtils;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
@@ -40,13 +41,15 @@
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.calcite.shaded.com.google.common.collect.HashBasedTable;
 import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
 import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
 
 import java.util.*;
 
@@ -435,7 +438,7 @@ private SqlNode replaceSelectFieldName(SqlNode selectNode, HashBasedTable<String
             }
         }
 
-        replaceSelectFieldName(elseNode, mappingTable, tableAlias);
+        ((SqlCase) selectNode).setOperand(3, replaceSelectFieldName(elseNode, mappingTable, tableAlias));
         return selectNode;
     }else if(selectNode.getKind() == OTHER){
         //不处理
@@ -461,12 +464,7 @@ private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias,
 
     public List<String> getConditionFields(SqlNode conditionNode, String specifyTableName){
         List<SqlNode> sqlNodeList = Lists.newArrayList();
-        if(conditionNode.getKind() == SqlKind.AND){
-            sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
-        }else{
-            sqlNodeList.add(conditionNode);
-        }
-
+        ParseUtils.parseAnd(conditionNode, sqlNodeList);
         List<String> conditionFields = Lists.newArrayList();
         for(SqlNode sqlNode : sqlNodeList){
             if(sqlNode.getKind() != SqlKind.EQUALS){
@@ -597,7 +595,10 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
         }
 
         RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
-        DataStream adaptStream = tableEnv.toAppendStream(targetTable, org.apache.flink.types.Row.class);
+
+        DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
+                .map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
+                .returns(Row.class);
 
         //join side table before keyby ===> Reducing the size of each dimension table cache of async
         if(sideTableInfo.isPartitionedJoin()){
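Two behavioural notes on the hunks above: replaceSelectFieldName now writes the rewritten ELSE expression back into the CASE call via setOperand(3, ...) instead of discarding the returned node, so field renames take effect inside CASE ... WHEN ... ELSE ... END select items as well; and joinFun uses the same retract-to-append conversion as Main.registerTable (see the sketch after that diff).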

…/com/dtstack/flink/sql/udf/TimestampUdf.java (new file)

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+package com.dtstack.flink.sql.udf;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import java.sql.Timestamp;
+
+public class TimestampUdf extends ScalarFunction {
+    @Override
+    public void open(FunctionContext context) {
+    }
+    public static Timestamp eval(String timestamp) {
+        if (timestamp.length() == 13){
+            return new Timestamp(Long.parseLong(timestamp));
+        }else if (timestamp.length() == 10){
+            return new Timestamp(Long.parseLong(timestamp)*1000);
+        } else{
+            return Timestamp.valueOf(timestamp);
+        }
+    }
+    @Override
+    public void close() {
+    }
+}
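A usage sketch for the new UDF (not part of the commit; the function name TO_TS and the table/field names are illustrative). Per the eval implementation above, it accepts 13-digit millisecond strings, 10-digit second strings, and yyyy-MM-dd HH:mm:ss literals:

    import com.dtstack.flink.sql.udf.TimestampUdf;

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class TimestampUdfUsage {
        // Registers the UDF under an illustrative name and applies it in a query.
        public static Table normalizeEventTime(StreamTableEnvironment tableEnv) {
            tableEnv.registerFunction("TO_TS", new TimestampUdf());
            // event_time may arrive as "1561880220000", "1561880220" or "2019-06-30 14:57:00"
            return tableEnv.sqlQuery("SELECT id, TO_TS(event_time) AS ts FROM source_table");
        }
    }
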

…/com/dtstack/flink/sql/util/ParseUtils.java (new file)

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+package com.dtstack.flink.sql.util;
+
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+
+import java.util.List;
+
+/**
+ * @Auther: jiangjunjie
+ * @Date: 2019-06-30 14:57
+ * @Description:
+ */
+public class ParseUtils {
+    public static void parseAnd(SqlNode conditionNode, List<SqlNode> sqlNodeList){
+        if(conditionNode.getKind() == SqlKind.AND && ((SqlBasicCall)conditionNode).getOperandList().size()==2){
+            parseAnd(((SqlBasicCall)conditionNode).getOperands()[0], sqlNodeList);
+            sqlNodeList.add(((SqlBasicCall)conditionNode).getOperands()[1]);
+        }else{
+            sqlNodeList.add(conditionNode);
+        }
+    }
+}
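A quick way to see what the new helper buys over the old inline split: Calcite parses a AND b AND c as a left-deep tree AND(AND(a, b), c), so copying getOperands() at the top level left a nested AND node in the list, while parseAnd recurses into the left operand and yields one flat condition per conjunct. A minimal, self-contained sketch (the class name and SQL text are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParser;

    import com.dtstack.flink.sql.util.ParseUtils;

    public class ParseAndDemo {
        public static void main(String[] args) throws Exception {
            // Parsed as AND(AND(m.id = s.id, m.name = s.name), m.city = s.city)
            SqlNode condition = SqlParser.create(
                    "m.id = s.id AND m.name = s.name AND m.city = s.city").parseExpression();

            List<SqlNode> sqlNodeList = new ArrayList<>();
            ParseUtils.parseAnd(condition, sqlNodeList);

            // Prints three EQUALS nodes; the old top-level split would have kept the
            // first two conjuncts wrapped in a nested AND instead.
            for (SqlNode node : sqlNodeList) {
                System.out.println(node.getKind() + " -> " + node);
            }
        }
    }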

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 4 additions & 4 deletions
@@ -107,18 +107,18 @@ public static Properties stringToProperties(String str) throws IOException{
         return properties;
     }
 
-    public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir) throws Exception {
+    public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
         String dirName = pluginType + tableType.toLowerCase();
         String prefix = String.format("%s-%s", pluginType, tableType.toLowerCase());
-        String jarPath = remoteSqlRootDir + SP + dirName;
+        String jarPath = localSqlPluginPath + SP + dirName;
         String jarName = getCoreJarFileName(jarPath, prefix);
         return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
     }
 
-    public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir) throws Exception {
+    public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
         String dirName = pluginType + sideOperator + tableType.toLowerCase();
         String prefix = String.format("%s-%s-%s", pluginType, sideOperator, tableType.toLowerCase());
-        String jarPath = remoteSqlRootDir + SP + dirName;
+        String jarPath = localSqlPluginPath + SP + dirName;
         String jarName = getCoreJarFileName(jarPath, prefix);
         return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
     }
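On the signature change above: the concrete jar name is now resolved by scanning localSqlPluginPath (getCoreJarFileName), while the returned URL is still rooted at remoteSqlRootDir, so the local and remote plugin directories are assumed to share the same layout. A minimal sketch that mirrors, rather than reproduces, the changed logic; getCoreJarFileName is approximated by a prefix scan and all paths are illustrative:

    import java.io.File;
    import java.net.URL;

    public class PluginPathSketch {
        // Discover the jar name in the LOCAL plugin directory, then build a URL
        // re-rooted under the REMOTE plugin directory, mirroring getRemoteJarFilePath.
        public static URL resolve(String pluginType, String tableType,
                                  String remoteSqlRootDir, String localSqlPluginPath) throws Exception {
            String dirName = pluginType + tableType.toLowerCase();
            String prefix = String.format("%s-%s", pluginType, tableType.toLowerCase());

            File localDir = new File(localSqlPluginPath, dirName);   // must exist on the client
            String jarName = null;
            File[] candidates = localDir.listFiles();
            if (candidates != null) {
                for (File f : candidates) {
                    if (f.getName().startsWith(prefix) && f.getName().endsWith(".jar")) {
                        jarName = f.getName();                        // local lookup of the jar name
                        break;
                    }
                }
            }
            if (jarName == null) {
                throw new Exception("no plugin jar with prefix " + prefix + " under " + localDir);
            }
            // The classpath entry still points at the remote plugin root.
            return new URL("file:" + remoteSqlRootDir + File.separator + dirName + File.separator + jarName);
        }
    }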

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllSideInfo.java

Lines changed: 2 additions & 5 deletions
@@ -24,6 +24,7 @@
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.SideInfo;
 import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.util.ParseUtils;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -53,11 +54,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
         SqlNode conditionNode = joinInfo.getCondition();
 
         List<SqlNode> sqlNodeList = Lists.newArrayList();
-        if(conditionNode.getKind() == SqlKind.AND){
-            sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
-        }else{
-            sqlNodeList.add(conditionNode);
-        }
+        ParseUtils.parseAnd(conditionNode, sqlNodeList);
 
         for(SqlNode sqlNode : sqlNodeList){
             dealOneEqualCon(sqlNode, sideTableName);

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncSideInfo.java

Lines changed: 2 additions & 5 deletions
@@ -5,6 +5,7 @@
 import com.dtstack.flink.sql.side.SideInfo;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
+import com.dtstack.flink.sql.util.ParseUtils;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -55,11 +56,7 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
         SqlNode conditionNode = joinInfo.getCondition();
 
         List<SqlNode> sqlNodeList = Lists.newArrayList();
-        if(conditionNode.getKind() == SqlKind.AND){
-            sqlNodeList.addAll(Lists.newArrayList(((SqlBasicCall)conditionNode).getOperands()));
-        }else{
-            sqlNodeList.add(conditionNode);
-        }
+        ParseUtils.parseAnd(conditionNode, sqlNodeList);
 
         for(SqlNode sqlNode : sqlNodeList){
             dealOneEqualCon(sqlNode, sideTableName);
