Skip to content

Commit 21ad9a7

Browse files
committed
kafka09 sink
1 parent 6800b7a commit 21ad9a7

File tree

4 files changed

+54
-28
lines changed

4 files changed

+54
-28
lines changed

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka09JsonTableSink.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
import org.apache.flink.api.common.serialization.SerializationSchema;
2121
import org.apache.flink.streaming.api.datastream.DataStream;
2222
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
23-
import org.apache.flink.streaming.connectors.kafka.*;
24-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
23+
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
24+
import org.apache.flink.streaming.connectors.kafka.Kafka09TableSink;
2525
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
2626
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
27-
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
28-
import org.apache.flink.table.util.TableConnectorUtil;
27+
import org.apache.flink.table.api.TableSchema;
28+
import org.apache.flink.table.utils.TableConnectorUtils;
2929
import org.apache.flink.types.Row;
3030

3131
import java.util.Optional;
@@ -38,30 +38,26 @@
3838
*
3939
* @author maqi
4040
*/
41-
public class CustomerKafka09JsonTableSink extends KafkaJsonTableSink {
41+
public class CustomerKafka09JsonTableSink extends Kafka09TableSink {
4242

4343

4444
protected SerializationSchema schema;
4545

46-
public CustomerKafka09JsonTableSink(String topic, Properties properties, SerializationSchema schema) {
47-
super(topic, properties, new FlinkFixedPartitioner<>());
48-
this.schema = schema;
49-
}
5046

51-
public CustomerKafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner, SerializationSchema schema) {
52-
super(topic, properties, partitioner);
53-
this.schema = schema;
54-
}
5547

5648

5749
@Deprecated
58-
public CustomerKafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, SerializationSchema schema) {
59-
super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
60-
this.schema = schema;
50+
public CustomerKafka09JsonTableSink(TableSchema schema,
51+
String topic,
52+
Properties properties,
53+
Optional<FlinkKafkaPartitioner<Row>> partitioner,
54+
SerializationSchema<Row> serializationSchema) {
55+
super(schema, topic, properties, partitioner, serializationSchema);
56+
this.schema = serializationSchema;
6157
}
6258

6359
@Override
64-
protected SinkFunction<Row> createKafkaProducer(String s, Properties properties, SerializationSchema<Row> serializationSchema, Optional<FlinkKafkaPartitioner<Row>> optional) {
60+
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String s, Properties properties, SerializationSchema<Row> serializationSchema, Optional<FlinkKafkaPartitioner<Row>> optional) {
6561
return new CustomerFlinkKafkaProducer09<>(topic, serializationSchema, properties);
6662
}
6763

@@ -70,6 +66,6 @@ public void emitDataStream(DataStream<Row> dataStream) {
7066
SinkFunction<Row> kafkaProducer = createKafkaProducer(topic, properties, schema, partitioner);
7167
// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
7268
//kafkaProducer.setFlushOnCheckpoint(true);
73-
dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
69+
dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
7470
}
7571
}

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,14 @@
2626
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2727
import org.apache.flink.streaming.api.datastream.DataStream;
2828
import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
29+
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
30+
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
31+
import org.apache.flink.table.api.TableSchema;
2932
import org.apache.flink.table.sinks.AppendStreamTableSink;
3033
import org.apache.flink.table.sinks.TableSink;
3134
import org.apache.flink.types.Row;
35+
36+
import java.util.Optional;
3237
import java.util.Properties;
3338

3439
/**
@@ -44,39 +49,63 @@ public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<K
4449

4550
protected TypeInformation<?>[] fieldTypes;
4651

52+
/** The schema of the table. */
53+
private TableSchema schema;
54+
55+
/** The Kafka topic to write to. */
4756
protected String topic;
4857

58+
/** Properties for the Kafka producer. */
4959
protected Properties properties;
5060

5161
/** Serialization schema for encoding records to Kafka. */
5262
protected SerializationSchema serializationSchema;
5363

64+
/** Partitioner to select Kafka partition for each item. */
65+
protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
66+
5467
@Override
5568
public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
69+
5670
KafkaSinkTableInfo kafka09SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
5771
this.topic = kafka09SinkTableInfo.getTopic();
72+
73+
Properties props = new Properties();
74+
props.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());
75+
76+
for (String key:kafka09SinkTableInfo.getKafkaParamKeys()) {
77+
props.setProperty(key, kafka09SinkTableInfo.getKafkaParam(key));
78+
}
79+
this.properties = props;
80+
this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
5881
this.fieldNames = kafka09SinkTableInfo.getFields();
5982
TypeInformation[] types = new TypeInformation[kafka09SinkTableInfo.getFields().length];
60-
for (int i = 0; i < kafka09SinkTableInfo.getFieldClasses().length; i++) {
83+
for(int i = 0; i< kafka09SinkTableInfo.getFieldClasses().length; i++){
6184
types[i] = TypeInformation.of(kafka09SinkTableInfo.getFieldClasses()[i]);
6285
}
6386
this.fieldTypes = types;
6487

65-
properties = new Properties();
66-
for (String key : kafka09SinkTableInfo.getKafkaParamKeys()) {
67-
properties.setProperty(key, kafka09SinkTableInfo.getKafkaParam(key));
88+
TableSchema.Builder schemaBuilder = TableSchema.builder();
89+
for (int i=0;i<fieldNames.length;i++){
90+
schemaBuilder.field(fieldNames[i], fieldTypes[i]);
91+
}
92+
this.schema = schemaBuilder.build();
93+
94+
//this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
95+
if ("json".equalsIgnoreCase(kafka09SinkTableInfo.getSinkDataType())) {
96+
this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
6897
}
69-
properties.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());
7098

71-
this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
7299
return this;
73100
}
74101

75102
@Override
76103
public void emitDataStream(DataStream<Row> dataStream) {
77104
KafkaTableSinkBase kafkaTableSink = new CustomerKafka09JsonTableSink(
105+
schema,
78106
topic,
79107
properties,
108+
partitioner,
80109
serializationSchema
81110
);
82111

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkTableInfo.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ public class KafkaSinkTableInfo extends TargetTableInfo {
3636
//version
3737
private static final String CURR_TYPE = "kafka09";
3838

39+
public KafkaSinkTableInfo() {
40+
super.setType(CURR_TYPE);
41+
}
42+
3943
public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers";
4044

4145
public static final String TOPIC_KEY = "topic";
@@ -46,9 +50,6 @@ public class KafkaSinkTableInfo extends TargetTableInfo {
4650

4751
public Map<String,String> kafkaParam = new HashMap<String,String>();
4852

49-
public KafkaSinkTableInfo() {
50-
super.setType(CURR_TYPE);
51-
}
5253

5354
public void addKafkaParam(String key,String value){
5455
kafkaParam.put(key,value);

kafka09/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
<modules>
1717
<module>kafka09-source</module>
18-
<!--<module>kafka09-sink</module>-->
18+
<module>kafka09-sink</module>
1919
</modules>
2020

2121
<dependencies>

0 commit comments

Comments (0)