Skip to content

Commit 59d2099

Browse files
committed
[feat] add restapi
2 parents 5c0be9e + 170b772 commit 59d2099

File tree

116 files changed

+2639
-326
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

116 files changed

+2639
-326
lines changed

flinkx-binlog/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@
2323
<artifactId>flinkx-core</artifactId>
2424
<version>1.6</version>
2525
<scope>provided</scope>
26+
<exclusions>
27+
<exclusion>
28+
<groupId>ch.qos.logback</groupId>
29+
<artifactId>logback-classic</artifactId>
30+
</exclusion>
31+
<exclusion>
32+
<groupId>ch.qos.logback</groupId>
33+
<artifactId>logback-core</artifactId>
34+
</exclusion>
35+
</exclusions>
2636
</dependency>
2737
</dependencies>
2838

flinkx-core/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626

2727
<dependency>
2828
<groupId>org.slf4j</groupId>
29-
<artifactId>slf4j-api</artifactId>
30-
<version>1.7.20</version>
29+
<artifactId>slf4j-log4j12</artifactId>
30+
<version>1.7.10</version>
3131
</dependency>
3232

3333
<dependency>

flinkx-core/src/main/java/com/dtstack/flinkx/config/AbstractConfig.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
package com.dtstack.flinkx.config;
2020

21+
import com.dtstack.flinkx.util.GsonUtil;
2122
import com.google.gson.internal.LinkedTreeMap;
2223

2324
import java.io.Serializable;
2425
import java.math.BigDecimal;
2526
import java.math.BigInteger;
2627
import java.util.HashMap;
2728
import java.util.Map;
29+
import java.util.Properties;
2830

2931
/**
3032
* Abstract Config
@@ -129,7 +131,7 @@ public int getIntVal(String key, int defaultValue) {
129131
if(ret instanceof BigDecimal) {
130132
return ((BigDecimal)ret).intValue();
131133
}
132-
throw new RuntimeException("can't cast " + key + " from " + ret.getClass().getName() + " to Integer");
134+
throw new RuntimeException(String.format("cant't %s from %s to int, internalMap = %s", key, ret.getClass().getName(), GsonUtil.GSON.toJson(internalMap)));
133135
}
134136

135137
public long getLongVal(String key, long defaultValue) {
@@ -158,7 +160,7 @@ public long getLongVal(String key, long defaultValue) {
158160
if(ret instanceof BigDecimal) {
159161
return ((BigDecimal)ret).longValue();
160162
}
161-
throw new RuntimeException("can't cast " + key + " from " + ret.getClass().getName() + " to Long");
163+
throw new RuntimeException(String.format("cant't %s from %s to long, internalMap = %s", key, ret.getClass().getName(), GsonUtil.GSON.toJson(internalMap)));
162164
}
163165

164166
public double getDoubleVal(String key, double defaultValue) {
@@ -187,7 +189,7 @@ public double getDoubleVal(String key, double defaultValue) {
187189
if (ret instanceof BigDecimal) {
188190
return ((BigDecimal) ret).doubleValue();
189191
}
190-
throw new RuntimeException("can't cast " + key + " from " + ret.getClass().getName() + " to Long");
192+
throw new RuntimeException(String.format("cant't %s from %s to double, internalMap = %s", key, ret.getClass().getName(), GsonUtil.GSON.toJson(internalMap)));
191193
}
192194

193195

@@ -199,7 +201,33 @@ public boolean getBooleanVal(String key, boolean defaultValue) {
199201
if (ret instanceof Boolean) {
200202
return (Boolean) ret;
201203
}
202-
throw new RuntimeException("can't cast " + key + " from " + ret.getClass().getName() + " to Long");
204+
throw new RuntimeException(String.format("cant't %s from %s to boolean, internalMap = %s", key, ret.getClass().getName(), GsonUtil.GSON.toJson(internalMap)));
205+
}
206+
207+
/**
208+
* 从指定key中获取Properties配置信息
209+
* @param key
210+
* @param p
211+
* @return
212+
*/
213+
@SuppressWarnings("unchecked")
214+
public Properties getProperties(String key, Properties p ){
215+
Object ret = internalMap.get(key);
216+
if(p == null){
217+
p = new Properties();
218+
}
219+
if (ret == null) {
220+
return p;
221+
}
222+
if(ret instanceof Map){
223+
Map<String, Object> map = (Map<String, Object>) ret;
224+
for (Map.Entry<String, Object> entry : map.entrySet()) {
225+
p.setProperty(entry.getKey(), String.valueOf(entry.getValue()));
226+
}
227+
return p;
228+
}else{
229+
throw new RuntimeException(String.format("cant't %s from %s to map, internalMap = %s", key, ret.getClass().getName(), GsonUtil.GSON.toJson(internalMap)));
230+
}
203231
}
204232

205233
}

