@@ -23,13 +23,12 @@
 import com.dtstack.flink.sql.table.TargetTableInfo;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 
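The import swap above is the core of this patch: the sink moves from the append-only interface to RetractStreamTableSink, so every element it receives is a Tuple2<Boolean, Row> whose Boolean flag marks the row as an accumulate (true) or a retract (false) message. A minimal sketch of that record shape; the values are illustrative, not taken from the patch:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

// Illustrative only: the record shape a RetractStreamTableSink consumes.
public class RetractRecordSketch {
    public static void main(String[] args) {
        // f0 == true  -> accumulate: add this row downstream
        // f0 == false -> retract: withdraw a previously emitted row
        Tuple2<Boolean, Row> accumulate = new Tuple2<>(true, Row.of("user_1", 10));
        Tuple2<Boolean, Row> retract = new Tuple2<>(false, Row.of("user_1", 10));
        System.out.println(accumulate + " / " + retract);
    }
}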
@@ -45,56 +44,37 @@
  * @modifyer maqi
  *
  */
-public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
+public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
 
 
     protected String[] fieldNames;
 
     protected TypeInformation<?>[] fieldTypes;
 
-    /** The schema of the table. */
-    private TableSchema schema;
-
-    /** The Kafka topic to write to. */
     protected String topic;
 
-    /** Properties for the Kafka producer. */
     protected Properties properties;
 
     /** Serialization schema for encoding records to Kafka. */
     protected SerializationSchema serializationSchema;
 
-    /** Partitioner to select Kafka partition for each item. */
-    protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
-        KafkaSinkTableInfo kafka010SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
-        this.topic = kafka010SinkTableInfo.getTopic();
-
-        Properties props = new Properties();
-        props.setProperty("bootstrap.servers", kafka010SinkTableInfo.getBootstrapServers());
-
-        for (String key : kafka010SinkTableInfo.getKafkaParamKeys()) {
-            props.setProperty(key, kafka010SinkTableInfo.getKafkaParam(key));
-        }
-        this.properties = props;
-        this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
-        this.fieldNames = kafka010SinkTableInfo.getFields();
-        TypeInformation[] types = new TypeInformation[kafka010SinkTableInfo.getFields().length];
-        for (int i = 0; i < kafka010SinkTableInfo.getFieldClasses().length; i++){
-            types[i] = TypeInformation.of(kafka010SinkTableInfo.getFieldClasses()[i]);
+        KafkaSinkTableInfo kafka10SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
+        this.topic = kafka10SinkTableInfo.getTopic();
+        this.fieldNames = kafka10SinkTableInfo.getFields();
+        TypeInformation[] types = new TypeInformation[kafka10SinkTableInfo.getFields().length];
+        for (int i = 0; i < kafka10SinkTableInfo.getFieldClasses().length; i++) {
+            types[i] = TypeInformation.of(kafka10SinkTableInfo.getFieldClasses()[i]);
         }
         this.fieldTypes = types;
 
-        TableSchema.Builder schemaBuilder = TableSchema.builder();
-        for (int i = 0; i < fieldNames.length; i++) {
-            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
+        properties = new Properties();
+        for (String key : kafka10SinkTableInfo.getKafkaParamKeys()) {
+            properties.setProperty(key, kafka10SinkTableInfo.getKafkaParam(key));
         }
-        this.schema = schemaBuilder.build();
+        properties.setProperty("bootstrap.servers", kafka10SinkTableInfo.getBootstrapServers());
 
-        //this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
-        if ("json".equalsIgnoreCase(kafka010SinkTableInfo.getSinkDataType())) {
-            this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
-        }
+        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
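genStreamSink now derives the sink schema directly from the table info's declared field classes via TypeInformation.of. A standalone sketch of that derivation; the field names and classes are illustrative stand-ins for the KafkaSinkTableInfo getters:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class FieldTypeSketch {
    public static void main(String[] args) {
        // Stand-ins for kafka10SinkTableInfo.getFields() / getFieldClasses().
        String[] fields = {"name", "cnt"};
        Class<?>[] fieldClasses = {String.class, Integer.class};

        TypeInformation<?>[] types = new TypeInformation<?>[fieldClasses.length];
        for (int i = 0; i < fieldClasses.length; i++) {
            // The same conversion the patch applies per field.
            types[i] = TypeInformation.of(fieldClasses[i]);
        }
        // Row schema the sink will report for its records.
        System.out.println(new RowTypeInfo(types, fields));
    }
}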
@@ -101,13 +81,21 @@
 
     @Override
-    public void emitDataStream(DataStream<Row> dataStream) {
-        KafkaTableSinkBase kafkaTableSink = new CustomerKafka10JsonTableSink(
-                schema,
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+
+    @Override
+    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+        KafkaTableSink kafkaTableSink = new CustomerKafka10JsonTableSink(
                 topic,
                 properties,
-                partitioner,
                 serializationSchema
         );
 
-        kafkaTableSink.emitDataStream(dataStream);
+
+        DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
+            return record.f1;
+        }).returns(getOutputType().getTypeAt(1));
+
+        kafkaTableSink.emitDataStream(ds);
     }
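One consequence of the map above: it strips the Boolean flag unconditionally, so retraction messages (f0 == false) are serialized to Kafka as ordinary rows. If only accumulate messages should reach the topic, a filter on the flag can precede the map. A sketch under that assumption; filterRetractions is a hypothetical helper, not part of this change:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

public class RetractFilterSketch {
    // Hypothetical variant: forward only accumulate (f0 == true) messages
    // before handing the rows to the append-only Kafka sink.
    public static DataStream<Row> filterRetractions(
            DataStream<Tuple2<Boolean, Row>> in, TypeInformation<Row> rowType) {
        return in.filter(t -> t.f0)   // drop retraction messages
                .map(t -> t.f1)       // unwrap the Row payload
                .returns(rowType);    // lambdas need an explicit type hint, as in the patch
    }
}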
@@ -114,7 +102,7 @@
 
     @Override
-    public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
+    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
     }
 
     @Override
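getOutputType now advertises the full retract type, Tuple2<Boolean, Row>, and the getTypeAt(1) calls in genStreamSink and emitDataStream recover the Row component from it. A self-contained illustration of that relationship; field names and types are made up, and flink-core's Types.BOOLEAN stands in for the table API's Types.BOOLEAN() used in the patch:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.types.Row;

public class OutputTypeSketch {
    public static void main(String[] args) {
        RowTypeInfo rowType = new RowTypeInfo(
                new TypeInformation<?>[]{Types.STRING, Types.INT},
                new String[]{"name", "cnt"});
        TupleTypeInfo<Tuple2<Boolean, Row>> outputType =
                new TupleTypeInfo<>(Types.BOOLEAN, rowType);
        // Index 1 recovers the Row schema, as the patch's getTypeAt(1) calls do.
        TypeInformation<Row> recovered = outputType.getTypeAt(1);
        System.out.println(recovered);
    }
}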
@@ -128,7 +116,7 @@ public TypeInformation<?>[] getFieldTypes() {
     }
 
     @Override
-    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
         this.fieldNames = fieldNames;
         this.fieldTypes = fieldTypes;
         return this;
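For orientation, this sink is driven in two steps: genStreamSink builds it from the parsed target-table descriptor, and the framework later calls configure with the field schema before emitDataStream runs. A wiring fragment under the assumption that a populated TargetTableInfo is at hand; targetTableInfo is hypothetical, and the project's classes are assumed on the classpath:

// Hypothetical wiring, mirroring how the framework drives this class.
KafkaSink sink = new KafkaSink().genStreamSink(targetTableInfo);
TableSink<Tuple2<Boolean, Row>> configured =
        sink.configure(sink.getFieldNames(), sink.getFieldTypes());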