Skip to content

Commit 4322150

Browse files
committed
Merge remote-tracking branch 'origin/1.10_release_4.0.x' into temp_1feat_1.10_4.0_x_restfulapi
2 parents 973c094 + f5d0c52 commit 4322150

File tree

128 files changed

+3471
-738
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

128 files changed

+3471
-738
lines changed

flinkx-binlog/flinkx-binlog-reader/src/main/java/com/dtstack/flinkx/binlog/reader/BinlogEventSink.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

31+
import java.io.IOException;
3132
import java.net.InetSocketAddress;
3233
import java.util.Collections;
3334
import java.util.HashMap;
@@ -45,7 +46,7 @@ public class BinlogEventSink extends AbstractCanalLifeCycle implements com.aliba
4546

4647
private BinlogInputFormat format;
4748

48-
private BlockingQueue<Row> queue;
49+
private BlockingQueue<Map<String,Object>> queue;
4950

5051
private boolean pavingData;
5152

@@ -123,7 +124,7 @@ private void processRowChange(CanalEntry.RowChange rowChange, String schema, Str
123124
}
124125

125126
try {
126-
queue.put(Row.of(message));
127+
queue.put(message);
127128
} catch (InterruptedException e) {
128129
LOG.error("takeEvent interrupted message:{} error:{}", message, e);
129130
}
@@ -147,16 +148,30 @@ public void setPavingData(boolean pavingData) {
147148
this.pavingData = pavingData;
148149
}
149150

