Skip to content

Commit 18e1753

Browse files
committed
Merge branch 'v1.5.0_dev_bugfix' into v1.5.0_dev
# Conflicts: # core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
2 parents 91efc6e + 2c784e6 commit 18e1753

File tree

5 files changed

+26
-12
lines changed

5 files changed

+26
-12
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
4747
import org.apache.flink.api.common.time.Time;
4848
import org.apache.flink.api.common.typeinfo.TypeInformation;
49+
import org.apache.flink.api.java.tuple.Tuple2;
4950
import org.apache.flink.api.java.typeutils.RowTypeInfo;
5051
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
5152
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
@@ -271,7 +272,10 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
271272
Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
272273

273274
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
274-
DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
275+
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
276+
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
277+
.returns(typeInfo);
278+
275279
String fields = String.join(",", typeInfo.getFieldNames());
276280

277281
if(waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)){

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.side;
2222

@@ -41,13 +41,15 @@
4141
import org.apache.calcite.sql.parser.SqlParserPos;
4242
import org.apache.commons.collections.CollectionUtils;
4343
import org.apache.flink.api.common.typeinfo.TypeInformation;
44+
import org.apache.flink.api.java.tuple.Tuple2;
4445
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4546
import org.apache.flink.calcite.shaded.com.google.common.collect.HashBasedTable;
4647
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
4748
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
4849
import org.apache.flink.streaming.api.datastream.DataStream;
4950
import org.apache.flink.table.api.Table;
5051
import org.apache.flink.table.api.java.StreamTableEnvironment;
52+
import org.apache.flink.types.Row;
5153

5254
import java.util.*;
5355

@@ -593,7 +595,11 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
593595
}
594596

595597
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
596-
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class);
598+
599+
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
600+
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
601+
.returns(Row.class);
602+
597603
//join side table before keyby ===> Reducing the size of each dimension table cache of async
598604
if(sideTableInfo.isPartitionedJoin()){
599605
List<String> leftJoinColList = getConditionFields(joinInfo.getCondition(), joinInfo.getLeftTableAlias());

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,15 @@ public static URL getRemoteJarFilePath(String pluginType, String tableType, Stri
111111
String dirName = pluginType + tableType.toLowerCase();
112112
String prefix = String.format("%s-%s", pluginType, tableType.toLowerCase());
113113
String jarPath = remoteSqlRootDir + SP + dirName;
114-
String jarName = getCoreJarFileName(jarPath, prefix);
114+
String jarName = getCoreJarFileName(jarPath, prefix, false);
115115
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
116116
}
117117

118118
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir) throws Exception {
119119
String dirName = pluginType + sideOperator + tableType.toLowerCase();
120120
String prefix = String.format("%s-%s-%s", pluginType, sideOperator, tableType.toLowerCase());
121121
String jarPath = remoteSqlRootDir + SP + dirName;
122-
String jarName = getCoreJarFileName(jarPath, prefix);
122+
String jarName = getCoreJarFileName(jarPath, prefix, false);
123123
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
124124
}
125125

@@ -144,7 +144,11 @@ public static void addPluginJar(String pluginDir, DtClassLoader classLoader) thr
144144
}
145145
}
146146

147-
public static String getCoreJarFileName (String path, String prefix) throws Exception {
147+
public static String getCoreJarFileName (String path, String prefix, boolean existCheck) throws Exception {
148+
if (!existCheck) {
149+
return prefix.toLowerCase() + ".jar";
150+
}
151+
148152
String coreJarFileName = null;
149153
File pluginDir = new File(path);
150154
if (pluginDir.exists() && pluginDir.isDirectory()){

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class LauncherMain {
5959

6060

6161
private static String getLocalCoreJarPath(String localSqlRootJar) throws Exception {
62-
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR);
62+
String jarPath = PluginUtil.getCoreJarFileName(localSqlRootJar, CORE_JAR, true);
6363
String corePath = localSqlRootJar + SP + jarPath;
6464
return corePath;
6565
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,16 @@ public class PerJobSubmitter {
5252
public static String submit(LauncherOptions launcherOptions, JobGraph jobGraph) throws Exception {
5353

5454
fillJobGraphClassPath(jobGraph);
55-
56-
String addjarPath = URLDecoder.decode(launcherOptions.getAddjar(), Charsets.UTF_8.toString());
57-
if (StringUtils.isNotBlank(addjarPath) ){
55+
if (!StringUtils.isBlank(launcherOptions.getAddjar())) {
56+
String addjarPath = URLDecoder.decode(launcherOptions.getAddjar(), Charsets.UTF_8.toString());
5857
List<String> paths = getJarPaths(addjarPath);
59-
paths.forEach( path ->{
58+
paths.forEach( path -> {
6059
jobGraph.addJar(new Path("file://" + path));
6160
});
62-
6361
}
6462

63+
64+
6565
String confProp = launcherOptions.getConfProp();
6666
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
6767
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);

0 commit comments

Comments
 (0)