
Commit 65eee9e

Author: gituser
Merge branch 'hotfix_1.8_4.0.x_30258' into 1.8_release_4.0.x
Parents: 7e2f9c3 + 92eb304

11 files changed: +241 −22 lines

flinkx-binlog/flinkx-binlog-reader/src/main/java/com/dtstack/flinkx/binlog/reader/BinlogEventSink.java

Lines changed: 19 additions & 4 deletions

@@ -28,6 +28,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,7 +46,7 @@ public class BinlogEventSink extends AbstractCanalLifeCycle implements com.aliba
 
     private BinlogInputFormat format;
 
-    private BlockingQueue<Row> queue;
+    private BlockingQueue<Map<String, Object>> queue;
 
     private boolean pavingData;
 
@@ -123,7 +124,7 @@ private void processRowChange(CanalEntry.RowChange rowChange, String schema, Str
         }
 
         try {
-            queue.put(Row.of(message));
+            queue.put(message);
         } catch (InterruptedException e) {
             LOG.error("takeEvent interrupted message:{} error:{}", message, e);
         }
@@ -147,16 +148,30 @@ public void setPavingData(boolean pavingData) {
         this.pavingData = pavingData;
     }
 
-    public Row takeEvent() {
+    public Row takeEvent() throws IOException {
         Row row = null;
         try {
-            row = queue.take();
+            Map<String, Object> map = queue.take();
+            // @see com.dtstack.flinkx.binlog.reader.HeartBeatController#onFailed — once an
+            // error is detected, an error record with the single key "e" is enqueued
+            if (map.size() == 1 && map.containsKey("e")) {
+                throw new RuntimeException((String) map.get("e"));
+            } else {
+                row = Row.of(map);
+            }
         } catch (InterruptedException e) {
             LOG.error("takeEvent interrupted error:{}", ExceptionUtil.getErrorMessage(e));
         }
         return row;
     }
 
+    public void processEvent(Map<String, Object> event) {
+        try {
+            queue.put(event);
+        } catch (InterruptedException e) {
+            LOG.error("processEvent interrupted event:{} error:{}", event, ExceptionUtil.getErrorMessage(e));
+        }
+    }
+
     @Override
     public void interrupt() {
         LOG.info("BinlogEventSink is interrupted");

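With this change the sink's queue carries raw maps instead of Rows, and takeEvent() doubles as the error channel: a single-entry map keyed "e" aborts the read. A minimal reader-side sketch of how this is consumed (the method name is hypothetical; BinlogInputFormat's read loop is not part of this diff):

    @Override
    protected Row nextRecordInternal(Row row) throws IOException {
        // Blocks until a binlog event arrives. If the HeartBeatController has
        // enqueued an {"e": message} marker, takeEvent() rethrows it as a
        // RuntimeException, failing the task instead of hanging on a dead connection.
        return binlogEventSink.takeEvent();
    }
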
flinkx-binlog/flinkx-binlog-reader/src/main/java/com/dtstack/flinkx/binlog/reader/BinlogInputFormat.java

Lines changed: 4 additions & 1 deletion

@@ -205,7 +205,10 @@ protected void openInternal(InputSplit inputSplit) throws IOException {
         controller.setEventSink(sink);
 
         controller.setLogPositionManager(new BinlogPositionManager(this));
-
+        // register a heartbeat callback handler for the connection
+        HeartBeatController heartBeatController = new HeartBeatController();
+        heartBeatController.setBinlogEventSink(binlogEventSink);
+        controller.setHaController(heartBeatController);
         EntryPosition startPosition = findStartPosition();
         if (startPosition != null) {
             controller.setMasterPosition(startPosition);
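
Wiring the controller through setHaController() hooks it into canal's heartbeat detection: the parser periodically probes the MySQL connection and reports each probe's outcome to the registered HeartBeatCallback, which is what drives onSuccess/onFailed in the new HeartBeatController below (canal-side behavior inferred from its MysqlEventParser; not shown in this diff).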
flinkx-binlog/flinkx-binlog-reader/src/main/java/com/dtstack/flinkx/binlog/reader/HeartBeatController.java (new file)

Lines changed: 68 additions & 0 deletions

@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dtstack.flinkx.binlog.reader;
+
+import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
+import com.alibaba.otter.canal.parse.ha.CanalHAController;
+import com.alibaba.otter.canal.parse.inbound.HeartBeatCallback;
+import com.dtstack.flinkx.util.ExceptionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * HeartBeatController
+ *
+ * @author by [email protected]
+ * @Date 2020/9/11
+ */
+public class HeartBeatController extends AbstractCanalLifeCycle implements CanalHAController, HeartBeatCallback {
+
+    private static final Logger logger = LoggerFactory.getLogger(HeartBeatController.class);
+    // the heartbeat runs every 3 seconds; after 3 consecutive failures the task is
+    // shut down, i.e. the connection is closed about 9s after the source goes down
+    private int detectingRetryTimes = 3;
+    private int failedTimes = 0;
+    private BinlogEventSink binlogEventSink;
+
+    public HeartBeatController() {
+    }
+
+    public void onSuccess(long costTime) {
+        failedTimes = 0;
+    }
+
+    @Override
+    public void onFailed(Throwable e) {
+        failedTimes++;
+        // check whether the failure count has exceeded the retry limit
+        synchronized (this) {
+            String msg = String.format("HeartBeat failed %s times, please check that your source is working, error info -> %s", failedTimes, ExceptionUtil.getErrorMessage(e));
+            logger.error(msg);
+            if (failedTimes >= detectingRetryTimes) {
+                binlogEventSink.processEvent(Collections.singletonMap("e", msg));
+            }
+        }
+    }
+
+    public void setBinlogEventSink(BinlogEventSink binlogEventSink) {
+        this.binlogEventSink = binlogEventSink;
+    }
+}

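The contract between the controller and the sink is small enough to show end to end. A minimal sketch, assuming a BinlogEventSink wired up as in BinlogInputFormat.openInternal() above:

    HeartBeatController controller = new HeartBeatController();
    controller.setBinlogEventSink(sink);

    controller.onFailed(new IOException("probe 1"));  // failedTimes = 1: logged only
    controller.onFailed(new IOException("probe 2"));  // failedTimes = 2: logged only
    controller.onFailed(new IOException("probe 3"));  // failedTimes = 3: enqueues {"e": msg},
                                                      // so the next takeEvent() throws
    controller.onSuccess(5L);                         // a healthy probe resets the counter
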
flinkx-kafka/flinkx-kafka-writer/src/main/java/com/dtstack/flinkx/kafka/writer/KafkaOutputFormat.java

Lines changed: 9 additions & 1 deletion

@@ -20,6 +20,7 @@
 
 import com.dtstack.flinkx.kafkabase.Formatter;
 import com.dtstack.flinkx.kafkabase.writer.KafkaBaseOutputFormat;
+import com.dtstack.flinkx.util.ExceptionUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -28,6 +29,7 @@
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Date: 2019/11/21
@@ -54,7 +56,13 @@ public void configure(Configuration parameters) {
     @Override
     protected void emit(Map event) throws IOException {
         String tp = Formatter.format(event, topic, timezone);
-        producer.send(new ProducerRecord<>(tp, event.toString(), objectMapper.writeValueAsString(event)));
+        producer.send(new ProducerRecord<>(tp, event.toString(), objectMapper.writeValueAsString(event)), (metadata, exception) -> {
+            if (Objects.nonNull(exception)) {
+                String errorMessage = String.format("send data failed, data [%s], error info %s", event, ExceptionUtil.getErrorMessage(exception));
+                LOG.warn(errorMessage);
+                throw new RuntimeException(errorMessage);
+            }
+        });
     }
 
     @Override
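
One caveat: the producer invokes this callback on its internal I/O thread, and the Kafka client catches and logs exceptions thrown from user callbacks rather than propagating them, so the RuntimeException above may never actually fail the task. A hedged alternative sketch that records the failure and surfaces it on the task thread during the next emit() — essentially what Kafka09OutputFormat does below with its HeartBeatController (the sendException field is hypothetical, not part of this diff):

    private volatile Exception sendException;  // hypothetical field

    @Override
    protected void emit(Map event) throws IOException {
        if (sendException != null) {
            // surface the previous asynchronous failure on the task thread
            throw new IOException("previous send failed", sendException);
        }
        String tp = Formatter.format(event, topic, timezone);
        producer.send(new ProducerRecord<>(tp, event.toString(), objectMapper.writeValueAsString(event)),
                (metadata, exception) -> {
                    if (exception != null) {
                        sendException = exception;  // checked by the next emit()
                    }
                });
    }
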
flinkx-kafka09/flinkx-kafka09-writer/src/main/java/com/dtstack/flinkx/kafka09/writer/HeartBeatController.java (new file)

Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dtstack.flinkx.kafka09.writer;
+
+import com.dtstack.flinkx.util.ExceptionUtil;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * HeartBeatController
+ *
+ * @author by [email protected]
+ * @Date 2020/9/11
+ */
+public class HeartBeatController implements Serializable {
+
+    private static final Logger logger = LoggerFactory.getLogger(HeartBeatController.class);
+    private int detectingRetryTimes = 3;
+    private AtomicInteger failedTimes = new AtomicInteger(0);
+    private Throwable e;
+
+    public HeartBeatController() {
+    }
+
+    public HeartBeatController(int detectingRetryTimes, AtomicInteger failedTimes) {
+        this.detectingRetryTimes = detectingRetryTimes;
+        this.failedTimes = failedTimes;
+    }
+
+    public void onSuccess() {
+        failedTimes.set(0);
+        this.e = null;
+    }
+
+    public void onFailed(Throwable e) {
+        failedTimes.incrementAndGet();
+        this.e = e;
+    }
+
+    public void acquire() {
+        if (Objects.isNull(e)) {
+            return;
+        }
+        // three consecutive send failures, or a connection exception
+        if (failedTimes.get() >= detectingRetryTimes || e instanceof TimeoutException) {
+            String message = "Error data received 3 times continuously, or the data source has an error: " + ExceptionUtil.getErrorMessage(e);
+            logger.error(message);
+            throw new RuntimeException(message, e);
+        }
+    }
+}

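Usage sketch of the controller's contract, mirroring how Kafka09OutputFormat.emit() drives it below (the hb instance is illustrative):

    HeartBeatController hb = new HeartBeatController();
    hb.acquire();                         // no recorded failure: returns quietly

    hb.onFailed(new TimeoutException("broker unreachable"));
    hb.acquire();                         // a TimeoutException bypasses the three-failure
                                          // budget and throws a RuntimeException immediately
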
flinkx-kafka09/flinkx-kafka09-writer/src/main/java/com/dtstack/flinkx/kafka09/writer/Kafka09OutputFormat.java

Lines changed: 26 additions & 11 deletions

@@ -19,13 +19,15 @@
 
 import com.dtstack.flinkx.kafkabase.Formatter;
 import com.dtstack.flinkx.kafkabase.writer.KafkaBaseOutputFormat;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
+import com.dtstack.flinkx.util.ExceptionUtil;
 import org.apache.flink.configuration.Configuration;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * @company: www.dtstack.com
@@ -36,32 +38,41 @@ public class Kafka09OutputFormat extends KafkaBaseOutputFormat {
 
     private String encoding;
     private String brokerList;
-    private transient Producer<String, byte[]> producer;
+    private transient KafkaProducer<String, String> producer;
+    private HeartBeatController heartBeatController;
 
     @Override
     public void configure(Configuration parameters) {
-        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
-        props.put("value.serializer.class", "kafka.serializer.StringEncoder");
-        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
+        props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
+        props.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
         props.put("producer.type", "sync");
         props.put("compression.codec", "none");
        props.put("request.required.acks", "1");
        props.put("batch.num.messages", "1024");
+        props.put("partitioner.class", DefaultPartitioner.class.getName());
+
        props.put("client.id", "");
 
        if (producerSettings != null) {
            props.putAll(producerSettings);
        }
        props.put("metadata.broker.list", brokerList);
-
-        ProducerConfig producerConfig = new ProducerConfig(props);
-        producer = new Producer<>(producerConfig);
+        props.put("bootstrap.servers", brokerList);
+        producer = new KafkaProducer<>(props);
    }
 
    @Override
    protected void emit(Map event) throws IOException {
+        heartBeatController.acquire();
        String tp = Formatter.format(event, topic, timezone);
-        producer.send(new KeyedMessage<>(tp, event.toString(), objectMapper.writeValueAsString(event).getBytes(encoding)));
+        producer.send(new ProducerRecord<>(tp, event.toString(), objectMapper.writeValueAsString(event)), (metadata, exception) -> {
+            if (Objects.nonNull(exception)) {
+                LOG.warn("kafka writeSingleRecordInternal error:{}", exception.getMessage(), exception);
+                heartBeatController.onFailed(exception);
+            } else {
+                heartBeatController.onSuccess();
+            }
+        });
    }
 
    @Override
@@ -77,4 +88,8 @@ public void setEncoding(String encoding) {
     public void setBrokerList(String brokerList) {
         this.brokerList = brokerList;
     }
+
+    public void setHeartBeatController(HeartBeatController heartBeatController) {
+        this.heartBeatController = heartBeatController;
+    }
 }

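Design note: because acquire() runs at the top of emit(), an asynchronous send failure recorded by the callback is surfaced on the next record rather than the one that actually failed. Unlike the RuntimeException thrown inside the callbacks of the other kafka writers in this commit, though, this throw happens on the task thread, so it reliably fails the job.
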
flinkx-kafka09/flinkx-kafka09-writer/src/main/java/com/dtstack/flinkx/kafka09/writer/Kafka09Writer.java

Lines changed: 1 addition & 0 deletions

@@ -58,6 +58,7 @@ public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
         format.setBrokerList(brokerList);
         format.setProducerSettings(producerSettings);
         format.setRestoreConfig(restoreConfig);
+        format.setHeartBeatController(new HeartBeatController());
 
         return createOutput(dataSet, format);
     }

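Note that the writer injects a fresh HeartBeatController per output format instance; since the controller implements Serializable, it ships to the task managers together with the serialized format.
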
flinkx-kafka10/flinkx-kafka10-writer/src/main/java/com/dtstack/flinkx/kafka10/writer/Kafka10OutputFormat.java

Lines changed: 9 additions & 1 deletion

@@ -19,6 +19,7 @@
 
 import com.dtstack.flinkx.kafkabase.Formatter;
 import com.dtstack.flinkx.kafkabase.writer.KafkaBaseOutputFormat;
+import com.dtstack.flinkx.util.ExceptionUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -27,6 +28,7 @@
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * @company: www.dtstack.com
@@ -53,7 +55,13 @@ public void configure(Configuration parameters) {
     @Override
     protected void emit(Map event) throws IOException {
         String tp = Formatter.format(event, topic, timezone);
-        producer.send(new ProducerRecord<>(tp, event.toString(), objectMapper.writeValueAsString(event)));
+        producer.send(new ProducerRecord<>(tp, event.toString(), objectMapper.writeValueAsString(event)), (metadata, exception) -> {
+            if (Objects.nonNull(exception)) {
+                String errorMessage = String.format("send data failed, data [%s], error info %s", event, ExceptionUtil.getErrorMessage(exception));
+                LOG.warn(errorMessage);
+                throw new RuntimeException(errorMessage);
+            }
+        });
     }
 
     @Override

flinkx-kafka11/flinkx-kafka11-writer/src/main/java/com/dtstack/flinkx/kafka11/writer/Kafka11OutputFormat.java

Lines changed: 9 additions & 1 deletion

@@ -19,6 +19,7 @@
 
 import com.dtstack.flinkx.kafkabase.Formatter;
 import com.dtstack.flinkx.kafkabase.writer.KafkaBaseOutputFormat;
+import com.dtstack.flinkx.util.ExceptionUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -27,6 +28,7 @@
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * @company: www.dtstack.com
@@ -53,7 +55,13 @@ public void configure(Configuration parameters) {
     @Override
     protected void emit(Map event) throws IOException {
         String tp = Formatter.format(event, topic, timezone);
-        producer.send(new ProducerRecord<>(tp, event.toString(), objectMapper.writeValueAsString(event)));
+        producer.send(new ProducerRecord<>(tp, event.toString(), objectMapper.writeValueAsString(event)), (metadata, exception) -> {
+            if (Objects.nonNull(exception)) {
+                String errorMessage = String.format("send data failed, data [%s], error info %s", event, ExceptionUtil.getErrorMessage(exception));
+                LOG.warn(errorMessage);
+                throw new RuntimeException(errorMessage);
+            }
+        });
     }
 
     @Override
