
Commit 746542c

kafka sink use retractstream
1 parent add0cab commit 746542c
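
This commit switches the kafka09, kafka10, and kafka11 KafkaSink classes from AppendStreamTableSink<Row> to RetractStreamTableSink<Row>, so Flink now hands each sink a retract stream: every element is a Tuple2<Boolean, Row> whose flag is true for an accumulate (insert) message and false for a retraction of a previously emitted row. Below is a minimal, self-contained sketch of where such a stream comes from in the Flink 1.x Table API; the table, field, and job names are made up for illustration and are not part of this commit.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    // Illustrative sketch only: shows the Tuple2<Boolean, Row> contract of a retract stream.
    // A grouped aggregation emits updates, which an append-only sink cannot accept but a
    // RetractStreamTableSink (like the updated KafkaSink) can.
    public class RetractStreamExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            DataStream<Tuple2<Integer, String>> orders = env.fromElements(
                    Tuple2.of(1, "apple"), Tuple2.of(1, "pear"), Tuple2.of(2, "plum"));
            tableEnv.registerDataStream("orders", orders, "user_id, product");

            Table counts = tableEnv.sqlQuery(
                    "SELECT user_id, COUNT(*) AS cnt FROM orders GROUP BY user_id");

            // f0 == true: accumulate message, f0 == false: retraction of an earlier row
            DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(counts, Row.class);
            retractStream.print();

            env.execute("retract-stream-sketch");
        }
    }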

File tree

3 files changed (+60 −25 lines)
  • kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka


kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 20 additions & 8 deletions
@@ -23,10 +23,12 @@
 import com.dtstack.flink.sql.table.TargetTableInfo;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import java.util.Properties;
@@ -38,7 +40,7 @@
  * @author DocLi
  * @modifyer maqi
  */
-public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
+public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
 
     protected String[] fieldNames;
 
@@ -68,24 +70,34 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         }
         properties.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());
 
-        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
+        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
 
     @Override
-    public void emitDataStream(DataStream<Row> dataStream) {
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+
+    @Override
+    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         KafkaTableSink kafkaTableSink = new CustomerKafka09JsonTableSink(
                 topic,
                 properties,
                 serializationSchema
         );
 
-        kafkaTableSink.emitDataStream(dataStream);
+
+        DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
+            return record.f1;
+        }).returns(getOutputType().getTypeAt(1));
+
+        kafkaTableSink.emitDataStream(ds);
     }
 
     @Override
-    public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
+    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
     }
 
     @Override
@@ -99,7 +111,7 @@ public TypeInformation<?>[] getFieldTypes() {
     }
 
     @Override
-    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
         this.fieldNames = fieldNames;
         this.fieldTypes = fieldTypes;
         return this;
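
Note that the new emitDataStream keeps only record.f1, so retraction messages (f0 == false) are serialized to Kafka in exactly the same JSON shape as inserts, with nothing marking them as deletes. If a topic should only receive accumulate messages, the flag could be checked before it is dropped. The following is a hypothetical variation for illustration, not something this commit does; the class and method names are made up.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.types.Row;

    // Hypothetical helper: forward only accumulate (f0 == true) records to the Kafka
    // table sink and drop retractions entirely, instead of forwarding every payload.
    public class AccumulateOnlySketch {

        public static DataStream<Row> accumulateOnly(DataStream<Tuple2<Boolean, Row>> retractStream,
                                                     RowTypeInfo rowTypeInfo) {
            return retractStream
                    .filter((Tuple2<Boolean, Row> record) -> record.f0)  // keep accumulate messages only
                    .map((Tuple2<Boolean, Row> record) -> record.f1)     // then strip the retract flag
                    .returns(rowTypeInfo);                               // lambdas need explicit Row type info
        }
    }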

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 20 additions & 8 deletions
@@ -23,10 +23,12 @@
 import com.dtstack.flink.sql.table.TargetTableInfo;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 
@@ -42,7 +44,7 @@
  * @modifyer maqi
  *
  */
-public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
+public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
 
 
     protected String[] fieldNames;
@@ -73,24 +75,34 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         }
         properties.setProperty("bootstrap.servers", kafka10SinkTableInfo.getBootstrapServers());
 
-        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
+        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
 
     @Override
-    public void emitDataStream(DataStream<Row> dataStream) {
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+
+    @Override
+    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         KafkaTableSink kafkaTableSink = new CustomerKafka10JsonTableSink(
                 topic,
                 properties,
                 serializationSchema
         );
 
-        kafkaTableSink.emitDataStream(dataStream);
+
+        DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
+            return record.f1;
+        }).returns(getOutputType().getTypeAt(1));
+
+        kafkaTableSink.emitDataStream(ds);
     }
 
     @Override
-    public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
+    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
     }
 
     @Override
@@ -104,7 +116,7 @@ public TypeInformation<?>[] getFieldTypes() {
     }
 
     @Override
-    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
         this.fieldNames = fieldNames;
         this.fieldTypes = fieldTypes;
         return this;

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 20 additions & 9 deletions
@@ -23,11 +23,12 @@
 import com.dtstack.flink.sql.table.TargetTableInfo;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import java.util.Properties;
@@ -42,7 +43,7 @@
  * @modifyer maqi
  *
  */
-public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
+public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
 
     protected String[] fieldNames;
 
@@ -71,24 +72,34 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
             properties.setProperty(key, kafka11SinkTableInfo.getKafkaParam(key));
         }
         properties.setProperty("bootstrap.servers", kafka11SinkTableInfo.getBootstrapServers());
-        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
+        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
 
     @Override
-    public void emitDataStream(DataStream<Row> dataStream) {
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+
+    @Override
+    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         KafkaTableSink kafkaTableSink = new CustomerKafka11JsonTableSink(
                 topic,
                 properties,
                 serializationSchema
         );
 
-        kafkaTableSink.emitDataStream(dataStream);
+
+        DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
+            return record.f1;
+        }).returns(getOutputType().getTypeAt(1));
+
+        kafkaTableSink.emitDataStream(ds);
    }
 
     @Override
-    public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
+    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
     }
 
     @Override
@@ -102,7 +113,7 @@ public TypeInformation<?>[] getFieldTypes() {
     }
 
     @Override
-    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
         this.fieldNames = fieldNames;
         this.fieldTypes = fieldTypes;
         return this;
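
Across all three modules, getOutputType() now wraps the row type in a TupleTypeInfo so the declared output type is Tuple2<Boolean, Row>, while the JSON serialization schema and the mapped stream use getTypeAt(1), i.e. only the Row payload component. The following is a small standalone sketch of that type wiring; the example schema is assumed here rather than taken from the commit, and it uses org.apache.flink.api.common.typeinfo.Types instead of the table API Types the commit references (both yield a BOOLEAN TypeInformation).

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    import org.apache.flink.types.Row;

    // Standalone sketch of the type wiring: Tuple2<Boolean, Row> on the outside,
    // the Row payload type (used for JSON serialization) on the inside.
    public class OutputTypeSketch {

        public static void main(String[] args) {
            // example schema, assumed for illustration
            String[] fieldNames = {"id", "name"};
            TypeInformation<?>[] fieldTypes = {Types.LONG, Types.STRING};

            RowTypeInfo rowType = new RowTypeInfo(fieldTypes, fieldNames);

            // the retract stream's element type: (retract flag, payload row)
            TupleTypeInfo<Tuple2<Boolean, Row>> outputType =
                    new TupleTypeInfo<>(Types.BOOLEAN, rowType);

            // component 1 is the Row payload; the commit feeds this to the serialization schema
            TypeInformation<Row> payloadType = outputType.getTypeAt(1);

            System.out.println(outputType);
            System.out.println(payloadType);
        }
    }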
