Commit 0628f06

author: dapeng (committed)
commit message: test system.out
1 parent 214562d

File tree

2 files changed, +7 -4 lines changed


kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKeyedSerializationSchema.java

Lines changed: 5 additions & 3 deletions
@@ -23,12 +23,12 @@ public CustomerKeyedSerializationSchema(SerializationMetricWrapper serialization
     }

     public byte[] serializeKey(Row element) {
-        System.out.println("element = " + element+"|partitionKeys=" + partitionKeys);
         if(partitionKeys == null || partitionKeys.length <=0){
             return null;
-        }
+        }
         SerializationSchema<Row> serializationSchema = serializationMetricWrapper.getSerializationSchema();
         if(serializationSchema instanceof JsonRowSerializationSchema){
+            System.out.println("serial start, element = " + element);
             return serializeJsonKey((JsonRowSerializationSchema) serializationSchema, element);
         }
         return null;

@@ -46,13 +46,15 @@ private byte[] serializeJsonKey(JsonRowSerializationS
         try {
             byte[] data = jsonRowSerializationSchema.serialize(element);
             ObjectNode objectNode = mapper.readValue(data, ObjectNode.class);
+            System.out.println("objectNode = " + objectNode);
             StringBuilder sb = new StringBuilder();
             for(String key : partitionKeys){
                 if(objectNode.has(key)){
+                    System.out.println("key = " + key+ ", value = " + objectNode.get(key));
                     sb.append(objectNode.get(key));
                 }
             }
-            return null;
+            return sb.toString().getBytes();
         }catch (Exception e){

         }
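Apart from the debug prints, the substantive change in this file is in the second hunk: serializeJsonKey built the key in a StringBuilder and then returned null anyway, so every record reached Kafka with a null key. Below is a minimal standalone sketch of the fixed logic, using plain Jackson in place of Flink's Row and JsonRowSerializationSchema; the class and method shape here are illustrative assumptions, not the project's actual API.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class KeyFromJsonSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Mirrors the fixed serializeJsonKey(): concatenate the values of the
    // configured partition-key fields and return them as the message key.
    // Before this commit the method returned null here, discarding the key.
    static byte[] serializeJsonKey(byte[] jsonData, String[] partitionKeys) throws Exception {
        ObjectNode objectNode = MAPPER.readValue(jsonData, ObjectNode.class);
        StringBuilder sb = new StringBuilder();
        for (String key : partitionKeys) {
            if (objectNode.has(key)) {
                sb.append(objectNode.get(key));
            }
        }
        return sb.toString().getBytes();
    }

    public static void main(String[] args) throws Exception {
        byte[] row = "{\"id\":42,\"name\":\"dt\"}".getBytes();
        byte[] key = serializeJsonKey(row, new String[]{"id", "name"});
        System.out.println(new String(key)); // prints: 42"dt"
    }
}

Note that sb.append(objectNode.get(key)) appends the JsonNode's toString() form, so string fields keep their JSON quotes inside the key (42"dt" above); the sketch reproduces that quirk rather than correcting it.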

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 2 additions & 1 deletion
@@ -102,7 +102,6 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
         if (parallelism != null) {
             this.parallelism = parallelism;
         }
-
         this.kafkaProducer010 = (FlinkKafkaProducer010<Row>) new KafkaProducer010Factory().createKafkaProducer(kafka10SinkTableInfo, getOutputType().getTypeAt(1), properties, partitioner, partitionKeys);
         return this;
     }

@@ -145,13 +144,15 @@ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInform
     }

     private FlinkKafkaPartitioner getFlinkPartitioner(KafkaSinkTableInfo kafkaSinkTableInfo){
+        System.out.println("enablePartition =" + kafkaSinkTableInfo.getEnableKeyPartition());
         if("true".equalsIgnoreCase(kafkaSinkTableInfo.getEnableKeyPartition())){
             return new CustomerFlinkPartition<>();
         }
         return new FlinkFixedPartitioner<>();
     }

     private String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo){
+        System.out.println("partitionKeys =" + kafkaSinkTableInfo.getPartitionKeys());
         if(StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())){
             return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ',');
         }
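CustomerFlinkPartition is referenced here but its implementation is not part of this diff. For context, here is a plausible minimal sketch of a key-hashing partitioner in the same role, written against Flink's FlinkKafkaPartitioner contract; treat the body as an assumption, not the project's actual class.

import java.util.Arrays;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Hypothetical stand-in for CustomerFlinkPartition: route each record by a
// stable hash of its serialized key bytes.
public class KeyHashPartitionerSketch<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            // Before the serializeJsonKey fix above, the key was always null,
            // so a partitioner like this could never spread records by key.
            return partitions[0];
        }
        // Arrays.hashCode is a content-based hash of the key bytes; the
        // modulo maps it onto one of the topic's available partitions.
        return partitions[Math.abs(Arrays.hashCode(key) % partitions.length)];
    }
}

As the diff shows, getFlinkPartitioner switches to the custom partitioner only when enableKeyPartition is the string "true" (compared case-insensitively); otherwise the sink keeps FlinkFixedPartitioner, which pins each sink subtask to a single Kafka partition. getPartitionKeys simply splits the comma-separated partitionKeys setting into the field names used to build the key.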
