Commit 2ebf317

Author: yanxi0227
Message: fix conflict
Parent: d317786

File tree: 12 files changed, +72 / -234 lines


core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 3 additions & 10 deletions
@@ -141,15 +141,8 @@ public static void main(String[] args) throws Exception {
         }
 
         ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
-        DtClassLoader dtClassLoader = new DtClassLoader(new URL[]{}, threadClassLoader);
-        Thread.currentThread().setContextClassLoader(dtClassLoader);
-
-        URLClassLoader parentClassloader;
-        if(!ClusterMode.local.name().equals(deployMode)){
-            parentClassloader = (URLClassLoader) threadClassLoader.getParent();
-        }else{
-            parentClassloader = dtClassLoader;
-        }
+        DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
+        Thread.currentThread().setContextClassLoader(parentClassloader);
 
         confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
         Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
@@ -220,7 +213,7 @@ public static void main(String[] args) throws Exception {
 
         if(env instanceof MyLocalStreamEnvironment) {
             List<URL> urlList = new ArrayList<>();
-            urlList.addAll(Arrays.asList(dtClassLoader.getURLs()));
+            urlList.addAll(Arrays.asList(parentClassloader.getURLs()));
             ((MyLocalStreamEnvironment) env).setClasspaths(urlList);
         }
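Note: after this commit, cluster and local deploy modes share one DtClassLoader instead of selecting a parent classloader conditionally. A minimal sketch of the resulting wiring, assuming DtClassLoader extends URLClassLoader (which the getURLs() call implies):

// Sketch of the post-commit flow in Main.java; not a standalone program.
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
Thread.currentThread().setContextClassLoader(parentClassloader);

// In local mode, whatever URLs plugin loading registered on this loader
// are handed to the local environment as the job classpath:
if (env instanceof MyLocalStreamEnvironment) {
    List<URL> urlList = new ArrayList<>(Arrays.asList(parentClassloader.getURLs()));
    ((MyLocalStreamEnvironment) env).setClasspaths(urlList);
}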

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 0 additions & 1 deletion
@@ -27,7 +27,6 @@
 import com.dtstack.flink.sql.util.PluginUtil;
 import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

kafka09/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@
 
     <modules>
         <module>kafka09-source</module>
-        <module>kafka09-sink</module>
+        <!--<module>kafka09-sink</module>-->
     </modules>
 
     <dependencies>

kafka10/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@
 
     <modules>
         <module>kafka10-source</module>
-        <module>kafka10-sink</module>
+        <!--<module>kafka10-sink</module>-->
    </modules>
 
     <dependencies>

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerCsvSerialization.java

Lines changed: 1 addition & 1 deletion
@@ -135,4 +135,4 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
     public TypeSerializerSnapshot<Row> snapshotConfiguration() {
         return null;
     }
-}
+}

(The closing brace is identical on both sides; this is a whitespace-only change, i.e. the newline at end of file.)

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka11JsonTableSink.java

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.KafkaJsonTableSink;
+import org.apache.flink.streaming.connectors.kafka.Kafka011TableSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -39,7 +39,7 @@
  *
  * @author maqi
  */
-public class CustomerKafka11JsonTableSink extends KafkaJsonTableSink {
+public class CustomerKafka11JsonTableSink extends Kafka011TableSink {
 
 
     protected SerializationSchema schema;

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 46 additions & 12 deletions
@@ -22,13 +22,22 @@
 import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
 import com.dtstack.flink.sql.table.TargetTableInfo;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.flink.streaming.connectors.kafka.Kafka011TableSink;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -47,38 +56,62 @@ public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<
 
     protected TypeInformation<?>[] fieldTypes;
 
+    /** The schema of the table. */
+    private TableSchema schema;
+
+    /** The Kafka topic to write to. */
     protected String topic;
 
+    /** Properties for the Kafka producer. */
     protected Properties properties;
 
     /** Serialization schema for encoding records to Kafka. */
     protected SerializationSchema serializationSchema;
 
+    /** Partitioner to select Kafka partition for each item. */
+    protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
+
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
-        KafkaSinkTableInfo kafka11SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
-        this.topic = kafka11SinkTableInfo.getTopic();
-        this.fieldNames = kafka11SinkTableInfo.getFields();
-        TypeInformation[] types = new TypeInformation[kafka11SinkTableInfo.getFields().length];
-        for (int i = 0; i < kafka11SinkTableInfo.getFieldClasses().length; i++) {
-            types[i] = TypeInformation.of(kafka11SinkTableInfo.getFieldClasses()[i]);
+        KafkaSinkTableInfo kafka011SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
+        this.topic = kafka011SinkTableInfo.getKafkaParam("topic");
+
+        Properties props = new Properties();
+        for (String key : kafka011SinkTableInfo.getKafkaParamKeys()) {
+            props.setProperty(key, kafka011SinkTableInfo.getKafkaParam(key));
+        }
+        this.properties = props;
+        this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
+        this.fieldNames = kafka011SinkTableInfo.getFields();
+        TypeInformation[] types = new TypeInformation[kafka011SinkTableInfo.getFields().length];
+        for (int i = 0; i < kafka011SinkTableInfo.getFieldClasses().length; i++) {
+            types[i] = TypeInformation.of(kafka011SinkTableInfo.getFieldClasses()[i]);
         }
         this.fieldTypes = types;
 
-        properties = new Properties();
-        for (String key : kafka11SinkTableInfo.getKafkaParamKeys()) {
-            properties.setProperty(key, kafka11SinkTableInfo.getKafkaParam(key));
+        TableSchema.Builder schemaBuilder = TableSchema.builder();
+        for (int i = 0; i < fieldNames.length; i++) {
+            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
+        }
+        this.schema = schemaBuilder.build();
+
+        //this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
+        if ("json".equalsIgnoreCase(kafka011SinkTableInfo.getSinkDataType())) {
+            this.serializationSchema = new JsonRowSerializationSchema(getOutputType());
+        } else if ("csv".equalsIgnoreCase(kafka011SinkTableInfo.getSinkDataType())) {
+            this.serializationSchema = new TypeInformationSerializationSchema(TypeInformation.of(Row.class),
+                    new CustomerCsvSerialization(kafka011SinkTableInfo.getFieldDelimiter(), fieldTypes));
         }
-        properties.setProperty("bootstrap.servers", kafka11SinkTableInfo.getBootstrapServers());
-        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
         return this;
     }
 
     @Override
     public void emitDataStream(DataStream<Row> dataStream) {
-        KafkaTableSinkBase kafkaTableSink = new CustomerKafka11JsonTableSink(
+        KafkaTableSinkBase kafkaTableSink = new Kafka011TableSink(
+                schema,
                 topic,
                 properties,
+                partitioner,
                 serializationSchema
         );
 
@@ -106,4 +139,5 @@ public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldT
         this.fieldTypes = fieldTypes;
         return this;
     }
+
 }
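Note: genStreamSink now derives the topic and producer properties entirely from the kafka.* parameter map and selects the serializer from sinkDataType. A hedged usage sketch (parameter values are illustrative assumptions; field metadata is assumed to already be populated, normally by parseFieldsInfo):

// Illustrative only: values are assumptions, not taken from the commit.
KafkaSinkTableInfo tableInfo = new KafkaSinkTableInfo();
tableInfo.addKafkaParam("bootstrap.servers", "localhost:9092"); // copied into the producer Properties
tableInfo.addKafkaParam("topic", "out_topic");                  // becomes this.topic in genStreamSink
tableInfo.setSinkDataType("csv"); // "json" -> JsonRowSerializationSchema, "csv" -> CustomerCsvSerialization
tableInfo.setFieldDelimiter(","); // only read on the csv path

// genStreamSink also reads getFields()/getFieldClasses(), assumed populated here:
KafkaSink sink = new KafkaSink().genStreamSink(tableInfo);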

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 9 additions & 6 deletions
@@ -38,16 +38,19 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
         KafkaSinkTableInfo kafka11SinkTableInfo = new KafkaSinkTableInfo();
         kafka11SinkTableInfo.setName(tableName);
         parseFieldsInfo(fieldsInfo, kafka11SinkTableInfo);
+        kafka11SinkTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase())));
+        if (props.get(KafkaSinkTableInfo.SINK_DATA_TYPE) != null) {
+            kafka11SinkTableInfo.setSinkDataType(props.get(KafkaSinkTableInfo.SINK_DATA_TYPE).toString());
+        }
+        if (props.get(KafkaSinkTableInfo.FIELD_DELINITER) != null) {
+            kafka11SinkTableInfo.setFieldDelimiter(props.get(KafkaSinkTableInfo.FIELD_DELINITER).toString());
+        }
 
-
-        kafka11SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
-        kafka11SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
-        for (String key : props.keySet()) {
+        for (String key : props.keySet()) {
             if (!key.isEmpty() && key.startsWith("kafka.")) {
                 kafka11SinkTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());
             }
         }
-        kafka11SinkTableInfo.check();
         return kafka11SinkTableInfo;
     }
-}
+}

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkTableInfo.java

Lines changed: 8 additions & 33 deletions
@@ -19,7 +19,7 @@
 package com.dtstack.flink.sql.sink.kafka.table;
 
 import com.dtstack.flink.sql.table.TargetTableInfo;
-import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
+import com.google.common.base.Preconditions;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -36,20 +36,13 @@
 public class KafkaSinkTableInfo extends TargetTableInfo {
     //version
     private static final String CURR_TYPE = "kafka11";
-    public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers";
 
-    public static final String TOPIC_KEY = "topic";
-
-    private String bootstrapServers;
-
-    private String topic;
-
-    public Map<String,String> kafkaParam = new HashMap<String,String>();
-
-    public KafkaSinkTableInfo() {
+    public KafkaSinkTableInfo(){
         super.setType(CURR_TYPE);
     }
 
+    public Map<String,String> kafkaParam = new HashMap<String,String>();
+
     public void addKafkaParam(String key,String value){
         kafkaParam.put(key,value);
     }
@@ -62,34 +55,16 @@ public Set<String> getKafkaParamKeys(){
         return kafkaParam.keySet();
     }
 
-
-
-    public String getBootstrapServers() {
-        return bootstrapServers;
-    }
-
-    public void setBootstrapServers(String bootstrapServers) {
-        this.bootstrapServers = bootstrapServers;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
     @Override
     public boolean check() {
-        Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
-        Preconditions.checkNotNull(topic, "kafka of topic is required");
+        Preconditions.checkNotNull(kafkaParam.get("bootstrap.servers"), "kafka of bootstrapServers is required");
+        Preconditions.checkNotNull(kafkaParam.get("topic"), "kafka of topic is required");
+        //Preconditions.checkNotNull(kafkaParam.get("groupId"), "kafka of groupId is required");
        return false;
     }
 
     @Override
     public String getType() {
-        // return super.getType() + SOURCE_SUFFIX;
         return super.getType();
     }
-}
+}
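Note: with the dedicated bootstrapServers/topic fields removed, check() now validates against the kafkaParam map that KafkaSinkParser fills from "kafka."-prefixed table properties. A minimal sketch of that flow (property values are assumptions):

// "kafka."-prefixed properties lose the prefix and land in kafkaParam,
// which is where check() now looks for the required keys.
Map<String, Object> props = new HashMap<>();
props.put("kafka.bootstrap.servers", "localhost:9092");
props.put("kafka.topic", "result_topic");

KafkaSinkTableInfo info = new KafkaSinkTableInfo();
for (String key : props.keySet()) {
    if (!key.isEmpty() && key.startsWith("kafka.")) {
        info.addKafkaParam(key.substring(6), props.get(key).toString());
    }
}
info.check(); // both Preconditions pass: bootstrap.servers and topic are present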
