
Commit 838f481

Merge branch 'v1.8.0_dev_kafkasink' into 'v1.8.0_dev'

V1.8.0 dev kafkasink: flink1.8 kafkasink. See merge request !47

2 parents 03bc6b6 + 21ad9a7

14 files changed: +177 −240 lines

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka09JsonTableSink.java

Lines changed: 14 additions & 18 deletions
@@ -20,12 +20,12 @@
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.*;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
+import org.apache.flink.streaming.connectors.kafka.Kafka09TableSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.table.util.TableConnectorUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.utils.TableConnectorUtils;
 import org.apache.flink.types.Row;

 import java.util.Optional;
@@ -38,30 +38,26 @@
  *
  * @author maqi
  */
-public class CustomerKafka09JsonTableSink extends KafkaJsonTableSink {
+public class CustomerKafka09JsonTableSink extends Kafka09TableSink {


     protected SerializationSchema schema;

-    public CustomerKafka09JsonTableSink(String topic, Properties properties, SerializationSchema schema) {
-        super(topic, properties, new FlinkFixedPartitioner<>());
-        this.schema = schema;
-    }

-    public CustomerKafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner, SerializationSchema schema) {
-        super(topic, properties, partitioner);
-        this.schema = schema;
-    }


     @Deprecated
-    public CustomerKafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, SerializationSchema schema) {
-        super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
-        this.schema = schema;
+    public CustomerKafka09JsonTableSink(TableSchema schema,
+                                        String topic,
+                                        Properties properties,
+                                        Optional<FlinkKafkaPartitioner<Row>> partitioner,
+                                        SerializationSchema<Row> serializationSchema) {
+        super(schema, topic, properties, partitioner, serializationSchema);
+        this.schema = serializationSchema;
     }

     @Override
-    protected SinkFunction<Row> createKafkaProducer(String s, Properties properties, SerializationSchema<Row> serializationSchema, Optional<FlinkKafkaPartitioner<Row>> optional) {
+    protected FlinkKafkaProducerBase<Row> createKafkaProducer(String s, Properties properties, SerializationSchema<Row> serializationSchema, Optional<FlinkKafkaPartitioner<Row>> optional) {
         return new CustomerFlinkKafkaProducer09<>(topic, serializationSchema, properties);
     }

@@ -70,6 +66,6 @@ public void emitDataStream(DataStream<Row> dataStream) {
         SinkFunction<Row> kafkaProducer = createKafkaProducer(topic, properties, schema, partitioner);
         // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
         //kafkaProducer.setFlushOnCheckpoint(true);
-        dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
+        dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
     }
 }
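For orientation, a minimal sketch of how the reworked sink gets constructed against the Flink 1.8 table API. The field names, topic, and broker address below are hypothetical placeholders, and CustomerJsonRowSerializationSchema is assumed to accept a TypeInformation<Row>, matching the getOutputType() call in the KafkaSink change below.

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.table.api.TableSchema;

public class Kafka09SinkWiringSketch {
    public static CustomerKafka09JsonTableSink build() {
        // Hypothetical two-field schema; real fields come from the table definition.
        TableSchema schema = TableSchema.builder()
                .field("id", Types.INT)
                .field("name", Types.STRING)
                .build();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

        return new CustomerKafka09JsonTableSink(
                schema,
                "demo_topic",                               // placeholder topic
                props,
                Optional.of(new FlinkFixedPartitioner<>()), // same default KafkaSink picks below
                new CustomerJsonRowSerializationSchema(schema.toRowType()));
    }
}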

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 35 additions & 6 deletions
@@ -26,9 +26,14 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+
+import java.util.Optional;
 import java.util.Properties;

 /**
@@ -44,39 +49,63 @@ public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<K

     protected TypeInformation<?>[] fieldTypes;

+    /** The schema of the table. */
+    private TableSchema schema;
+
+    /** The Kafka topic to write to. */
     protected String topic;

+    /** Properties for the Kafka producer. */
     protected Properties properties;

     /** Serialization schema for encoding records to Kafka. */
     protected SerializationSchema serializationSchema;

+    /** Partitioner to select Kafka partition for each item. */
+    protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
+
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
+
         KafkaSinkTableInfo kafka09SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
         this.topic = kafka09SinkTableInfo.getTopic();
+
+        Properties props = new Properties();
+        props.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());
+
+        for (String key : kafka09SinkTableInfo.getKafkaParamKeys()) {
+            props.setProperty(key, kafka09SinkTableInfo.getKafkaParam(key));
+        }
+        this.properties = props;
+        this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
         this.fieldNames = kafka09SinkTableInfo.getFields();
         TypeInformation[] types = new TypeInformation[kafka09SinkTableInfo.getFields().length];
-        for (int i = 0; i < kafka09SinkTableInfo.getFieldClasses().length; i++) {
+        for (int i = 0; i < kafka09SinkTableInfo.getFieldClasses().length; i++) {
             types[i] = TypeInformation.of(kafka09SinkTableInfo.getFieldClasses()[i]);
         }
         this.fieldTypes = types;

-        properties = new Properties();
-        for (String key : kafka09SinkTableInfo.getKafkaParamKeys()) {
-            properties.setProperty(key, kafka09SinkTableInfo.getKafkaParam(key));
+        TableSchema.Builder schemaBuilder = TableSchema.builder();
+        for (int i = 0; i < fieldNames.length; i++) {
+            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
+        }
+        this.schema = schemaBuilder.build();
+
+        //this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
+        if ("json".equalsIgnoreCase(kafka09SinkTableInfo.getSinkDataType())) {
+            this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
         }
-        properties.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());

-        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
         return this;
     }

     @Override
     public void emitDataStream(DataStream<Row> dataStream) {
         KafkaTableSinkBase kafkaTableSink = new CustomerKafka09JsonTableSink(
+                schema,
                 topic,
                 properties,
+                partitioner,
                 serializationSchema
         );
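Worth noting about the new guard: serializationSchema is assigned only when sinkDataType is "json". Any other value leaves the field null, and the failure then surfaces later as a NullPointerException inside emitDataStream. A fail-fast check along these lines (a sketch, not part of the commit) would move the error to genStreamSink:

// Sketch only: reject unsupported formats as soon as the sink is configured.
if (this.serializationSchema == null) {
    throw new IllegalArgumentException(
            "Unsupported sinkDataType '" + kafka09SinkTableInfo.getSinkDataType()
                    + "'; this sink currently handles only 'json'");
}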

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkTableInfo.java

Lines changed: 4 additions & 3 deletions
@@ -36,6 +36,10 @@ public class KafkaSinkTableInfo extends TargetTableInfo {
     //version
     private static final String CURR_TYPE = "kafka09";

+    public KafkaSinkTableInfo() {
+        super.setType(CURR_TYPE);
+    }
+
     public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers";

     public static final String TOPIC_KEY = "topic";
@@ -46,9 +50,6 @@ public class KafkaSinkTableInfo extends TargetTableInfo {

     public Map<String, String> kafkaParam = new HashMap<String, String>();

-    public KafkaSinkTableInfo() {
-        super.setType(CURR_TYPE);
-    }

     public void addKafkaParam(String key, String value) {
         kafkaParam.put(key, value);

kafka09/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@

     <modules>
         <module>kafka09-source</module>
-        <!--<module>kafka09-sink</module>-->
+        <module>kafka09-sink</module>
     </modules>

     <dependencies>

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka10JsonTableSink.java

Lines changed: 20 additions & 21 deletions
@@ -19,15 +19,14 @@

 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
-import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSink;
-import org.apache.flink.streaming.connectors.kafka.KafkaJsonTableSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.Kafka010TableSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.util.TableConnectorUtil;
+import org.apache.flink.table.utils.TableConnectorUtils;
 import org.apache.flink.types.Row;

 import java.util.Optional;
@@ -40,38 +39,38 @@
  *
  * @author maqi
  */
-public class CustomerKafka10JsonTableSink extends KafkaJsonTableSink {
+public class CustomerKafka10JsonTableSink extends Kafka010TableSink {


     protected SerializationSchema schema;

-    public CustomerKafka10JsonTableSink(String topic, Properties properties, SerializationSchema schema) {
-        super(topic, properties, new FlinkFixedPartitioner<>());
-        this.schema = schema;
-    }
-
-    public CustomerKafka10JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner, SerializationSchema schema) {
-        super(topic, properties, partitioner);
-        this.schema = schema;
+    public CustomerKafka10JsonTableSink(TableSchema schema,
+                                        String topic,
+                                        Properties properties,
+                                        Optional<FlinkKafkaPartitioner<Row>> partitioner,
+                                        SerializationSchema<Row> serializationSchema) {
+        super(schema, topic, properties, partitioner, serializationSchema);
+        this.schema = serializationSchema;
     }


-    @Deprecated
-    public CustomerKafka10JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, SerializationSchema schema) {
-        super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
-        this.schema = schema;
-    }

     @Override
-    protected SinkFunction<Row> createKafkaProducer(String s, Properties properties, SerializationSchema<Row> serializationSchema, Optional<FlinkKafkaPartitioner<Row>> optional) {
+    protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+            String topic,
+            Properties properties,
+            SerializationSchema<Row> serializationSchema,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
         return new CustomerFlinkKafkaProducer010<Row>(topic, serializationSchema, properties);
     }

+
     @Override
     public void emitDataStream(DataStream<Row> dataStream) {
         SinkFunction<Row> kafkaProducer = createKafkaProducer(topic, properties, schema, partitioner);
         // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
         //kafkaProducer.setFlushOnCheckpoint(true);
-        dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
+        dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
+
     }
 }
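Narrowing createKafkaProducer's return type from SinkFunction<Row> to FlinkKafkaProducerBase<Row> is what makes the commented-out flush call above reachable again: setFlushOnCheckpoint lives on the producer base class, not on SinkFunction. A sketch of re-enabling it, with a placeholder topic and the serialization schema and properties assumed in scope:

// Sketch: with the narrowed type, the at-least-once flush flag can be set
// before the producer is attached to the stream.
FlinkKafkaProducerBase<Row> producer =
        new CustomerFlinkKafkaProducer010<Row>("demo_topic", serializationSchema, properties);
producer.setFlushOnCheckpoint(true); // flush pending records on every checkpoint
dataStream.addSink(producer);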

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 36 additions & 11 deletions
@@ -26,10 +26,14 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;

+import java.util.Optional;
 import java.util.Properties;
 /**
  *
@@ -48,39 +52,60 @@ public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<K

     protected TypeInformation<?>[] fieldTypes;

+    /** The schema of the table. */
+    private TableSchema schema;
+
+    /** The Kafka topic to write to. */
     protected String topic;

+    /** Properties for the Kafka producer. */
     protected Properties properties;

     /** Serialization schema for encoding records to Kafka. */
     protected SerializationSchema serializationSchema;

+    /** Partitioner to select Kafka partition for each item. */
+    protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
-        KafkaSinkTableInfo kafka10SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
-        this.topic = kafka10SinkTableInfo.getTopic();
-        this.fieldNames = kafka10SinkTableInfo.getFields();
-        TypeInformation[] types = new TypeInformation[kafka10SinkTableInfo.getFields().length];
-        for (int i = 0; i < kafka10SinkTableInfo.getFieldClasses().length; i++) {
-            types[i] = TypeInformation.of(kafka10SinkTableInfo.getFieldClasses()[i]);
+        KafkaSinkTableInfo kafka010SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
+        this.topic = kafka010SinkTableInfo.getTopic();
+
+        Properties props = new Properties();
+        props.setProperty("bootstrap.servers", kafka010SinkTableInfo.getBootstrapServers());
+
+        for (String key : kafka010SinkTableInfo.getKafkaParamKeys()) {
+            props.setProperty(key, kafka010SinkTableInfo.getKafkaParam(key));
+        }
+        this.properties = props;
+        this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
+        this.fieldNames = kafka010SinkTableInfo.getFields();
+        TypeInformation[] types = new TypeInformation[kafka010SinkTableInfo.getFields().length];
+        for (int i = 0; i < kafka010SinkTableInfo.getFieldClasses().length; i++) {
+            types[i] = TypeInformation.of(kafka010SinkTableInfo.getFieldClasses()[i]);
         }
         this.fieldTypes = types;

-        properties = new Properties();
-        for (String key : kafka10SinkTableInfo.getKafkaParamKeys()) {
-            properties.setProperty(key, kafka10SinkTableInfo.getKafkaParam(key));
+        TableSchema.Builder schemaBuilder = TableSchema.builder();
+        for (int i = 0; i < fieldNames.length; i++) {
+            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
         }
-        properties.setProperty("bootstrap.servers", kafka10SinkTableInfo.getBootstrapServers());
+        this.schema = schemaBuilder.build();

-        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
+        //this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
+        if ("json".equalsIgnoreCase(kafka010SinkTableInfo.getSinkDataType())) {
+            this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
+        }
         return this;
     }

     @Override
     public void emitDataStream(DataStream<Row> dataStream) {
         KafkaTableSinkBase kafkaTableSink = new CustomerKafka10JsonTableSink(
+                schema,
                 topic,
                 properties,
+                partitioner,
                 serializationSchema
         );
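FlinkFixedPartitioner, the default chosen in genStreamSink, pins each parallel sink subtask to a single Kafka partition. Where records should instead be routed by a field value, a custom FlinkKafkaPartitioner could be supplied in its place. A sketch, assuming (hypothetically) that the routing key sits in field 0:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

// Sketch of a value-based alternative to FlinkFixedPartitioner.
public class FirstFieldPartitioner extends FlinkKafkaPartitioner<Row> {
    @Override
    public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Object k = record.getField(0); // hypothetical: route by the first field
        int hash = (k == null) ? 0 : k.hashCode();
        return partitions[(hash & Integer.MAX_VALUE) % partitions.length];
    }
}

Wiring it in would then be a one-line change in genStreamSink: this.partitioner = Optional.of(new FirstFieldPartitioner());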

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkTableInfo.java

Lines changed: 10 additions & 7 deletions
@@ -19,7 +19,6 @@
 package com.dtstack.flink.sql.sink.kafka.table;

 import com.dtstack.flink.sql.table.TargetTableInfo;
-import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;

 import java.util.HashMap;
 import java.util.Map;
@@ -38,6 +37,10 @@ public class KafkaSinkTableInfo extends TargetTableInfo {
     //version
     private static final String CURR_TYPE = "kafka10";

+    public KafkaSinkTableInfo() {
+        super.setType(CURR_TYPE);
+    }
+
     public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers";

     public static final String TOPIC_KEY = "topic";
@@ -48,9 +51,7 @@ public class KafkaSinkTableInfo extends TargetTableInfo {

     private String topic;

-    public KafkaSinkTableInfo() {
-        super.setType(CURR_TYPE);
-    }
+
     public void addKafkaParam(String key, String value) {
         kafkaParam.put(key, value);
     }
@@ -80,16 +81,18 @@ public void setTopic(String topic) {
         this.topic = topic;
     }

+
     @Override
     public boolean check() {
-        Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
-        Preconditions.checkNotNull(topic, "kafka of topic is required");
+        com.google.common.base.Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
+        com.google.common.base.Preconditions.checkNotNull(topic, "kafka of topic is required");
+        //Preconditions.checkNotNull(kafkaParam.get("groupId"), "kafka of groupId is required");
         return false;
     }

     @Override
     public String getType() {
-        // return super.getType() + SOURCE_SUFFIX;
         return super.getType();
     }
+
 }
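Two small observations on the check() change. Dropping the calcite-shaded Preconditions in favor of fully qualified com.google.common.base.Preconditions removes an accidental dependency on Calcite's shading; with unshaded Guava on the classpath, a plain import would read more cleanly, as in the sketch below. Note also that check() still returns false even when both checks pass, which callers presumably ignore.

import com.google.common.base.Preconditions;

@Override
public boolean check() {
    // Equivalent to the fully qualified calls above, via a normal import.
    Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
    Preconditions.checkNotNull(topic, "kafka of topic is required");
    return false;
}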
