Commit 8bcc35e

Author: yanxi0227
Message: support flink180
Parent: 5fb4098

14 files changed, +57 -53 lines

core/pom.xml

Lines changed: 15 additions & 3 deletions

@@ -52,10 +52,22 @@
             <version>${flink.version}</version>
         </dependency>

-        <dependency>
+        <!--<dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table_2.11</artifactId>
-            <version>${flink.version}</version>
+            <version>1.7.2</version>
+        </dependency>-->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.11</artifactId>
+            <version>1.8.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>1.8.0</version>
         </dependency>

         <dependency>
@@ -80,7 +92,7 @@
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-hadoop2</artifactId>
-            <version>${flink.version}</version>
+            <version>1.7.2</version>
         </dependency>

         <dependency>
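
Note: Flink 1.8 split the old monolithic flink-table module, which is why the upgrade swaps flink-table_2.11 for flink-table-planner_2.11 plus flink-table-common. A minimal smoke test that the new dependencies resolve (this is the stock Flink 1.8 Table API, not code from this commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class TablePlannerSmokeTest {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Resolves against classes that flink-table-planner_2.11 now provides;
            // TableEnvironment.getTableEnvironment is the 1.8-era factory method.
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
            System.out.println("table env: " + tableEnv);
        }
    }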

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java

Lines changed: 8 additions & 5 deletions

@@ -21,11 +21,11 @@
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -106,18 +106,21 @@ public JobExecutionResult execute(String jobName) throws Exception {
         // add (and override) the settings with what the user defined
         configuration.addAll(this.conf);

+        MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
+        configBuilder.setConfiguration(configuration);
+
         if (LOG.isInfoEnabled()) {
             LOG.info("Running job on local embedded Flink mini cluster");
         }

-        LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true);
+        MiniCluster exec = new MiniCluster(configBuilder.build());
         try {
             exec.start();
-            return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
+            return exec.executeJobBlocking(jobGraph);
         }
         finally {
             transformations.clear();
-            exec.stop();
+            exec.closeAsync();
         }
     }
 }
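
For context: LocalFlinkMiniCluster was removed in Flink 1.8, which is why execute() switches to MiniCluster. The same pattern as a self-contained sketch (the wrapper class is hypothetical; only the MiniCluster calls mirror the change above):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.minicluster.MiniCluster;
    import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

    public final class MiniClusterRunner {

        public static JobExecutionResult run(JobGraph jobGraph, Configuration configuration) throws Exception {
            // 1.8: cluster settings go through a builder instead of the old
            // LocalFlinkMiniCluster(configuration, true) constructor.
            MiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder()
                    .setConfiguration(configuration)
                    .build();

            MiniCluster miniCluster = new MiniCluster(miniClusterConfig);
            try {
                miniCluster.start();
                // Replaces submitJobAndWait(jobGraph, sysoutLogging).
                return miniCluster.executeJobBlocking(jobGraph);
            } finally {
                // stop() is gone; shutdown is asynchronous in 1.8.
                miniCluster.closeAsync();
            }
        }
    }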

core/src/main/java/com/dtstack/flink/yarn/YarnClusterDescriptor.java

Lines changed: 9 additions & 5 deletions

@@ -23,6 +23,7 @@
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.yarn.*;
+import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -86,7 +87,7 @@ public YarnClusterDescriptor(
     @Override
     protected String getYarnSessionClusterEntrypoint()
     {
-        return YarnApplicationMasterRunner.class.getName();
+        return YarnSessionClusterEntrypoint.class.getName();
     }

     /**
@@ -95,7 +96,7 @@ protected String getYarnSessionClusterEntrypoint()
     @Override
     protected String getYarnJobClusterEntrypoint()
     {
-        return YarnApplicationMasterRunner.class.getName();
+        return YarnSessionClusterEntrypoint.class.getName();
     }

     @Override
@@ -113,7 +114,7 @@ public YarnClient getYarnClient()
         return this.yarnClient;
     }

-    public YarnClusterClient deploy()
+    public RestClusterClient deploy()
     {
         ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class);
         context.setApplicationId(yarnAppId);
@@ -124,10 +125,13 @@ public YarnClusterClient deploy()
             conf.setString(JobManagerOptions.ADDRESS.key(), report.getHost());
             conf.setInteger(JobManagerOptions.PORT.key(), report.getRpcPort());

-            return new YarnClusterClient(this,
+            /*return new RestClusterClient(this,
                 appConf.getTaskManagerCount(),
                 appConf.getTaskManagerSlots(),
-                report, conf, false);
+                report, conf, false);*/
+            return new RestClusterClient<>(
+                conf,
+                report.getApplicationId());
         }
         catch (Exception e) {
             throw new RuntimeException(e);
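
The legacy YarnClusterClient no longer exists in Flink 1.8; deploy() now hands back a REST-based client built from the configuration plus a cluster id. The same pattern as a stand-alone sketch (the helper and its parameters are hypothetical):

    import org.apache.flink.client.program.rest.RestClusterClient;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.JobManagerOptions;
    import org.apache.hadoop.yarn.api.records.ApplicationId;

    public final class YarnRestClientExample {

        public static RestClusterClient<ApplicationId> attach(String jmHost, int jmRpcPort, ApplicationId appId) throws Exception {
            Configuration conf = new Configuration();
            conf.setString(JobManagerOptions.ADDRESS.key(), jmHost);
            conf.setInteger(JobManagerOptions.PORT.key(), jmRpcPort);
            // In 1.8 the type parameter is the cluster id; on YARN that is the ApplicationId.
            return new RestClusterClient<>(conf, appId);
        }
    }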

kafka08/kafka08-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerCsvSerialization.java

Lines changed: 3 additions & 8 deletions

@@ -5,6 +5,7 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -116,13 +117,7 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
     }

     @Override
-    public boolean canEqual(Object obj) {
-        return obj instanceof CustomerCsvSerialization;
-    }
-
-    @Override
-    protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-        return super.isCompatibleSerializationFormatIdentifier(identifier)
-                || identifier.equals(StringValue.class.getCanonicalName());
+    public TypeSerializerSnapshot<Row> snapshotConfiguration() {
+        return null;
     }
 }
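
Background on this change: Flink 1.8 removed canEqual() and isCompatibleSerializationFormatIdentifier() from TypeSerializer and made snapshotConfiguration() the compatibility hook. Returning null compiles, but it is likely to break snapshotting if this serializer is ever used for managed state. A hedged sketch of a real snapshot via Flink 1.8's SimpleTypeSerializerSnapshot, assuming the serializer is a stateless singleton (the INSTANCE accessor is a hypothetical addition, not part of this commit):

    import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
    import org.apache.flink.types.Row;

    public final class CustomerCsvSerializationSnapshot extends SimpleTypeSerializerSnapshot<Row> {

        // Snapshot classes are instantiated reflectively on restore, so a public
        // nullary constructor is required.
        public CustomerCsvSerializationSnapshot() {
            super(() -> CustomerCsvSerialization.INSTANCE);
        }
    }

snapshotConfiguration() would then return new CustomerCsvSerializationSnapshot() instead of null.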

kafka08/kafka08-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 1 deletion

@@ -25,12 +25,12 @@
 import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
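
JsonRowSerializationSchema moved out of flink-streaming's util.serialization package into the flink-json format module, hence the import change. A small usage sketch, assuming the TypeInformation-based constructor is still public in 1.8 (the field names are made up):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.formats.json.JsonRowSerializationSchema;
    import org.apache.flink.types.Row;

    public class JsonSchemaExample {
        public static void main(String[] args) {
            TypeInformation<Row> rowType = new RowTypeInfo(
                    new TypeInformation<?>[]{Types.STRING, Types.INT},
                    new String[]{"name", "age"});
            // Same class as before, new package: org.apache.flink.formats.json.
            JsonRowSerializationSchema schema = new JsonRowSerializationSchema(rowType);
            byte[] json = schema.serialize(Row.of("alice", 30));
            System.out.println(new String(json));
        }
    }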

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerCsvSerialization.java

Lines changed: 3 additions & 8 deletions

@@ -4,6 +4,7 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -113,13 +114,7 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
     }

     @Override
-    public boolean canEqual(Object obj) {
-        return obj instanceof CustomerCsvSerialization;
-    }
-
-    @Override
-    protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-        return super.isCompatibleSerializationFormatIdentifier(identifier)
-                || identifier.equals(StringValue.class.getCanonicalName());
+    public TypeSerializerSnapshot<Row> snapshotConfiguration() {
+        return null;
     }
 }

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 1 deletion

@@ -25,12 +25,12 @@
 import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.Kafka09TableSink;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerCsvSerialization.java

Lines changed: 3 additions & 8 deletions

@@ -4,6 +4,7 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -115,13 +116,7 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
     }

     @Override
-    public boolean canEqual(Object obj) {
-        return obj instanceof CustomerCsvSerialization;
-    }
-
-    @Override
-    protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-        return super.isCompatibleSerializationFormatIdentifier(identifier)
-                || identifier.equals(StringValue.class.getCanonicalName());
+    public TypeSerializerSnapshot<Row> snapshotConfiguration() {
+        return null;
     }
 }

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 1 deletion

@@ -25,12 +25,12 @@
 import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.Kafka010TableSink;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerCsvSerialization.java

Lines changed: 3 additions & 8 deletions

@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -131,13 +132,7 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
     }

     @Override
-    public boolean canEqual(Object obj) {
-        return obj instanceof CustomerCsvSerialization;
-    }
-
-    @Override
-    protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-        return super.isCompatibleSerializationFormatIdentifier(identifier)
-                || identifier.equals(StringValue.class.getCanonicalName());
+    public TypeSerializerSnapshot<Row> snapshotConfiguration() {
+        return null;
     }
 }
