
Commit ea5b299 (2 parents: 838f481 + 6b504ff)

Merge remote-tracking branch 'origin/v1.5.0_dev' into v1.8.0_dev_merge1.5.0

# Conflicts:
#	core/src/main/java/com/dtstack/flink/sql/Main.java
#	core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
#	kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

File tree: 32 files changed (+506 additions, -181 deletions)


.gitignore

Lines changed: 2 additions & 1 deletion
@@ -10,6 +10,7 @@ target/
 plugins/
 lib/
 .vertx/
+.DS_Store
 bin/nohup.out
-
+.DS_Store
 bin/sideSql.txt

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
@@ -256,7 +256,7 @@ public void onFailure(Throwable t) {
                         t.getMessage());
                 System.out.println("Failed to retrieve the data: " + t.getMessage());
                 cluster.closeAsync();
-                resultFuture.complete(null);
+                resultFuture.completeExceptionally(t);
             }
         });
     }
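
Why this one-liner matters: completing the ResultFuture with null makes a failed Cassandra lookup indistinguishable from a successful lookup that found nothing, so the join silently drops the error, whereas completeExceptionally(t) surfaces the failure to the async operator. A minimal sketch of the difference, using plain java.util.concurrent.CompletableFuture as a stand-in for Flink's ResultFuture (the class name and messages below are made up for illustration):

import java.util.concurrent.CompletableFuture;

// Standalone sketch (plain CompletableFuture, not Flink's ResultFuture):
// complete(null) makes the failure look like a successful empty result,
// while completeExceptionally hands the cause to whoever consumes the future.
public class CompleteVsCompleteExceptionally {
    public static void main(String[] args) {
        CompletableFuture<String> swallowed = new CompletableFuture<>();
        swallowed.complete(null);              // old behavior: error is invisible
        System.out.println(swallowed.join());  // prints "null", no hint of a failure

        CompletableFuture<String> propagated = new CompletableFuture<>();
        propagated.completeExceptionally(new RuntimeException("Failed to retrieve the data"));
        propagated.whenComplete((value, error) ->
                System.out.println("failed with: " + error.getMessage())); // cause is visible
    }
}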

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 13 additions & 4 deletions
@@ -22,8 +22,12 @@

 import com.dtstack.flink.sql.classloader.DtClassLoader;
 import com.dtstack.flink.sql.enums.ECacheType;
-import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
-import com.dtstack.flink.sql.parser.*;
+import com.dtstack.flink.sql.exec.FlinkSQLExec;
+import com.dtstack.flink.sql.parser.CreateFuncParser;
+import com.dtstack.flink.sql.parser.CreateTmpTableParser;
+import com.dtstack.flink.sql.parser.InsertSqlParser;
+import com.dtstack.flink.sql.parser.SqlParser;
+import com.dtstack.flink.sql.parser.SqlTree;
 import com.dtstack.flink.sql.side.SideSqlExec;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.table.SourceTableInfo;
@@ -205,7 +209,10 @@ public static void main(String[] args) throws Exception {
                 //sql-dimensional table contains the dimension table of execution
                 sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
             }else{
-                tableEnv.sqlUpdate(result.getExecSql());
+                FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
+                if(LOG.isInfoEnabled()){
+                    LOG.info("exec sql: " + result.getExecSql());
+                }
             }
         }
     }
@@ -288,6 +295,9 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en

             Table regTable = tableEnv.fromDataStream(adaptStream, fields);
             tableEnv.registerTable(tableInfo.getName(), regTable);
+            if(LOG.isInfoEnabled()){
+                LOG.info("registe table {} success.", tableInfo.getName());
+            }
             registerTableCache.put(tableInfo.getName(), regTable);
             classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
         } else if (tableInfo instanceof TargetTableInfo) {
@@ -321,7 +331,6 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties)
                 StreamExecutionEnvironment.getExecutionEnvironment() :
                 new MyLocalStreamEnvironment();

-        env.getConfig().disableClosureCleaner();
         env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
         Configuration globalJobParameters = new Configuration();
         Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);
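
The substantive change in Main.java is routing INSERT statements through the new FlinkSQLExec helper (its source follows below) rather than tableEnv.sqlUpdate. At this Flink version, the built-in sqlUpdate matches query columns to sink columns by position, so a SELECT list ordered differently from the sink schema silently writes values into the wrong fields; per its own javadoc, FlinkSQLExec instead maps by name when inserting into the sink table. A self-contained sketch of that name-based remapping idea (all field and value names below are made up):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of name-based sink mapping: look each sink field up in the
// query result by name instead of trusting the SELECT order.
public class NameBasedMappingSketch {
    public static void main(String[] args) {
        // query result columns in SELECT order, as if from "SELECT name, id FROM src"
        Map<String, Object> queryRow = new LinkedHashMap<>();
        queryRow.put("name", "bob");
        queryRow.put("id", 42);

        // sink schema declared as (id, name); positional mapping would put "bob" in id
        String[] sinkFields = {"id", "name"};
        Object[] reordered = Arrays.stream(sinkFields).map(queryRow::get).toArray();
        System.out.println(Arrays.toString(reordered)); // [42, bob]
    }
}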
core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 59 additions & 0 deletions (new file)

@@ -0,0 +1,59 @@
+package com.dtstack.flink.sql.exec;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.plan.logical.LogicalRelNode;
+import org.apache.flink.table.plan.schema.TableSinkTable;
+
+import java.lang.reflect.Method;
+
+/**
+ * @description: mapping by name when insert into sink table
+ * @author: maqi
+ * @create: 2019/08/15 11:09
+ */
+public class FlinkSQLExec {
+
+    public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
+
+        FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
+        SqlNode insert = planner.parse(stmt);
+
+        if (!(insert instanceof SqlInsert)) {
+            throw new TableException(
+                    "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.");
+        }
+        SqlNode query = ((SqlInsert) insert).getSource();
+
+        SqlNode validatedQuery = planner.validate(query);
+
+        Table queryResult = new Table(tableEnv, new LogicalRelNode(planner.rel(validatedQuery).rel));
+        String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
+
+        Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
+        method.setAccessible(true);
+
+        TableSinkTable targetTable = (TableSinkTable) method.invoke(tableEnv, targetTableName);
+        String[] fieldNames = targetTable.tableSink().getFieldNames();
+
+        Table newTable = null;
+
+        try {
+            newTable = queryResult.select(String.join(",", fieldNames));
+        } catch (Exception e) {
+            throw new ValidationException(
+                    "Field name of query result and registered TableSink " + targetTableName + " do not match.\n" +
+                    "Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
+                    "TableSink schema: " + String.join(",", fieldNames));
+        }
+
+        tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
+    }
+}
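
Two notes on the helper above. First, the reflective call to TableEnvironment.getTable with setAccessible(true) is presumably needed because that method is not public at this Flink version; it ties the class to Flink internals (FlinkPlannerImpl, LogicalRelNode, TableSinkTable) and will need attention on upgrades. Second, a hypothetical call-site mirroring the change in Main.java (statement text and table names are invented):

import org.apache.flink.table.api.java.StreamTableEnvironment;

import com.dtstack.flink.sql.exec.FlinkSQLExec;

// Hypothetical usage sketch: the SELECT order need not match the sink schema,
// since sqlUpdate re-selects the query result by the sink's declared field names.
public class FlinkSQLExecUsage {
    public static void submitInsert(StreamTableEnvironment tableEnv) throws Exception {
        FlinkSQLExec.sqlUpdate(tableEnv, "INSERT INTO resultTable SELECT name, id FROM sourceTable");
    }
}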

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 16 additions & 1 deletion
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-
+

 package com.dtstack.flink.sql.side;

@@ -28,9 +28,12 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
 import org.apache.flink.types.Row;

+import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.TimeoutException;

 /**
  * All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
@@ -50,6 +53,18 @@ public AsyncReqRow(SideInfo sideInfo){
         this.sideInfo = sideInfo;
     }

+    @Override
+    public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
+        StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
+        try {
+            if (null == future.get()) {
+                new TimeoutException("Async function call has timed out.");
+            }
+        } catch (Exception e) {
+            throw new Exception(e);
+        }
+    }
+
     private void initCache(){
         SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
         if(sideTableInfo.getCacheType() == null || ECacheType.NONE.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
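
A caveat in the new timeout override: new TimeoutException(...) constructs the exception but never throws it, so an entry whose future holds no result when the timeout fires still completes without an error. Assuming the intent is to fail such records, the statement presumably wants a throw; a sketch of the method body with only that line changed:

@Override
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
    StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>) resultFuture;
    try {
        if (null == future.get()) {
            // 'throw' added here; the committed line only constructs the exception
            throw new TimeoutException("Async function call has timed out.");
        }
    } catch (Exception e) {
        throw new Exception(e);
    }
}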

core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java

Lines changed: 1 addition & 1 deletion
@@ -89,7 +89,7 @@ public void parseSelectFields(JoinInfo joinInfo){
         for( int i=0; i<outFieldInfoList.size(); i++){
             FieldInfo fieldInfo = outFieldInfoList.get(i);
             if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
-                fields.add(fieldInfo.getFieldName());
+                fields.add(sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName()));
                 sideFieldIndex.put(i, sideIndex);
                 sideFieldNameIndex.put(i, fieldInfo.getFieldName());
                 sideIndex++;
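
The one-line change above resolves each selected side-table field through the table's logical-to-physical mapping, falling back to the logical name when no mapping entry exists, so a query that aliases physical columns still reads the correct column from the external store. A self-contained sketch of that getOrDefault fallback (the mapping entries below are made up):

import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the fallback used above: aliased fields resolve to their
// physical column name, unmapped fields pass through unchanged.
public class PhysicalFieldLookup {
    public static void main(String[] args) {
        Map<String, String> physicalFields = new HashMap<>();
        physicalFields.put("userName", "user_name"); // logical alias -> physical column

        System.out.println(physicalFields.getOrDefault("userName", "userName")); // user_name
        System.out.println(physicalFields.getOrDefault("age", "age"));           // age
    }
}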