150-
public Row takeEvent() {
151+
public Row takeEvent() throws IOException {
151152
Row row = null;
152153
try {
153-
row = queue.take();
154+
Map<String, Object> map = queue.take();
155+
//@see com.dtstack.flinkx.binlog.reader.HeartBeatController.onFailed 检测到异常之后 会添加key为e的错误数据
156+
if(map.size() == 1 && map.containsKey("e")){
157+
throw new RuntimeException((String) map.get("e"));
158+
}else{
159+
row = Row.of(map);
160+
}
154161
} catch (InterruptedException e) {
155162
LOG.error("takeEvent interrupted error:{}", ExceptionUtil.getErrorMessage(e));
156163
}
157164
return row;
158165
}
159166

167+
public void processEvent(Map<String, Object> event) {
168+
try {
169+
queue.put(event);
170+
} catch (InterruptedException e) {
171+
LOG.error("takeEvent interrupted event:{} error:{}", event, ExceptionUtil.getErrorMessage(e));
172+
}
173+
}
174+
160175
@Override
161176
public void interrupt() {
162177
LOG.info("BinlogEventSink is interrupted");

flinkx-binlog/flinkx-binlog-reader/src/main/java/com/dtstack/flinkx/binlog/reader/BinlogInputFormat.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,10 @@ protected void openInternal(InputSplit inputSplit) throws IOException {
205205
controller.setEventSink(sink);
206206

207207
controller.setLogPositionManager(new BinlogPositionManager(this));
208-
208+
//添加connection心跳回调处理器
209+
HeartBeatController heartBeatController =new HeartBeatController();
210+
heartBeatController.setBinlogEventSink(binlogEventSink);
211+
controller.setHaController( heartBeatController);
209212
EntryPosition startPosition = findStartPosition();
210213
if (startPosition != null) {
211214
controller.setMasterPosition(startPosition);
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.dtstack.flinkx.binlog.reader;
19+
20+
import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
21+
import com.alibaba.otter.canal.parse.ha.CanalHAController;
22+
import com.alibaba.otter.canal.parse.ha.HeartBeatHAController;
23+
import com.alibaba.otter.canal.parse.inbound.HeartBeatCallback;
24+
import com.dtstack.flinkx.util.ExceptionUtil;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import java.util.Collections;
29+
30+
/**
31+
* HeartBeatController
32+
*
33+
* @author by [email protected]
34+
* @Date 2020/9/11
35+
*/
36+
public class HeartBeatController extends AbstractCanalLifeCycle implements CanalHAController, HeartBeatCallback {
37+
private static final Logger logger = LoggerFactory.getLogger(HeartBeatHAController.class);
38+
// default 10 times 心跳执行是3秒一次,连续错误3次之后,关闭任务,即宕机后 9s断开连接
39+
private int detectingRetryTimes = 3;
40+
private int failedTimes = 0;
41+
private BinlogEventSink binlogEventSink;
42+
43+
public HeartBeatController() {
44+
45+
}
46+
47+
public void onSuccess(long costTime) {
48+
failedTimes = 0;
49+
}
50+
51+
@Override
52+
public void onFailed(Throwable e) {
53+
failedTimes++;
54+
// 检查一下是否超过失败次数
55+
synchronized (this) {
56+
String msg = String.format("HeartBeat failed %s times,please check your source is working,error info->%s", failedTimes, ExceptionUtil.getErrorMessage(e));
57+
logger.error(msg);
58+
if (failedTimes >= detectingRetryTimes) {
59+
binlogEventSink.processEvent(Collections.singletonMap("e", msg));
60+
}
61+
}
62+
}
63+
64+
public void setBinlogEventSink(BinlogEventSink binlogEventSink) {
65+
this.binlogEventSink = binlogEventSink;
66+
}
67+
}
68+

flinkx-binlog/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@
2323
<artifactId>flinkx-core</artifactId>
2424
<version>1.6</version>
2525
<scope>provided</scope>
26+
<exclusions>
27+
<exclusion>
28+
<groupId>ch.qos.logback</groupId>
29+
<artifactId>logback-classic</artifactId>
30+
</exclusion>
31+
<exclusion>
32+
<groupId>ch.qos.logback</groupId>
33+
<artifactId>logback-core</artifactId>
34+
</exclusion>
35+
</exclusions>
2636
</dependency>
2737
</dependencies>
2838

flinkx-core/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626

2727
<dependency>
2828
<groupId>org.slf4j</groupId>
29-
<artifactId>slf4j-api</artifactId>
30-
<version>1.7.20</version>
29+
<artifactId>slf4j-log4j12</artifactId>
30+
<version>1.7.10</version>
3131
</dependency>
3232

3333
<dependency>

flinkx-core/src/main/java/com/dtstack/flinkx/Main.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ private static void speedTest(DataTransferConfig config) {
198198
} else if (WRITER.equalsIgnoreCase(testConfig.getSpeedTest())){
199199
ContentConfig contentConfig = config.getJob().getContent().get(0);
200200
contentConfig.getReader().setName(STREAM_READER);
201+
}else {
202+
return;
201203
}
202204

203205
config.getJob().getSetting().getSpeed().setBytes(-1);

flinkx-core/src/main/java/com/dtstack/flinkx/config/AbstractConfig.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
package com.dtstack.flinkx.config;
2020

21+
import com.dtstack.flinkx.util.GsonUtil;
2122
import com.google.gson.internal.LinkedTreeMap;
2223

2324
import java.io.Serializable;
2425
import java.math.BigDecimal;
2526
import java.math.BigInteger;
2627
import java.util.HashMap;
2728
import java.util.Map;
29+
import java.util.Properties;
2830

2931
/**
3032
* Abstract Config
@@ -129,7 +131,7 @@ public int getIntVal(String key, int defaultValue) {
129131
if(ret instanceof BigDecimal) {
130132
return ((BigDecimal)ret).intValue();
131133
}
132-
throw new RuntimeException("can't cast " + key + " from " + ret.getClass().getName() + " to Integer");
134+
throw new RuntimeException(String.format("cant't %s from %s to int, internalMap = %s", key, ret.getClass().getName(), GsonUtil.GSON.toJson(internalMap)));
133135
}
134136

135137
public long getLongVal(String key, long defaultValue) {
@@ -158,7 +160,7 @@ public long getLongVal(String key, long defaultValue) {
158160
if(ret instanceof BigDecimal) {
159161
return ((BigDecimal)ret).longValue();
160162
}
161-
throw new RuntimeException("can't cast " + key + " from " + ret.getClass().getName() + " to Long");
163+
throw new RuntimeException(String.format("cant't %s from %s to long, internalMap = %s", key, ret.getClass().getName(), GsonUtil.GSON.toJson(internalMap)));
162164
}
163165

164166
public double getDoubleVal(String key, double defaultValue) {
@@ -187,7 +189,7 @@ public double getDoubleVal(String key, double defaultValue) {
187189
if (ret instanceof BigDecimal) {
188190
return ((BigDecimal) ret).doubleValue();
189191
}
190-
throw new RuntimeException("can't cast " + key + " from " + ret.getClass().getName() + " to Long");
192+
throw new RuntimeException(String.format("cant't %s from %s to double, internalMap = %s", key, ret.getClass().getName(), GsonUtil.GSON.toJson(internalMap)));
191193
}
192194

193195

@@ -199,7 +201,33 @@ public boolean getBooleanVal(String key, boolean defaultValue) {
199201
if (ret instanceof Boolean) {
200202
return (Boolean) ret;
201203
}
202-
throw new RuntimeException("can't cast " + key + " from " + ret.getClass().getName() + " to Long");
204+
throw new RuntimeException(String.format("cant't %s from %s to boolean, internalMap = %s", key, ret.getClass().getName(), GsonUtil.GSON.toJson(internalMap)));
205+
}
206+
207+
/**
208+
* 从指定key中获取Properties配置信息
209+
* @param key
210+
* @param p
211+
* @return
212+
*/
213+
@SuppressWarnings("unchecked")
214+
public Properties getProperties(String key, Properties p ){
215+
Object ret = internalMap.get(key);
216+
if(p == null){
217+
p = new Properties();
218+
}
219+
if (ret == null) {
220+
return p;
221+
}
222+
if(ret instanceof Map){
223+
Map<String, Object> map = (Map<String, Object>) ret;
224+
for (Map.Entry<String, Object> entry : map.entrySet()) {
225+
p.setProperty(entry.getKey(), String.valueOf(entry.getValue()));
226+
}
227+
return p;
228+
}else{
229+
throw new RuntimeException(String.format("cant't %s from %s to map, internalMap = %s", key, ret.getClass().getName(), GsonUtil.GSON.toJson(internalMap)));
230+
}
203231
}
204232

205233
}

flinkx-core/src/main/java/com/dtstack/flinkx/config/ReaderConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public void setColumn(List column) {
9595
public class ConnectionConfig extends AbstractConfig {
9696

9797
public static final String KEY_TABLE_LIST = "table";
98+
public static final String KEY_SCHEMA = "schema";
9899
public static final String KEY_JDBC_URL_LIST = "jdbcUrl";
99100
public static final String KEY_JDBC_USERNAME = "username";
100101
public static final String KEY_JDBC_PASSWORD = "password";
@@ -111,6 +112,14 @@ public void setTable(List<String> table) {
111112
setVal(KEY_TABLE_LIST, table);
112113
}
113114

115+
public String getSchema(){
116+
return (String) getVal(KEY_SCHEMA);
117+
}
118+
119+
public void setSchema(String schema){
120+
setVal(KEY_SCHEMA, schema);
121+
}
122+
114123
public List<String> getJdbcUrl() {
115124
return (List<String>) getVal(KEY_JDBC_URL_LIST);
116125
}

flinkx-core/src/main/java/com/dtstack/flinkx/config/WriterConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,11 @@ public void setConnection(List<ConnectionConfig> connection) {
9696
public class ConnectionConfig extends AbstractConfig {
9797
private static final String KEY_JDBC_URL = "jdbcUrl";
9898
private static final String KEY_TABLE_LIST = "table";
99+
public static final String KEY_SCHEMA = "schema";
99100

100101
private String jdbcUrl;
101102
private List<String> table;
103+
private String schema;
102104

103105
public ConnectionConfig(Map<String, Object> map) {
104106
super(map);
@@ -109,6 +111,7 @@ public ConnectionConfig(Map<String, Object> map) {
109111
jdbcUrl = ((List) jdbcUrlObj).get(0).toString();
110112
}
111113
table = (List<String>) getVal(KEY_TABLE_LIST);
114+
schema = (String) getVal(KEY_SCHEMA);
112115
}
113116

114117
public String getJdbcUrl() {
@@ -126,6 +129,10 @@ public List<String> getTable() {
126129
public void setTable(List<String> table) {
127130
this.table = table;
128131
}
132+
133+
public String getSchema(){return schema;}
134+
135+
public void setSchema(String schema){this.schema = schema;}
129136
}
130137

131138
}

flinkx-core/src/main/java/com/dtstack/flinkx/constants/ConstantValue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class ConstantValue {
2727

2828
public static final String STAR_SYMBOL = "*";
2929
public static final String POINT_SYMBOL = ".";
30+
public static final String TWO_POINT_SYMBOL = "..";
3031
public static final String EQUAL_SYMBOL = "=";
3132
public static final String SINGLE_QUOTE_MARK_SYMBOL = "'";
3233
public static final String DOUBLE_QUOTE_MARK_SYMBOL = "\"";
@@ -38,6 +39,10 @@ public class ConstantValue {
3839
public static final String LEFT_PARENTHESIS_SYMBOL = "(";
3940
public static final String RIGHT_PARENTHESIS_SYMBOL = ")";
4041

42+
43+
public static final String DATA_TYPE_UNSIGNED = "UNSIGNED";
44+
45+
4146
public static final String KEY_HTTP = "http";
4247

4348
public static final String PROTOCOL_HTTP = "http://";

0 commit comments

Comments
 (0)