Skip to content

Commit b68b197

Browse files
committed
Merge branch 'v1.5_v3.6.0_beta_1.0' into 1.5_v3.6.0
# Conflicts:
#   core/src/main/java/com/dtstack/flink/sql/Main.java
#   core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
2 parents 38f39fa + a40a6b0 commit b68b197

File tree

80 files changed

+2630
-96
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

80 files changed

+2630
-96
lines changed

cassandra/cassandra-side/cassandra-all-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
</copy>
7777

7878
<move file="${basedir}/../../../plugins/cassandraallside/${project.artifactId}-${project.version}.jar"
79-
tofile="${basedir}/../../../plugins/cassandraallside/${project.name}.jar" />
79+
tofile="${basedir}/../../../plugins/cassandraallside/${project.name}-${git.branch}.jar" />
8080
</tasks>
8181
</configuration>
8282
</execution>

cassandra/cassandra-side/cassandra-async-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
</copy>
9393

9494
<move file="${basedir}/../../../plugins/cassandraasyncside/${project.artifactId}-${project.version}.jar"
95-
tofile="${basedir}/../../../plugins/cassandraasyncside/${project.name}.jar" />
95+
tofile="${basedir}/../../../plugins/cassandraasyncside/${project.name}-${git.branch}.jar" />
9696
</tasks>
9797
</configuration>
9898
</execution>

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@
106106
</fileset>
107107
</copy>
108108
<move file="${basedir}/../plugins/${project.artifactId}-${project.version}.jar"
109-
tofile="${basedir}/../plugins/${project.name}.jar" />
109+
tofile="${basedir}/../plugins/${project.name}-${git.branch}.jar" />
110110
</tasks>
111111
</configuration>
112112
</execution>

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
273273

274274
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
275275
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
276-
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
277-
.returns(typeInfo);
276+
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
277+
.returns(typeInfo);
278278

279279
String fields = String.join(",", typeInfo.getFieldNames());
280280

@@ -288,18 +288,18 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
288288
Table regTable = tableEnv.fromDataStream(adaptStream, fields);
289289
tableEnv.registerTable(tableInfo.getName(), regTable);
290290
registerTableCache.put(tableInfo.getName(), regTable);
291-
classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath));
291+
classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
292292
} else if (tableInfo instanceof TargetTableInfo) {
293293

294294
TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
295295
TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
296296
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
297-
classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
297+
classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
298298
} else if(tableInfo instanceof SideTableInfo){
299299

300300
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
301301
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
302-
classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
302+
classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
303303
}else {
304304
throw new RuntimeException("not support table type:" + tableInfo.getType());
305305
}

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,15 @@ public static SqlTree parseSql(String sql) throws Exception {
138138
if (!sqlTree.getTableInfoMap().keySet().contains(tableName)){
139139
CreateTableParser.SqlParserResult createTableResult = sqlTree.getPreDealTableMap().get(tableName);
140140
if(createTableResult == null){
141-
throw new RuntimeException("can't find table " + tableName);
141+
CreateTmpTableParser.SqlParserResult tmpTableResult = sqlTree.getTmpTableMap().get(tableName);
142+
if (tmpTableResult == null){
143+
throw new RuntimeException("can't find table " + tableName);
144+
}
145+
} else {
146+
TableInfo tableInfo = tableInfoParser.parseWithTableType(ETableType.SOURCE.getType(),
147+
createTableResult, LOCAL_SQL_PLUGIN_ROOT);
148+
sqlTree.addTableInfo(tableName, tableInfo);
142149
}
143-
144-
TableInfo tableInfo = tableInfoParser.parseWithTableType(ETableType.SOURCE.getType(),
145-
createTableResult, LOCAL_SQL_PLUGIN_ROOT);
146-
sqlTree.addTableInfo(tableName, tableInfo);
147150
}
148151
}
149152
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.side;
2222

@@ -599,9 +599,10 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
599599
}
600600

601601
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
602+
602603
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
603-
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
604-
.returns(Row.class);
604+
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
605+
.returns(Row.class);
605606

606607
//join side table before keyby ===> Reducing the size of each dimension table cache of async
607608
if(sideTableInfo.isPartitionedJoin()){

core/src/main/java/com/dtstack/flink/sql/sink/StreamSinkFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir)
5151
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
5252

5353
String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), sqlRootDir);
54-
5554
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
56-
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
55+
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
56+
String className = PluginUtil.getSqlParserClassName(typeNoVersion, CURR_TYPE);
5757
Class<?> targetParser = dtClassLoader.loadClass(className);
5858