flinkx-core/src/main/java/com/dtstack/flinkx/constants/ConstantValue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class ConstantValue {
2727

2828
public static final String STAR_SYMBOL = "*";
2929
public static final String POINT_SYMBOL = ".";
30+
public static final String TWO_POINT_SYMBOL = "..";
3031
public static final String EQUAL_SYMBOL = "=";
3132
public static final String SINGLE_QUOTE_MARK_SYMBOL = "'";
3233
public static final String DOUBLE_QUOTE_MARK_SYMBOL = "\"";
@@ -38,6 +39,10 @@ public class ConstantValue {
3839
public static final String LEFT_PARENTHESIS_SYMBOL = "(";
3940
public static final String RIGHT_PARENTHESIS_SYMBOL = ")";
4041

42+
43+
public static final String DATA_TYPE_UNSIGNED = "UNSIGNED";
44+
45+
4146
public static final String KEY_HTTP = "http";
4247

4348
public static final String PROTOCOL_HTTP = "http://";

flinkx-core/src/main/java/com/dtstack/flinkx/enums/ColumnType.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
package com.dtstack.flinkx.enums;
2020

2121
import com.dtstack.flinkx.constants.ConstantValue;
22+
import org.apache.commons.lang3.StringUtils;
2223

2324
import java.util.Arrays;
2425
import java.util.List;
26+
import java.util.Locale;
2527

2628
/**
2729
* Define standard column type for all the readers or writers that do not
@@ -66,6 +68,12 @@ public enum ColumnType {
6668
STRING, VARCHAR, VARCHAR2, CHAR, NVARCHAR, TEXT, KEYWORD, BINARY
6769
);
6870

71+
/**
72+
* 根据字段类型的字符串找出对应的枚举
73+
* 找不到直接报错 IllegalArgumentException
74+
* @param type
75+
* @return
76+
*/
6977
public static ColumnType fromString(String type) {
7078
if(type == null) {
7179
throw new RuntimeException("null ColumnType!");
@@ -75,15 +83,31 @@ public static ColumnType fromString(String type) {
7583
type = type.substring(0, type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL));
7684
}
7785

78-
return valueOf(type.toUpperCase());
86+
type = type.toUpperCase(Locale.ENGLISH);
87+
//为了支持无符号类型 如 int unsigned
88+
if(StringUtils.contains(type,ConstantValue.DATA_TYPE_UNSIGNED)){
89+
type = type.replace(ConstantValue.DATA_TYPE_UNSIGNED,"").trim();
90+
}
91+
return valueOf(type);
7992
}
8093

94+
/**
95+
* 根据字段类型的字符串找到对应的枚举 找不到就直接返回ColumnType.STRING;
96+
* @param type
97+
* @return
98+
*/
8199
public static ColumnType getType(String type){
100+
type = type.toUpperCase(Locale.ENGLISH);
82101
if(type.contains(ConstantValue.LEFT_PARENTHESIS_SYMBOL)){
83102
type = type.substring(0, type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL));
84103
}
85104

86-
if(type.toLowerCase().contains(ColumnType.TIMESTAMP.name().toLowerCase())){
105+
//为了支持无符号类型 如 int unsigned
106+
if(StringUtils.contains(type,ConstantValue.DATA_TYPE_UNSIGNED)){
107+
type = type.replaceAll(ConstantValue.DATA_TYPE_UNSIGNED,"").trim();
108+
}
109+
110+
if(type.contains(ColumnType.TIMESTAMP.name())){
87111
return TIMESTAMP;
88112
}
89113

flinkx-core/src/main/java/com/dtstack/flinkx/latch/MetricLatch.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flinkx.latch;
2020

2121
import com.dtstack.flinkx.constants.ConstantValue;
22+
import com.dtstack.flinkx.util.GsonUtil;
2223
import com.dtstack.flinkx.util.UrlUtil;
2324
import com.google.gson.Gson;
2425
import com.google.gson.internal.LinkedTreeMap;
@@ -76,10 +77,11 @@ private int getIntMetricVal(String requestUrl) {
7677
try(InputStream inputStream = UrlUtil.open(requestUrl)) {
7778
try(Reader rd = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
7879
Map<String,Object> map = gson.fromJson(rd, Map.class);
80+
LOG.info("requestUrl = {}, and return map = {}", requestUrl, GsonUtil.GSON.toJson(map));
7981
List<LinkedTreeMap> userTaskAccumulators = (List<LinkedTreeMap>) map.get("user-task-accumulators");
8082
for(LinkedTreeMap accumulator : userTaskAccumulators) {
8183
if(metricName != null && metricName.equals(accumulator.get("name"))) {
82-
return Integer.valueOf((String )accumulator.get("value"));
84+
return Integer.parseInt((String )accumulator.get("value"));
8385
}
8486
}
8587
} catch (Exception e) {

flinkx-core/src/main/java/com/dtstack/flinkx/metrics/BaseMetric.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void addMetric(String metricName, LongCounter counter, boolean meterView)
5656
metricCounters.put(metricName, counter);
5757
flinkxOutput.gauge(metricName, new SimpleAccumulatorGauge<>(counter));
5858
if (meterView){
59-
flinkxOutput.meter(metricName + Metrics.SUFFIX_RATE, new SimpleLongCounterMeterView(counter, 60));
59+
flinkxOutput.meter(metricName + Metrics.SUFFIX_RATE, new SimpleLongCounterMeterView(counter, 20));
6060
}
6161
}
6262

flinkx-core/src/main/java/com/dtstack/flinkx/options/Options.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,15 @@ public class Options {
8282
@OptionRequired(description = "plugin load mode, by classpath or shipfile")
8383
private String pluginLoadMode = "shipfile";
8484

85+
@OptionRequired(description = "kerberos krb5conf")
86+
private String krb5conf ;
87+
88+
@OptionRequired(description = "kerberos keytabPath")
89+
private String keytab ;
90+
91+
@OptionRequired(description = "kerberos principal")
92+
private String principal ;
93+
8594
@OptionRequired(description = "applicationId on yarn cluster")
8695
private String appId;
8796

@@ -248,6 +257,30 @@ public void setP(String p) {
248257
this.p = p;
249258
}
250259

260+
public String getKrb5conf() {
261+
return krb5conf;
262+
}
263+
264+
public void setKrb5conf(String krb5conf) {
265+
this.krb5conf = krb5conf;
266+
}
267+
268+
public String getKeytab() {
269+
return keytab;
270+
}
271+
272+
public void setKeytab(String keytab) {
273+
this.keytab = keytab;
274+
}
275+
276+
public String getPrincipal() {
277+
return principal;
278+
}
279+
280+
public void setPrincipal(String principal) {
281+
this.principal = principal;
282+
}
283+
251284
@Override
252285
public String toString() {
253286
return "Options{" +
@@ -268,7 +301,9 @@ public String toString() {
268301
", pluginLoadMode='" + pluginLoadMode + '\'' +
269302
", appId='" + appId + '\'' +
270303
", remotePluginPath='" + remotePluginPath + '\'' +
271-
", flinkConfiguration=" + flinkConfiguration +
304+
", krb5conf='" + krb5conf + '\'' +
305+
", keytab='" + keytab + '\'' +
306+
", principal='" + principal + '\'' +
272307
'}';
273308
}
274309
}

flinkx-core/src/main/java/com/dtstack/flinkx/outputformat/BaseFileOutputFormat.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ protected void openInternal(int taskNumber, int numTasks) throws IOException {
106106
nextBlock();
107107
}
108108

109-
private void initPath(){
109+
protected void initPath(){
110110
if(StringUtils.isNotBlank(fileName)) {
111111
outputFilePath = path + SP + fileName;
112112
} else {
@@ -122,7 +122,7 @@ private void initPath(){
122122
taskNumber, currentBlockFileNamePrefix, tmpPath, finishedPath);
123123
}
124124

125-
private void initFileIndex() {
125+
protected void initFileIndex() {
126126
if (null != formatState && formatState.getFileIndex() > -1) {
127127
blockIndex = formatState.getFileIndex() + 1;
128128
}

flinkx-core/src/main/java/com/dtstack/flinkx/util/FileSystemUtil.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ public static void setHadoopUserName(Configuration conf){
6969
}
7070

7171
try {
72-
String ticketCachePath = conf.get("hadoop.security.kerberos.ticket.cache.path");
73-
UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, hadoopUserName);
72+
String previousUserName = UserGroupInformation.getLoginUser().getUserName();
73+
LOG.info("Hadoop user from '{}' switch to '{}' with SIMPLE auth", previousUserName, hadoopUserName);
74+
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
7475
UserGroupInformation.setLoginUser(ugi);
7576
} catch (Exception e) {
7677
LOG.warn("Set hadoop user name error:", e);
@@ -87,7 +88,6 @@ public static boolean isOpenKerberos(Map<String, Object> hadoopConfig){
8788

8889
private static FileSystem getFsWithKerberos(Map<String, Object> hadoopConfig, String defaultFs) throws Exception{
8990
UserGroupInformation ugi = getUGI(hadoopConfig, defaultFs);
90-
UserGroupInformation.setLoginUser(ugi);
9191

9292
return ugi.doAs(new PrivilegedAction<FileSystem>() {
9393
@Override
@@ -110,7 +110,6 @@ public static UserGroupInformation getUGI(Map<String, Object> hadoopConfig, Stri
110110
KerberosUtil.refreshConfig();
111111

112112
UserGroupInformation ugi = KerberosUtil.loginAndReturnUgi(getConfiguration(hadoopConfig, defaultFs), principal, keytabFileName);
113-
UserGroupInformation.setLoginUser(ugi);
114113

115114
return ugi;
116115
}

0 commit comments

Comments
 (0)