
Commit 4a87626

Merge branch 'v1.8.0_dev_merge1.5.0' into 'v1.8.0_dev'

V1.8.0 dev merge1.5.0: merge 1.5 into 1.8 and adjust the API for insert-by-name. See merge request !53

2 parents 838f481 + e8db082, commit 4a87626

32 files changed, +517 -182 lines


.gitignore

Lines changed: 2 additions & 1 deletion
@@ -10,6 +10,7 @@ target/
 plugins/
 lib/
 .vertx/
+.DS_Store
 bin/nohup.out
-
+.DS_Store
 bin/sideSql.txt

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
@@ -256,7 +256,7 @@ public void onFailure(Throwable t) {
                         t.getMessage());
                 System.out.println("Failed to retrieve the data: " + t.getMessage());
                 cluster.closeAsync();
-                resultFuture.complete(null);
+                resultFuture.completeExceptionally(t);
             }
         });
     }
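
Note on this change: on a failed Cassandra lookup the callback previously completed the future with null, which the caller saw as a successful lookup with no value; it now completes exceptionally, so the failure propagates. Flink's ResultFuture is its own interface, but java.util.concurrent.CompletableFuture shows the same two completion paths. A minimal illustrative sketch, not part of this commit:

import java.util.concurrent.CompletableFuture;

public class CompletionSketch {
    public static void main(String[] args) {
        CompletableFuture<String> resultFuture = new CompletableFuture<>();
        Throwable t = new RuntimeException("Failed to retrieve the data");

        // Old behavior: resultFuture.complete(null) hides the error behind a null value.
        // New behavior: the failure is surfaced to whoever awaits the future.
        resultFuture.completeExceptionally(t);

        resultFuture.whenComplete((value, err) ->
                System.out.println(err != null ? "lookup failed: " + err.getMessage()
                                               : "value: " + value));
    }
}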

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 20 additions & 8 deletions
@@ -23,7 +23,12 @@
 import com.dtstack.flink.sql.classloader.DtClassLoader;
 import com.dtstack.flink.sql.enums.ECacheType;
 import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
-import com.dtstack.flink.sql.parser.*;
+import com.dtstack.flink.sql.exec.FlinkSQLExec;
+import com.dtstack.flink.sql.parser.CreateFuncParser;
+import com.dtstack.flink.sql.parser.CreateTmpTableParser;
+import com.dtstack.flink.sql.parser.InsertSqlParser;
+import com.dtstack.flink.sql.parser.SqlParser;
+import com.dtstack.flink.sql.parser.SqlTree;
 import com.dtstack.flink.sql.side.SideSqlExec;
 import com.dtstack.flink.sql.side.SideTableInfo;
 import com.dtstack.flink.sql.table.SourceTableInfo;
@@ -49,19 +54,21 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
-import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
-import org.apache.flink.calcite.shaded.com.google.common.collect.Sets;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
+
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
+
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
@@ -205,7 +212,10 @@ public static void main(String[] args) throws Exception {
             //sql-dimensional table contains the dimension table of execution
             sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
         }else{
-            tableEnv.sqlUpdate(result.getExecSql());
+            FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
+            if(LOG.isInfoEnabled()){
+                LOG.info("exec sql: " + result.getExecSql());
+            }
         }
     }
 }
@@ -288,6 +298,9 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
 
         Table regTable = tableEnv.fromDataStream(adaptStream, fields);
         tableEnv.registerTable(tableInfo.getName(), regTable);
+        if(LOG.isInfoEnabled()){
+            LOG.info("register table {} success.", tableInfo.getName());
+        }
         registerTableCache.put(tableInfo.getName(), regTable);
         classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, remoteSqlPluginPath, localSqlPluginPath));
     } else if (tableInfo instanceof TargetTableInfo) {
@@ -320,7 +333,6 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
         StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
                 StreamExecutionEnvironment.getExecutionEnvironment() :
                 new MyLocalStreamEnvironment();
-
         env.getConfig().disableClosureCleaner();
         env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
         Configuration globalJobParameters = new Configuration();
core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+package com.dtstack.flink.sql.exec;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.plan.logical.LogicalRelNode;
+import org.apache.flink.table.plan.schema.TableSinkTable;
+import org.apache.flink.table.plan.schema.TableSourceSinkTable;
+import scala.Option;
+
+import java.lang.reflect.Method;
+
+/**
+ * @description: mapping by name when inserting into a sink table
+ * @author: maqi
+ * @create: 2019/08/15 11:09
+ */
+public class FlinkSQLExec {
+
+    public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throws Exception {
+
+        FlinkPlannerImpl planner = new FlinkPlannerImpl(tableEnv.getFrameworkConfig(), tableEnv.getPlanner(), tableEnv.getTypeFactory());
+        SqlNode insert = planner.parse(stmt);
+
+        if (!(insert instanceof SqlInsert)) {
+            throw new TableException(
+                    "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.");
+        }
+        SqlNode query = ((SqlInsert) insert).getSource();
+
+        SqlNode validatedQuery = planner.validate(query);
+
+        Table queryResult = new Table(tableEnv, new LogicalRelNode(planner.rel(validatedQuery).rel));
+        String targetTableName = ((SqlIdentifier) ((SqlInsert) insert).getTargetTable()).names.get(0);
+
+        Method method = TableEnvironment.class.getDeclaredMethod("getTable", String.class);
+        method.setAccessible(true);
+        Option sinkTab = (Option) method.invoke(tableEnv, targetTableName);
+
+        if (sinkTab.isEmpty()) {
+            throw new ValidationException("Sink table " + targetTableName + " not found in flink");
+        }
+
+        TableSourceSinkTable targetTable = (TableSourceSinkTable) sinkTab.get();
+        TableSinkTable tableSinkTable = (TableSinkTable) targetTable.tableSinkTable().get();
+        String[] fieldNames = tableSinkTable.tableSink().getFieldNames();
+
+        Table newTable = null;
+        try {
+            newTable = queryResult.select(String.join(",", fieldNames));
+        } catch (Exception e) {
+            throw new ValidationException(
+                    "Field name of query result and registered TableSink " + targetTableName + " do not match.\n" +
+                    "Query result schema: " + String.join(",", queryResult.getSchema().getColumnNames()) + "\n" +
+                    "TableSink schema: " + String.join(",", fieldNames));
+        }
+
+        tableEnv.insertInto(newTable, targetTableName, tableEnv.queryConfig());
+    }
+}
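
Note on this new class: FlinkSQLExec.sqlUpdate parses the INSERT statement, looks up the registered sink's declared field names (reflectively, since TableEnvironment#getTable is private), and re-projects the SELECT result onto those names before calling insertInto, so columns are matched by name rather than by position. A minimal usage sketch, assuming a job with a source "MyTable" and a sink "MyResult" already registered (both table names are invented for illustration):

import com.dtstack.flink.sql.exec.FlinkSQLExec;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class FlinkSQLExecUsage {
    // Hypothetical caller: tableEnv is the job's StreamTableEnvironment with
    // "MyTable" and "MyResult" already registered.
    static void writeByName(StreamTableEnvironment tableEnv) throws Exception {
        FlinkSQLExec.sqlUpdate(tableEnv, "INSERT INTO MyResult SELECT id, name FROM MyTable");
    }
}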

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 16 additions & 1 deletion
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-
+
 
 package com.dtstack.flink.sql.side;
 
@@ -28,9 +28,12 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
 import org.apache.flink.types.Row;
 
+import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.TimeoutException;
 
 /**
  * All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
@@ -50,6 +53,18 @@ public AsyncReqRow(SideInfo sideInfo){
         this.sideInfo = sideInfo;
     }
 
+    @Override
+    public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
+        StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>) resultFuture;
+        try {
+            if (null == future.get()) {
+                throw new TimeoutException("Async function call has timed out.");
+            }
+        } catch (Exception e) {
+            throw new Exception(e);
+        }
+    }
+
     private void initCache(){
         SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
         if(sideTableInfo.getCacheType() == null || ECacheType.NONE.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
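
Note on this change: the new timeout override inspects the queue entry and raises only when no result was ever set, instead of unconditionally failing the async wait. An alternative, sketched below under the assumption that missing dimension data is tolerable for the job (this is not what the commit does), is to complete the future with an empty collection so a slow lookup degrades to a miss rather than a job failure:

import java.util.Collections;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

public class LenientTimeoutSketch {
    // Hypothetical handler: emit no rows on timeout instead of raising an exception.
    public void timeout(Row input, ResultFuture<Row> resultFuture) {
        resultFuture.complete(Collections.emptyList());
    }
}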

core/src/main/java/com/dtstack/flink/sql/side/SideInfo.java

Lines changed: 1 addition & 1 deletion
@@ -89,7 +89,7 @@ public void parseSelectFields(JoinInfo joinInfo){
         for( int i=0; i<outFieldInfoList.size(); i++){
             FieldInfo fieldInfo = outFieldInfoList.get(i);
             if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
-                fields.add(fieldInfo.getFieldName());
+                fields.add(sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName()));
                 sideFieldIndex.put(i, sideIndex);
                 sideFieldNameIndex.put(i, fieldInfo.getFieldName());
                 sideIndex++;
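
Note on this change: when building the side table's select list, the fix resolves each logical field name to its physical column name via getPhysicalFields(), falling back to the logical name when no mapping exists. A self-contained illustration of the getOrDefault pattern (the field and column names are invented for the example):

import java.util.HashMap;
import java.util.Map;

public class PhysicalFieldLookup {
    public static void main(String[] args) {
        // Hypothetical mapping: logical field "userName" -> physical column "user_name"
        Map<String, String> physicalFields = new HashMap<>();
        physicalFields.put("userName", "user_name");

        System.out.println(physicalFields.getOrDefault("userName", "userName")); // user_name
        System.out.println(physicalFields.getOrDefault("age", "age"));           // age (no mapping)
    }
}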
