Skip to content

Commit c494a58

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_4.0.x_30258' into 1.8_release_4.0.x
2 parents 65eee9e + 93df3e9 commit c494a58

File tree

6 files changed

+89
-6
lines changed

6 files changed

+89
-6
lines changed

flinkx-kafka/flinkx-kafka-writer/src/main/java/com/dtstack/flinkx/kafka/writer/KafkaOutputFormat.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class KafkaOutputFormat extends KafkaBaseOutputFormat {
4242

4343
@Override
4444
public void configure(Configuration parameters) {
45+
super.configure(parameters);
4546
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
4647
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
4748
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 86400000);

flinkx-kafka09/flinkx-kafka09-writer/src/main/java/com/dtstack/flinkx/kafka09/writer/Kafka09OutputFormat.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package com.dtstack.flinkx.kafka09.writer;
1919

2020
import com.dtstack.flinkx.kafkabase.Formatter;
21+
import com.dtstack.flinkx.kafkabase.writer.AddressUtil;
2122
import com.dtstack.flinkx.kafkabase.writer.KafkaBaseOutputFormat;
22-
import com.dtstack.flinkx.util.ExceptionUtil;
2323
import org.apache.flink.configuration.Configuration;
2424
import org.apache.kafka.clients.producer.KafkaProducer;
2525
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -38,12 +38,12 @@ public class Kafka09OutputFormat extends KafkaBaseOutputFormat {
3838

3939
private String encoding;
4040
private String brokerList;
41-
private transient KafkaProducer<String,String> producer;
41+
private transient KafkaProducer<String, String> producer;
4242
private HeartBeatController heartBeatController;
4343

4444
@Override
4545
public void configure(Configuration parameters) {
46-
props.put("key.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
46+
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
4747
props.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
4848
props.put("producer.type", "sync");
4949
props.put("compression.codec", "none");
@@ -59,17 +59,25 @@ public void configure(Configuration parameters) {
5959
props.put("metadata.broker.list", brokerList);
6060
props.put("bootstrap.servers", brokerList);
6161
producer = new KafkaProducer<>(props);
62+
63+
LOG.info("brokerList {}", brokerList);
64+
String broker = brokerList.split(",")[0];
65+
String[] split = broker.split(":");
66+
67+
if (split.length != 2 || !AddressUtil.telnet(split[0], Integer.parseInt(split[1]))) {
68+
throw new RuntimeException("telnet error,brokerList" + brokerList);
69+
}
6270
}
6371

6472
@Override
6573
protected void emit(Map event) throws IOException {
6674
heartBeatController.acquire();
6775
String tp = Formatter.format(event, topic, timezone);
6876
producer.send(new ProducerRecord<>(tp, event.toString(), objectMapper.writeValueAsString(event)), (metadata, exception) -> {
69-
if(Objects.nonNull(exception)){
70-
LOG.warn("kafka writeSingleRecordInternal error:{}", exception.getMessage(),exception);
77+
if (Objects.nonNull(exception)) {
78+
LOG.warn("kafka writeSingleRecordInternal error:{}", exception.getMessage(), exception);
7179
heartBeatController.onFailed(exception);
72-
}else{
80+
} else {
7381
heartBeatController.onSuccess();
7482
}
7583
});

flinkx-kafka10/flinkx-kafka10-writer/src/main/java/com/dtstack/flinkx/kafka10/writer/Kafka10OutputFormat.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class Kafka10OutputFormat extends KafkaBaseOutputFormat {
4141

4242
@Override
4343
public void configure(Configuration parameters) {
44+
super.configure(parameters);
4445
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
4546
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
4647
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 86400000);

flinkx-kafka11/flinkx-kafka11-writer/src/main/java/com/dtstack/flinkx/kafka11/writer/Kafka11OutputFormat.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class Kafka11OutputFormat extends KafkaBaseOutputFormat {
4141

4242
@Override
4343
public void configure(Configuration parameters) {
44+
super.configure(parameters);
4445
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
4546
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
4647
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 86400000);
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flinkx.kafkabase.writer;
20+
21+
import org.apache.commons.net.telnet.TelnetClient;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.net.InetAddress;
26+
27+
/**
28+
* @author jiangbo
29+
* @date 2020/3/2
30+
*/
31+
public class AddressUtil {
32+
33+
private static Logger logger = LoggerFactory.getLogger(AddressUtil.class);
34+
35+
public static boolean telnet(String ip,int port){
36+
TelnetClient client = null;
37+
try{
38+
client = new TelnetClient();
39+
client.setConnectTimeout(3000);
40+
client.connect(ip,port);
41+
return true;
42+
}catch(Exception e){
43+
return false;
44+
} finally {
45+
try {
46+
if (client != null){
47+
client.disconnect();
48+
}
49+
} catch (Exception e){
50+
logger.error("{}",e);
51+
}
52+
}
53+
}
54+
55+
public static boolean ping(String ip){
56+
try{
57+
return InetAddress.getByName(ip).isReachable(3000);
58+
}catch(Exception e){
59+
return false;
60+
}
61+
}
62+
}

flinkx-kb/flinkx-kb-writer/src/main/java/com/dtstack/flinkx/kafkabase/writer/KafkaBaseOutputFormat.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,16 @@ public class KafkaBaseOutputFormat extends BaseRichOutputFormat {
5757

5858
@Override
5959
public void configure(Configuration parameters) {
60+
if(producerSettings != null && producerSettings.get("bootstrap.servers") != null){
61+
String brokerList = producerSettings.get("bootstrap.servers");
62+
LOG.info("brokerList->{}",brokerList);
63+
String broker = brokerList.split(",")[0];
64+
String[] split = broker.split(":");
65+
66+
if( split.length !=2 ||!AddressUtil.telnet(split[0], Integer.parseInt(split[1]))){
67+
throw new RuntimeException("telnet error,brokerList"+brokerList+" please check dataSource is running");
68+
}
69+
}
6070
}
6171

6272
@Override

0 commit comments

Comments
 (0)