Commit 9ca4a09

kafka sink use retractStream

1 parent 652f6be

File tree: 3 files changed (+83, -18 lines)

  • kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
  • kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java
  • kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 28 additions & 6 deletions

@@ -27,10 +27,15 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
+import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+
+import java.util.Optional;
 import java.util.Properties;

 /**
@@ -53,22 +58,37 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener
     /** Serialization schema for encoding records to Kafka. */
     protected SerializationSchema serializationSchema;

+    /** The schema of the table. */
+    private TableSchema schema;
+
+    /** Partitioner to select Kafka partition for each item. */
+    protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
+
+
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         KafkaSinkTableInfo kafka09SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
         this.topic = kafka09SinkTableInfo.getTopic();
+
+        properties = new Properties();
+        properties.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());
+        for (String key : kafka09SinkTableInfo.getKafkaParamKeys()) {
+            properties.setProperty(key, kafka09SinkTableInfo.getKafkaParam(key));
+        }
+
+        this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
         this.fieldNames = kafka09SinkTableInfo.getFields();
         TypeInformation[] types = new TypeInformation[kafka09SinkTableInfo.getFields().length];
         for (int i = 0; i < kafka09SinkTableInfo.getFieldClasses().length; i++) {
             types[i] = TypeInformation.of(kafka09SinkTableInfo.getFieldClasses()[i]);
         }
         this.fieldTypes = types;

-        properties = new Properties();
-        for (String key : kafka09SinkTableInfo.getKafkaParamKeys()) {
-            properties.setProperty(key, kafka09SinkTableInfo.getKafkaParam(key));
+        TableSchema.Builder schemaBuilder = TableSchema.builder();
+        for (int i = 0; i < fieldNames.length; i++) {
+            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
         }
-        properties.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());
+        this.schema = schemaBuilder.build();

         this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
@@ -81,9 +101,11 @@ public TypeInformation<Row> getRecordType() {

     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        KafkaTableSink kafkaTableSink = new CustomerKafka09JsonTableSink(
+        KafkaTableSinkBase kafkaTableSink = new CustomerKafka09JsonTableSink(
+                schema,
                 topic,
                 properties,
+                partitioner,
                 serializationSchema
         );
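The five constructor arguments now passed here (schema, topic, properties, partitioner, serializationSchema) line up with the protected constructor of KafkaTableSinkBase in Flink 1.7-era code. The Customer*JsonTableSink subclasses themselves are not part of this diff, so the following is only a hypothetical sketch of the shape such a subclass needs under that API; the createKafkaProducer override and the FlinkKafkaProducer09 constructor used below are assumptions about the surrounding code, not lines from this repository:

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

// Hypothetical sketch only: the real CustomerKafka09JsonTableSink is not shown in this commit.
public class CustomerKafka09JsonTableSink extends KafkaTableSinkBase {

    public CustomerKafka09JsonTableSink(
            TableSchema schema,
            String topic,
            Properties properties,
            Optional<FlinkKafkaPartitioner<Row>> partitioner,
            SerializationSchema<Row> serializationSchema) {
        // Matches the five-argument call added in emitDataStream above.
        super(schema, topic, properties, partitioner, serializationSchema);
    }

    @Override
    protected SinkFunction<Row> createKafkaProducer(
            String topic,
            Properties properties,
            SerializationSchema<Row> serializationSchema,
            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
        // FlinkKafkaProducer09 accepts an optional custom partitioner; passing null
        // falls back to the Kafka producer's own partitioning.
        return new FlinkKafkaProducer09<>(topic, serializationSchema, properties,
                partitioner.orElse(null));
    }
}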

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 26 additions & 6 deletions

@@ -27,7 +27,10 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
+import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
@@ -58,22 +61,37 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener
     /** Serialization schema for encoding records to Kafka. */
     protected SerializationSchema serializationSchema;

+    /** The schema of the table. */
+    private TableSchema schema;
+
+    /** Partitioner to select Kafka partition for each item. */
+    protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
+
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         KafkaSinkTableInfo kafka10SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
         this.topic = kafka10SinkTableInfo.getTopic();
+
+        properties = new Properties();
+        properties.setProperty("bootstrap.servers", kafka10SinkTableInfo.getBootstrapServers());
+
+        for (String key : kafka10SinkTableInfo.getKafkaParamKeys()) {
+            properties.setProperty(key, kafka10SinkTableInfo.getKafkaParam(key));
+        }
+
+        this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
         this.fieldNames = kafka10SinkTableInfo.getFields();
         TypeInformation[] types = new TypeInformation[kafka10SinkTableInfo.getFields().length];
         for (int i = 0; i < kafka10SinkTableInfo.getFieldClasses().length; i++) {
             types[i] = TypeInformation.of(kafka10SinkTableInfo.getFieldClasses()[i]);
         }
         this.fieldTypes = types;

-        properties = new Properties();
-        for (String key : kafka10SinkTableInfo.getKafkaParamKeys()) {
-            properties.setProperty(key, kafka10SinkTableInfo.getKafkaParam(key));
+        TableSchema.Builder schemaBuilder = TableSchema.builder();
+        for (int i = 0; i < fieldNames.length; i++) {
+            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
         }
-        properties.setProperty("bootstrap.servers", kafka10SinkTableInfo.getBootstrapServers());
+        this.schema = schemaBuilder.build();

         this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
@@ -86,9 +104,11 @@ public TypeInformation<Row> getRecordType() {

     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        KafkaTableSink kafkaTableSink = new CustomerKafka10JsonTableSink(
+        KafkaTableSinkBase kafkaTableSink = new CustomerKafka10JsonTableSink(
+                schema,
                 topic,
                 properties,
+                partitioner,
                 serializationSchema
         );
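One side effect of the reordering, which appears in all three files: bootstrap.servers is now written into properties before the kafkaParams loop instead of after it. Since java.util.Properties keeps only the last value set for a key, a user-supplied bootstrap.servers entry in the table's Kafka params now overrides the value from getBootstrapServers(); with the old order it was always clobbered. A minimal demonstration (the host names are placeholders):

import java.util.Properties;

public class PropertiesOrderDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "from-table-info:9092");   // written first, as in the new code
        properties.setProperty("bootstrap.servers", "from-kafka-params:9092"); // later write wins
        // Prints "from-kafka-params:9092": the last setProperty for a key is kept.
        System.out.println(properties.getProperty("bootstrap.servers"));
    }
}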

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 29 additions & 6 deletions

@@ -27,10 +27,15 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
+import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+
+import java.util.Optional;
 import java.util.Properties;

 /**
@@ -56,22 +61,38 @@ public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener
     /** Serialization schema for encoding records to Kafka. */
     protected SerializationSchema serializationSchema;

+    /** The schema of the table. */
+    private TableSchema schema;
+
+    /** Partitioner to select Kafka partition for each item. */
+    protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
+
+
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         KafkaSinkTableInfo kafka11SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
         this.topic = kafka11SinkTableInfo.getTopic();
+
+        properties = new Properties();
+        properties.setProperty("bootstrap.servers", kafka11SinkTableInfo.getBootstrapServers());
+
+        for (String key : kafka11SinkTableInfo.getKafkaParamKeys()) {
+            properties.setProperty(key, kafka11SinkTableInfo.getKafkaParam(key));
+        }
+        this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
         this.fieldNames = kafka11SinkTableInfo.getFields();
         TypeInformation[] types = new TypeInformation[kafka11SinkTableInfo.getFields().length];
         for (int i = 0; i < kafka11SinkTableInfo.getFieldClasses().length; i++) {
             types[i] = TypeInformation.of(kafka11SinkTableInfo.getFieldClasses()[i]);
         }
         this.fieldTypes = types;

-        properties = new Properties();
-        for (String key : kafka11SinkTableInfo.getKafkaParamKeys()) {
-            properties.setProperty(key, kafka11SinkTableInfo.getKafkaParam(key));
+        TableSchema.Builder schemaBuilder = TableSchema.builder();
+        for (int i = 0; i < fieldNames.length; i++) {
+            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
         }
-        properties.setProperty("bootstrap.servers", kafka11SinkTableInfo.getBootstrapServers());
+        this.schema = schemaBuilder.build();
+
         this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
@@ -83,9 +104,11 @@ public TypeInformation<Row> getRecordType() {

     @Override
     public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        KafkaTableSink kafkaTableSink = new CustomerKafka11JsonTableSink(
+        KafkaTableSinkBase kafkaTableSink = new CustomerKafka11JsonTableSink(
+                schema,
                 topic,
                 properties,
+                partitioner,
                 serializationSchema
         );
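All three sinks now fix the partitioner to FlinkFixedPartitioner, which pins each parallel sink subtask to one Kafka partition (subtask i writes to partitions[i % partitions.length]), so records from a given subtask stay ordered within their partition. Because the field is typed Optional<FlinkKafkaPartitioner<Row>>, other strategies can be swapped in; as an illustration only (not part of this commit), a partitioner that routes by a hash of the row's first field would look like this:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

// Illustrative alternative to FlinkFixedPartitioner: route each Row by a hash of its
// first field so that all rows with the same value land in the same Kafka partition.
public class FirstFieldHashPartitioner extends FlinkKafkaPartitioner<Row> {

    @Override
    public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Object firstField = record.getField(0);
        int hash = firstField == null ? 0 : firstField.hashCode();
        // partitions holds the partition ids of targetTopic; pick one deterministically.
        return partitions[Math.abs(hash % partitions.length)];
    }
}

Wiring it in would be a one-line change in genStreamSink: this.partitioner = Optional.of(new FirstFieldHashPartitioner());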
