Skip to content

Commit d3882da

Browse files
committed
upgrade to Flink 1.9
1 parent 2411ac6 commit d3882da

File tree

22 files changed

+273
-160
lines changed

22 files changed

+273
-160
lines changed

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2929
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
3030
import org.apache.flink.streaming.api.datastream.DataStream;
31+
import org.apache.flink.streaming.api.datastream.DataStreamSink;
3132
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
3233
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
3334
import org.apache.flink.table.sinks.RetractStreamTableSink;
@@ -82,6 +83,11 @@ public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) {
8283

8384
@Override
8485
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
86+
consumeDataStream(dataStream);
87+
}
88+
89+
@Override
90+
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
8591
CassandraOutputFormat.CassandraFormatBuilder builder = CassandraOutputFormat.buildOutputFormat();
8692
builder.setAddress(this.address)
8793
.setDatabase(this.database)
@@ -100,7 +106,8 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
100106

101107
CassandraOutputFormat outputFormat = builder.finish();
102108
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
103-
dataStream.addSink(richSinkFunction);
109+
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
110+
return dataStreamSink;
104111
}
105112

106113
@Override

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleSink.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2626
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
2727
import org.apache.flink.streaming.api.datastream.DataStream;
28+
import org.apache.flink.streaming.api.datastream.DataStreamSink;
2829
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
2930
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
3031
import org.apache.flink.table.sinks.RetractStreamTableSink;
@@ -71,16 +72,22 @@ public TypeInformation<?>[] getFieldTypes() {
7172

7273
@Override
7374
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
74-
ConsoleOutputFormat.ConsoleOutputFormatBuilder builder = ConsoleOutputFormat.buildOutputFormat();
75-
builder.setFieldNames(this.fieldNames)
76-
.setFieldTypes(this.fieldTypes);
77-
ConsoleOutputFormat outputFormat = builder.finish();
78-
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
79-
dataStream.addSink(richSinkFunction);
75+
// Flink 1.9: sink logic moved to consumeDataStream; emitDataStream is deprecated and intentionally left empty
8076
}
8177

8278
@Override
8379
public ConsoleSink genStreamSink(TargetTableInfo targetTableInfo) {
8480
return this;
8581
}
82+
83+
@Override
84+
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
85+
ConsoleOutputFormat.ConsoleOutputFormatBuilder builder = ConsoleOutputFormat.buildOutputFormat();
86+
builder.setFieldNames(this.fieldNames)
87+
.setFieldTypes(this.fieldTypes);
88+
ConsoleOutputFormat outputFormat = builder.finish();
89+
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
90+
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
91+
return dataStreamSink;
92+
}
8693
}

core/pom.xml

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,28 @@
2323
</properties>
2424

2525
<dependencies>
26+
<!-- blink table module-->
2627
<dependency>
27-
<groupId>junit</groupId>
28-
<artifactId>junit</artifactId>
29-
<version>4.12</version>
30-
<scope>test</scope>
28+
<groupId>org.apache.flink</groupId>
29+
<artifactId>flink-table-common</artifactId>
30+
<version>${flink.version}</version>
3131
</dependency>
3232

3333
<dependency>
34-
<groupId>joda-time</groupId>
35-
<artifactId>joda-time</artifactId>
36-
<version>2.5</version>
34+
<groupId>org.apache.flink</groupId>
35+
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
36+
<version>${flink.version}</version>
37+
</dependency>
38+
39+
<dependency>
40+
<groupId>org.apache.flink</groupId>
41+
<artifactId>flink-table-planner-blink_2.11</artifactId>
42+
<version>${flink.version}</version>
3743
</dependency>
3844

3945
<dependency>
4046
<groupId>org.apache.flink</groupId>
41-
<artifactId>flink-core</artifactId>
47+
<artifactId>flink-table-runtime-blink_2.11</artifactId>
4248
<version>${flink.version}</version>
4349
</dependency>
4450

@@ -56,16 +62,23 @@
5662

5763
<dependency>
5864
<groupId>org.apache.flink</groupId>
59-
<artifactId>flink-table-planner_2.11</artifactId>
65+
<artifactId>flink-cep-scala_2.11</artifactId>
6066
<version>${flink.version}</version>
6167
</dependency>
6268

6369
<dependency>
6470
<groupId>org.apache.flink</groupId>
65-
<artifactId>flink-table-common</artifactId>
71+
<artifactId>flink-yarn_2.11</artifactId>
6672
<version>${flink.version}</version>
6773
</dependency>
6874