5959
if(!AbsTableParser.class.isAssignableFrom(targetParser)){
@@ -77,7 +77,8 @@ public static TableSink getTableSink(TargetTableInfo targetTableInfo, String loc
7777

7878
PluginUtil.addPluginJar(pluginJarDirPath, dtClassLoader);
7979

80-
String className = PluginUtil.getGenerClassName(pluginType, CURR_TYPE);
80+
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
81+
String className = PluginUtil.getGenerClassName(typeNoVersion, CURR_TYPE);
8182
Class<?> sinkClass = dtClassLoader.loadClass(className);
8283

8384
if(!IStreamSinkGener.class.isAssignableFrom(sinkClass)){

core/src/main/java/com/dtstack/flink/sql/table/SourceTableInfo.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@
2121
package com.dtstack.flink.sql.table;
2222

2323
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
24+
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
2425
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
26+
import org.apache.flink.util.StringUtils;
2527

28+
import java.util.ArrayList;
2629
import java.util.Map;
30+
import java.util.TimeZone;
2731

2832
/**
2933
* Reason:
@@ -36,6 +40,10 @@ public abstract class SourceTableInfo extends TableInfo {
3640

3741
public static final String SOURCE_SUFFIX = "Source";
3842

43+
public static final String TIME_ZONE_KEY="timezone";
44+
45+
private String timeZone=TimeZone.getDefault().getID();
46+
3947
private String eventTimeField;
4048

4149
private Integer maxOutOrderness = 10;
@@ -63,7 +71,6 @@ public void setMaxOutOrderness(Integer maxOutOrderness) {
6371
if(maxOutOrderness == null){
6472
return;
6573
}
66-
6774
this.maxOutOrderness = maxOutOrderness;
6875
}
6976

@@ -101,4 +108,23 @@ public String getAdaptSelectSql(){
101108
public String getAdaptName(){
102109
return getName() + "_adapt";
103110
}
111+
112+
public String getTimeZone() {
113+
return timeZone;
114+
}
115+
116+
public void setTimeZone(String timeZone) {
117+
if (StringUtils.isNullOrWhitespaceOnly(timeZone)){
118+
return;
119+
}
120+
timeZoneCheck(timeZone);
121+
this.timeZone = timeZone;
122+
}
123+
124+
private void timeZoneCheck(String timeZone) {
125+
ArrayList<String> zones = Lists.newArrayList(TimeZone.getAvailableIDs());
126+
if (!zones.contains(timeZone)){
127+
throw new IllegalArgumentException(" timezone is Incorrect!");
128+
}
129+
}
104130
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.dtstack.flink.sql.udf;
2+
3+
import org.apache.flink.table.functions.FunctionContext;
4+
import org.apache.flink.table.functions.ScalarFunction;
5+
6+
import java.sql.Timestamp;
7+
8+
public class TimestampUdf extends ScalarFunction {
9+
@Override
10+
public void open(FunctionContext context) {
11+
}
12+
public static Timestamp eval(String timestamp) {
13+
if (timestamp.length() == 13){
14+
return new Timestamp(Long.parseLong(timestamp));
15+
}else if (timestamp.length() == 10){
16+
return new Timestamp(Long.parseLong(timestamp)*1000);
17+
} else{
18+
return Timestamp.valueOf(timestamp);
19+
}
20+
}
21+
@Override
22+
public void close() {
23+
}
24+
}

core/src/main/java/com/dtstack/flink/sql/util/FlinkUtil.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,13 @@ public static URLClassLoader loadExtraJar(List<URL> jarURLList, URLClassLoader c
267267
return classLoader;
268268
}
269269

270-
private static void urlClassLoaderAddUrl(URLClassLoader classLoader, URL url) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
271-
Method method = classLoader.getClass().getDeclaredMethod("addURL", URL.class);
270+
private static void urlClassLoaderAddUrl(URLClassLoader classLoader, URL url) throws InvocationTargetException, IllegalAccessException {
271+
Method method = ReflectionUtils.getDeclaredMethod(classLoader, "addURL", URL.class);
272+
273+
if(method == null){
274+
throw new RuntimeException("can't not find declared method addURL, curr classLoader is " + classLoader.getClass());
275+
}
276+
272277
method.setAccessible(true);
273278
method.invoke(classLoader, url);
274279
}

0 commit comments

Comments
 (0)