
Commit 6505da6

resolve conflict

2 parents: d5cabbb + a63735b

File tree

3 files changed: +96 -86 lines

  • kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka


kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 29 additions & 25 deletions

```diff
@@ -23,13 +23,15 @@
 import com.dtstack.flink.sql.table.TargetTableInfo;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 
@@ -43,44 +45,41 @@
  * @author DocLi
  * @modifyer maqi
  */
-public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
+public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
 
     protected String[] fieldNames;
 
     protected TypeInformation<?>[] fieldTypes;
 
-    /** The schema of the table. */
-    private TableSchema schema;
-
-    /** The Kafka topic to write to. */
     protected String topic;
 
-    /** Properties for the Kafka producer. */
     protected Properties properties;
 
     /** Serialization schema for encoding records to Kafka. */
     protected SerializationSchema serializationSchema;
 
+    /** The schema of the table. */
+    private TableSchema schema;
+
     /** Partitioner to select Kafka partition for each item. */
     protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
 
+
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
-
         KafkaSinkTableInfo kafka09SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
         this.topic = kafka09SinkTableInfo.getTopic();
 
-        Properties props = new Properties();
-        props.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());
-
-        for (String key:kafka09SinkTableInfo.getKafkaParamKeys()) {
-            props.setProperty(key, kafka09SinkTableInfo.getKafkaParam(key));
+        properties = new Properties();
+        properties.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());
+        for (String key : kafka09SinkTableInfo.getKafkaParamKeys()) {
+            properties.setProperty(key, kafka09SinkTableInfo.getKafkaParam(key));
         }
-        this.properties = props;
+
         this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
         this.fieldNames = kafka09SinkTableInfo.getFields();
         TypeInformation[] types = new TypeInformation[kafka09SinkTableInfo.getFields().length];
-        for(int i = 0; i< kafka09SinkTableInfo.getFieldClasses().length; i++){
+        for (int i = 0; i < kafka09SinkTableInfo.getFieldClasses().length; i++) {
             types[i] = TypeInformation.of(kafka09SinkTableInfo.getFieldClasses()[i]);
         }
         this.fieldTypes = types;
@@ -91,16 +90,17 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         }
         this.schema = schemaBuilder.build();
 
-        //this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
-        if ("json".equalsIgnoreCase(kafka09SinkTableInfo.getSinkDataType())) {
-            this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
-        }
-
+        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
 
     @Override
-    public void emitDataStream(DataStream<Row> dataStream) {
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+
+    @Override
+    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         KafkaTableSinkBase kafkaTableSink = new CustomerKafka09JsonTableSink(
                 schema,
                 topic,
@@ -109,12 +109,16 @@ public void emitDataStream(DataStream<Row> dataStream) {
                 serializationSchema
         );
 
-        kafkaTableSink.emitDataStream(dataStream);
+        DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
+            return record.f1;
+        }).returns(getOutputType().getTypeAt(1));
+
+        kafkaTableSink.emitDataStream(ds);
     }
 
     @Override
-    public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
+    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
     }
 
     @Override
@@ -128,7 +132,7 @@ public TypeInformation<?>[] getFieldTypes() {
     }
 
     @Override
-    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
         this.fieldNames = fieldNames;
         this.fieldTypes = fieldTypes;
         return this;
```
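
Note on this change: as a RetractStreamTableSink<Row>, the sink now receives Tuple2<Boolean, Row> records, where f0 == true marks an accumulate (insert) message and f0 == false a retraction (delete). The map in emitDataStream keeps only f1, so retraction messages are written to Kafka exactly like inserts. If retractions should not reach the topic at all, filtering on the flag first would work; a minimal sketch (the RetractStreamUtil helper is illustrative, not part of this commit):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

public class RetractStreamUtil {

    /**
     * Drops retraction messages (f0 == false) and strips the flag, instead of
     * forwarding deletes to Kafka as if they were inserts. rowType is the
     * payload type, i.e. getOutputType().getTypeAt(1) in the sinks above.
     */
    public static DataStream<Row> toAppendStream(DataStream<Tuple2<Boolean, Row>> retractStream,
                                                 TypeInformation<Row> rowType) {
        return retractStream
                .filter((Tuple2<Boolean, Row> record) -> record.f0) // keep accumulate messages only
                .map((Tuple2<Boolean, Row> record) -> record.f1)    // strip the retract flag
                .returns(rowType);                                  // lambdas erase the Row type info
    }
}
```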

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 34 additions & 27 deletions

```diff
@@ -23,13 +23,15 @@
 import com.dtstack.flink.sql.table.TargetTableInfo;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 
@@ -45,44 +47,43 @@
  * @modifyer maqi
  *
  */
-public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
+public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
 
 
     protected String[] fieldNames;
 
     protected TypeInformation<?>[] fieldTypes;
 
-    /** The schema of the table. */
-    private TableSchema schema;
-
-    /** The Kafka topic to write to. */
     protected String topic;
 
-    /** Properties for the Kafka producer. */
     protected Properties properties;
 
     /** Serialization schema for encoding records to Kafka. */
     protected SerializationSchema serializationSchema;
 
+    /** The schema of the table. */
+    private TableSchema schema;
+
     /** Partitioner to select Kafka partition for each item. */
     protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
+
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
-        KafkaSinkTableInfo kafka010SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
-        this.topic = kafka010SinkTableInfo.getTopic();
+        KafkaSinkTableInfo kafka10SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
+        this.topic = kafka10SinkTableInfo.getTopic();
 
-        Properties props = new Properties();
-        props.setProperty("bootstrap.servers", kafka010SinkTableInfo.getBootstrapServers());
+        properties = new Properties();
+        properties.setProperty("bootstrap.servers", kafka10SinkTableInfo.getBootstrapServers());
 
-        for (String key:kafka010SinkTableInfo.getKafkaParamKeys()) {
-            props.setProperty(key, kafka010SinkTableInfo.getKafkaParam(key));
+        for (String key : kafka10SinkTableInfo.getKafkaParamKeys()) {
+            properties.setProperty(key, kafka10SinkTableInfo.getKafkaParam(key));
         }
-        this.properties = props;
+
         this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
-        this.fieldNames = kafka010SinkTableInfo.getFields();
-        TypeInformation[] types = new TypeInformation[kafka010SinkTableInfo.getFields().length];
-        for(int i = 0; i< kafka010SinkTableInfo.getFieldClasses().length; i++){
-            types[i] = TypeInformation.of(kafka010SinkTableInfo.getFieldClasses()[i]);
+        this.fieldNames = kafka10SinkTableInfo.getFields();
+        TypeInformation[] types = new TypeInformation[kafka10SinkTableInfo.getFields().length];
+        for (int i = 0; i < kafka10SinkTableInfo.getFieldClasses().length; i++) {
+            types[i] = TypeInformation.of(kafka10SinkTableInfo.getFieldClasses()[i]);
         }
         this.fieldTypes = types;
 
@@ -92,15 +93,17 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         }
         this.schema = schemaBuilder.build();
 
-        //this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
-        if ("json".equalsIgnoreCase(kafka010SinkTableInfo.getSinkDataType())) {
-            this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
-        }
+        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
 
     @Override
-    public void emitDataStream(DataStream<Row> dataStream) {
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+
+    @Override
+    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         KafkaTableSinkBase kafkaTableSink = new CustomerKafka10JsonTableSink(
                 schema,
                 topic,
@@ -109,12 +112,16 @@ public void emitDataStream(DataStream<Row> dataStream) {
                 serializationSchema
         );
 
-        kafkaTableSink.emitDataStream(dataStream);
+        DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
+            return record.f1;
+        }).returns(getOutputType().getTypeAt(1));
+
+        kafkaTableSink.emitDataStream(ds);
    }
 
     @Override
-    public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
+    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
     }
 
     @Override
@@ -128,7 +135,7 @@ public TypeInformation<?>[] getFieldTypes() {
     }
 
     @Override
-    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
         this.fieldNames = fieldNames;
         this.fieldTypes = fieldTypes;
         return this;
```
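
Note: the rewritten getOutputType() uses a raw TupleTypeInfo together with the Table API's Types.BOOLEAN(), which compiles but with an unchecked warning. Assuming the project's Flink version also ships the Java API class org.apache.flink.api.common.typeinfo.Types, a fully typed equivalent could look like this sketch (OutputTypeSketch is illustrative, not part of this commit):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.types.Row;

public class OutputTypeSketch {

    /** Same shape as the sinks' getOutputType(), without the raw-type warning. */
    public static TupleTypeInfo<Tuple2<Boolean, Row>> outputType(String[] fieldNames,
                                                                 TypeInformation<?>[] fieldTypes) {
        return new TupleTypeInfo<>(
                Types.BOOLEAN,                           // f0: the accumulate/retract flag
                new RowTypeInfo(fieldTypes, fieldNames)  // f1: the row payload
        );
    }
}
```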

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 33 additions & 34 deletions

```diff
@@ -22,18 +22,16 @@
 import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
 import com.dtstack.flink.sql.table.TargetTableInfo;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-
-import org.apache.flink.streaming.connectors.kafka.Kafka011TableSink;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 
@@ -50,44 +48,42 @@
  * @modifyer maqi
  *
  */
-public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
+public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
 
     protected String[] fieldNames;
 
     protected TypeInformation<?>[] fieldTypes;
 
-    /** The schema of the table. */
-    private TableSchema schema;
-
-    /** The Kafka topic to write to. */
     protected String topic;
 
-    /** Properties for the Kafka producer. */
     protected Properties properties;
 
     /** Serialization schema for encoding records to Kafka. */
     protected SerializationSchema serializationSchema;
 
+    /** The schema of the table. */
+    private TableSchema schema;
+
     /** Partitioner to select Kafka partition for each item. */
     protected Optional<FlinkKafkaPartitioner<Row>> partitioner;
 
+
     @Override
     public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
-        KafkaSinkTableInfo kafka011SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
-        this.topic = kafka011SinkTableInfo.getTopic();
+        KafkaSinkTableInfo kafka11SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
+        this.topic = kafka11SinkTableInfo.getTopic();
 
-        Properties props = new Properties();
-        props.setProperty("bootstrap.servers", kafka011SinkTableInfo.getBootstrapServers());
+        properties = new Properties();
+        properties.setProperty("bootstrap.servers", kafka11SinkTableInfo.getBootstrapServers());
 
-        for (String key:kafka011SinkTableInfo.getKafkaParamKeys()) {
-            props.setProperty(key, kafka011SinkTableInfo.getKafkaParam(key));
+        for (String key : kafka11SinkTableInfo.getKafkaParamKeys()) {
+            properties.setProperty(key, kafka11SinkTableInfo.getKafkaParam(key));
         }
-        this.properties = props;
         this.partitioner = Optional.of(new FlinkFixedPartitioner<>());
-        this.fieldNames = kafka011SinkTableInfo.getFields();
-        TypeInformation[] types = new TypeInformation[kafka011SinkTableInfo.getFields().length];
-        for(int i = 0; i< kafka011SinkTableInfo.getFieldClasses().length; i++){
-            types[i] = TypeInformation.of(kafka011SinkTableInfo.getFieldClasses()[i]);
+        this.fieldNames = kafka11SinkTableInfo.getFields();
+        TypeInformation[] types = new TypeInformation[kafka11SinkTableInfo.getFields().length];
+        for (int i = 0; i < kafka11SinkTableInfo.getFieldClasses().length; i++) {
+            types[i] = TypeInformation.of(kafka11SinkTableInfo.getFieldClasses()[i]);
         }
         this.fieldTypes = types;
 
@@ -96,30 +92,34 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
             schemaBuilder.field(fieldNames[i], fieldTypes[i]);
         }
         this.schema = schemaBuilder.build();
-
-        //this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
-        if ("json".equalsIgnoreCase(kafka011SinkTableInfo.getSinkDataType())) {
-            this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
-        }
+        this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType().getTypeAt(1));
         return this;
     }
 
     @Override
-    public void emitDataStream(DataStream<Row> dataStream) {
+    public TypeInformation<Row> getRecordType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+
+    @Override
+    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
         KafkaTableSinkBase kafkaTableSink = new CustomerKafka11JsonTableSink(
                 schema,
                 topic,
                 properties,
                 partitioner,
                 serializationSchema
         );
+        DataStream<Row> ds = dataStream.map((Tuple2<Boolean, Row> record) -> {
+            return record.f1;
+        }).returns(getOutputType().getTypeAt(1));
 
-        kafkaTableSink.emitDataStream(dataStream);
+        kafkaTableSink.emitDataStream(ds);
     }
 
     @Override
-    public TypeInformation<Row> getOutputType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
+    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
+        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
     }
 
     @Override
@@ -133,10 +133,9 @@ public TypeInformation<?>[] getFieldTypes() {
     }
 
     @Override
-    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
         this.fieldNames = fieldNames;
         this.fieldTypes = fieldTypes;
         return this;
     }
-
-}
+}
```
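
Note: because all three sinks drop the retract flag, an updating query (for example a GROUP BY count) writes its retraction messages to Kafka as ordinary rows. A self-contained sketch of the effect, using constructed data rather than output of this commit:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

public class RetractFlagDemo {

    public static void main(String[] args) {
        // A retract stream for a count that moves from 1 to 2: the old result is
        // retracted (f0 == false) before the new one is accumulated (f0 == true).
        List<Tuple2<Boolean, Row>> retractStream = Arrays.asList(
                Tuple2.of(true, Row.of("hello", 1L)),   // insert  (hello, 1)
                Tuple2.of(false, Row.of("hello", 1L)),  // retract (hello, 1)
                Tuple2.of(true, Row.of("hello", 2L)));  // insert  (hello, 2)

        // The sinks keep only f1, so the retraction reaches Kafka as a plain row:
        List<Row> written = retractStream.stream()
                .map(t -> t.f1)
                .collect(Collectors.toList());
        System.out.println(written); // [hello,1, hello,1, hello,2] -- the delete marker is lost
    }
}
```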