75+
<dependency>
76+
<groupId>org.apache.flink</groupId>
77+
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
78+
<version>${flink.version}</version>
79+
</dependency>
80+
81+
6982
<dependency>
7083
<groupId>org.apache.calcite</groupId>
7184
<artifactId>calcite-server</artifactId>
@@ -92,28 +105,16 @@
92105
</dependency>
93106

94107
<dependency>
95-
<groupId>org.apache.flink</groupId>
96-
<artifactId>flink-cep-scala_2.11</artifactId>
97-
<version>${flink.version}</version>
98-
</dependency>
99-
100-
<dependency>
101-
<groupId>org.apache.flink</groupId>
102-
<artifactId>flink-scala_2.11</artifactId>
103-
<version>${flink.version}</version>
104-
</dependency>
105-
106-
<dependency>
107-
<groupId>org.apache.flink</groupId>
108-
<artifactId>flink-yarn_2.11</artifactId>
109-
<version>${flink.version}</version>
108+
<groupId>junit</groupId>
109+
<artifactId>junit</artifactId>
110+
<version>4.12</version>
111+
<scope>test</scope>
110112
</dependency>
111113

112-
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->
113114
<dependency>
114-
<groupId>org.apache.flink</groupId>
115-
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
116-
<version>${flink.version}</version>
115+
<groupId>joda-time</groupId>
116+
<artifactId>joda-time</artifactId>
117+
<version>2.5</version>
117118
</dependency>
118119

119120
</dependencies>

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import com.dtstack.flink.sql.enums.ClusterMode;
2727
import com.dtstack.flink.sql.enums.ECacheType;
2828
import com.dtstack.flink.sql.enums.EPluginLoadMode;
29+
//import com.dtstack.flink.sql.exec.FlinkSQLExec;
2930
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
30-
import com.dtstack.flink.sql.exec.FlinkSQLExec;
3131
import com.dtstack.flink.sql.option.OptionParser;
3232
import com.dtstack.flink.sql.parser.CreateFuncParser;
3333
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
@@ -66,8 +66,9 @@
6666
import org.apache.flink.streaming.api.datastream.DataStream;
6767
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
6868
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
69-
import org.apache.flink.table.api.StreamQueryConfig;
69+
import org.apache.flink.table.api.EnvironmentSettings;
7070
import org.apache.flink.table.api.Table;
71+
import org.apache.flink.table.api.TableEnvironment;
7172
import org.apache.flink.table.api.java.StreamTableEnvironment;
7273
import org.apache.flink.table.sinks.TableSink;
7374
import org.apache.flink.types.Row;
@@ -187,7 +188,8 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
187188
//sql-dimensional table contains the dimension table of execution
188189
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
189190
}else{
190-
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
191+
// FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
192+
tableEnv.sqlUpdate(result.getExecSql());
191193
if(LOG.isInfoEnabled()){
192194
LOG.info("exec sql: " + result.getExecSql());
193195
}
@@ -216,7 +218,7 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
216218
}
217219
}
218220

219-
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTableEnvironment tableEnv)
221+
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, TableEnvironment tableEnv)
220222
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
221223
//register udf
222224
// the UDF and tableEnv must be loaded by the same classloader
@@ -249,7 +251,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
249251
String adaptSql = sourceTableInfo.getAdaptSelectSql();
250252
Table adaptTable = adaptSql == null ? table : tableEnv.sqlQuery(adaptSql);
251253

252-
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
254+
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
253255
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
254256
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
255257
.returns(typeInfo);
@@ -353,12 +355,16 @@ private static StreamExecutionEnvironment getStreamExeEnv(Properties confPropert
353355
* 获取StreamTableEnvironment并设置相关属性
354356
*
355357
* @param confProperties
356-
* @param env
357358
* @return
358359
*/
359360
private static StreamTableEnvironment getStreamTableEnv(Properties confProperties, StreamExecutionEnvironment env) {
361+
EnvironmentSettings settings = EnvironmentSettings.newInstance()
362+
.useBlinkPlanner()
363+
.inStreamingMode()
364+
.build();
365+
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
366+
360367
confProperties = PropertiesUtils.propertiesTrim(confProperties);
361-
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
362368
FlinkUtil.setTableEnvTTL(confProperties, tableEnv);
363369
return tableEnv;
364370
}

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,11 @@ public JobExecutionResult execute(String jobName) throws Exception {
9393
// transform the streaming program into a JobGraph
9494
StreamGraph streamGraph = getStreamGraph();
9595
streamGraph.setJobName(jobName);
96+
return execute(streamGraph);
97+
}
9698

99+
@Override
100+
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
97101
JobGraph jobGraph = streamGraph.getJobGraph();
98102
jobGraph.setClasspaths(classpaths);
99103

0 commit comments

Comments
 (0)