Skip to content

Commit 3b9b0ba

Browse files
committed
format kafka sink
1 parent 17891c0 commit 3b9b0ba

File tree

4 files changed

+14
-181
lines changed

4 files changed

+14
-181
lines changed

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkKafkaProducer09.java

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -62,63 +62,7 @@ public void open(Configuration configuration) {
6262

6363
schema.setCounter(counter);
6464

65-
if (null != flinkKafkaPartitioner) {
66-
if (flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
67-
((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions(
68-
getPartitionsByTopic(this.defaultTopicId, this.producer));
69-
}
70-
flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
71-
}
72-
73-
74-
// register Kafka metrics to Flink accumulators
75-
if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
76-
Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
77-
78-
if (metrics == null) {
79-
// MapR's Kafka implementation returns null here.
80-
} else {
81-
final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
82-
for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
83-
kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
84-
}
85-
}
86-
}
87-
88-
if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
89-
flushOnCheckpoint = false;
90-
}
91-
92-
if (logFailuresOnly) {
93-
callback = new Callback() {
94-
@Override
95-
public void onCompletion(RecordMetadata metadata, Exception e) {
96-
if (e != null) {
97-
}
98-
acknowledgeMessage();
99-
}
100-
};
101-
} else {
102-
callback = new Callback() {
103-
@Override
104-
public void onCompletion(RecordMetadata metadata, Exception exception) {
105-
if (exception != null && asyncException == null) {
106-
asyncException = exception;
107-
}
108-
acknowledgeMessage();
109-
}
110-
};
111-
}
65+
super.open(configuration);
11266
}
11367

114-
private void acknowledgeMessage() {
115-
if (flushOnCheckpoint) {
116-
synchronized (pendingRecordsLock) {
117-
pendingRecords--;
118-
if (pendingRecords == 0) {
119-
pendingRecordsLock.notifyAll();
120-
}
121-
}
122-
}
123-
}
12468
}

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkKafkaProducer010.java

Lines changed: 2 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -62,63 +62,8 @@ public void open(Configuration configuration) {
6262

6363
schema.setCounter(counter);
6464

65-
if (null != flinkKafkaPartitioner) {
66-
if (flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
67-
((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions(
68-
getPartitionsByTopic(this.defaultTopicId, this.producer));
69-
}
70-
flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
71-
}
72-
73-
74-
// register Kafka metrics to Flink accumulators
75-
if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
76-
Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
77-
78-
if (metrics == null) {
79-
// MapR's Kafka implementation returns null here.
80-
} else {
81-
final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
82-
for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
83-
kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
84-
}
85-
}
86-
}
87-
88-
if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
89-
flushOnCheckpoint = false;
90-
}
91-
92-
if (logFailuresOnly) {
93-
callback = new Callback() {
94-
@Override
95-
public void onCompletion(RecordMetadata metadata, Exception e) {
96-
if (e != null) {
97-
}
98-
acknowledgeMessage();
99-
}
100-
};
101-
} else {
102-
callback = new Callback() {
103-
@Override
104-
public void onCompletion(RecordMetadata metadata, Exception exception) {
105-
if (exception != null && asyncException == null) {
106-
asyncException = exception;
107-
}
108-
acknowledgeMessage();
109-
}
110-
};
111-
}
65+
super.open(configuration);
11266
}
11367

114-
private void acknowledgeMessage() {
115-
if (flushOnCheckpoint) {
116-
synchronized (pendingRecordsLock) {
117-
pendingRecords--;
118-
if (pendingRecords == 0) {
119-
pendingRecordsLock.notifyAll();
120-
}
121-
}
122-
}
123-
}
68+
12469
}

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkKafkaProducer011.java

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -62,63 +62,7 @@ public void open(Configuration configuration) {
6262

6363
schema.setCounter(counter);
6464

65-
if (null != flinkKafkaPartitioner) {
66-
if (flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
67-
((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions(
68-
getPartitionsByTopic(this.defaultTopicId, this.producer));
69-
}
70-
flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
71-
}
72-
73-
74-
// register Kafka metrics to Flink accumulators
75-
if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
76-
Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
77-
78-
if (metrics == null) {
79-
// MapR's Kafka implementation returns null here.
80-
} else {
81-
final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
82-
for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
83-
kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
84-
}
85-
}
86-
}
87-
88-
if (flushOnCheckpoint && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
89-
flushOnCheckpoint = false;
90-
}
91-
92-
if (logFailuresOnly) {
93-
callback = new Callback() {
94-
@Override
95-
public void onCompletion(RecordMetadata metadata, Exception e) {
96-
if (e != null) {
97-
}
98-
acknowledgeMessage();
99-
}
100-
};
101-
} else {
102-
callback = new Callback() {
103-
@Override
104-
public void onCompletion(RecordMetadata metadata, Exception exception) {
105-
if (exception != null && asyncException == null) {
106-
asyncException = exception;
107-
}
108-
acknowledgeMessage();
109-
}
110-
};
111-
}
65+
super.open(configuration);
11266
}
11367

114-
private void acknowledgeMessage() {
115-
if (flushOnCheckpoint) {
116-
synchronized (pendingRecordsLock) {
117-
pendingRecords--;
118-
if (pendingRecords == 0) {
119-
pendingRecordsLock.notifyAll();
120-
}
121-
}
122-
}
123-
}
12468
}

pom.xml

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@
1313
<module>kafka09</module>
1414
<module>kafka10</module>
1515
<module>kafka11</module>
16-
<!--<module>mysql</module>-->
17-
<!--<module>hbase</module>-->
18-
<!--<module>elasticsearch5</module>-->
19-
<!--<module>mongo</module>-->
20-
<!--<module>redis5</module>-->
21-
<!--<module>launcher</module>-->
22-
<!--<module>rdb</module>-->
23-
<!--<module>sqlserver</module>-->
24-
<!--<module>oracle</module>-->
25-
<!--<module>cassandra</module>-->
16+
<module>mysql</module>
17+
<module>hbase</module>
18+
<module>elasticsearch5</module>
19+
<module>mongo</module>
20+
<module>redis5</module>
21+
<module>launcher</module>
22+
<module>rdb</module>
23+
<module>sqlserver</module>
24+
<module>oracle</module>
25+
<module>cassandra</module>
2626
</modules>
2727

2828

0 commit comments

Comments
 (0)