
Commit 9daa91f

Merge branch '1.8_release_3.10.x' into 1.8_release_4.0.x

Author: gituser
Parents: 7661114 + 0dd1203

11 files changed: +77 additions, −11 deletions


flinkx-core/src/main/java/com/dtstack/flinkx/Main.java

Lines changed: 2 additions & 0 deletions
@@ -189,6 +189,8 @@ private static void speedTest(DataTransferConfig config) {
         } else if (WRITER.equalsIgnoreCase(testConfig.getSpeedTest())){
             ContentConfig contentConfig = config.getJob().getContent().get(0);
             contentConfig.getReader().setName(STREAM_READER);
+        } else {
+            return;
         }

         config.getJob().getSetting().getSpeed().setBytes(-1);
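
The new else branch makes speedTest a no-op for any mode other than reader/writer: only the recognized modes fall through to lift the byte limit. A minimal sketch of the resulting dispatch; JobConfig and the reader-mode branch are illustrative stand-ins (that branch sits above this hunk and is assumed symmetric):

    // Illustrative only -- JobConfig and the string constants are not FlinkX classes.
    class JobConfig { String readerName; String writerName; long byteLimit; }

    static void applySpeedTest(String mode, JobConfig job) {
        if ("reader".equalsIgnoreCase(mode)) {
            job.writerName = "streamwriter";   // assumed: discard output, time the read
        } else if ("writer".equalsIgnoreCase(mode)) {
            job.readerName = "streamreader";   // generate input, time the write
        } else {
            return;                            // the new early return: leave the job untouched
        }
        job.byteLimit = -1;                    // lift the byte limit only in test mode
    }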

flinkx-core/src/main/java/com/dtstack/flinkx/reader/ByteRateLimiter.java

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ private void updateRate(){
         BigDecimal thisWriteRatio = BigDecimal.valueOf(totalRecords == 0 ? 0 : thisRecords / (double) totalRecords);

         if (totalRecords > MIN_RECORD_NUMBER_UPDATE_RATE && totalBytes != 0
-                && thisWriteRatio.compareTo(new BigDecimal(0)) == 0) {
+                && thisWriteRatio.compareTo(BigDecimal.ZERO) != 0) {
             double bpr = totalBytes / (double)totalRecords;
             double permitsPerSecond = expectedBytePerSecond / bpr * thisWriteRatio.doubleValue();
             rateLimiter.setRate(permitsPerSecond);
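
The old guard ran the update only when thisWriteRatio was exactly zero, i.e. only for channels that had written nothing, for which the formula below yields a rate of 0; the fix inverts the comparison (and swaps new BigDecimal(0) for the BigDecimal.ZERO constant) so the limiter is retuned precisely for channels that did write data. A worked pass through the formula, with invented numbers:

    // Worked example of the rate update (numbers invented for illustration).
    double totalBytes = 1_000_000;          // bytes written by all channels
    double totalRecords = 10_000;           // records written by all channels
    double thisRecords = 2_500;             // records written by this channel
    double expectedBytePerSecond = 500_000; // job-wide bytes/second target

    double thisWriteRatio = thisRecords / totalRecords;   // 0.25
    double bpr = totalBytes / totalRecords;                // 100 bytes per record
    double permitsPerSecond =
            expectedBytePerSecond / bpr * thisWriteRatio;  // 1_250 records/s
    // 1_250 records/s * 100 bytes/record = 125_000 B/s, a 0.25 share of the
    // 500_000 B/s target: each channel is throttled in proportion to its
    // observed share of the traffic.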

flinkx-kudu/flinkx-kudu-core/src/main/java/com/dtstack/flinkx/kudu/core/KuduUtil.java

Lines changed: 17 additions & 6 deletions
@@ -19,8 +19,10 @@

 package com.dtstack.flinkx.kudu.core;

+import com.dtstack.flinkx.authenticate.KerberosUtil;
 import com.dtstack.flinkx.constants.ConstantValue;
 import com.dtstack.flinkx.reader.MetaColumn;
+import com.dtstack.flinkx.util.FileSystemUtil;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -57,10 +59,18 @@ public class KuduUtil {

     public final static String AUTHENTICATION_TYPE = "Kerberos";

-    public static KuduClient getKuduClient(KuduConfig config) throws IOException,InterruptedException {
-        if(AUTHENTICATION_TYPE.equals(config.getAuthentication())){
-            UserGroupInformation.loginUserFromKeytab(config.getPrincipal(), config.getKeytabFile());
-            return UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<KuduClient>() {
+    /**
+     * Get a Kudu client.
+     * @param config the Kudu configuration
+     * @param hadoopConfig Hadoop settings, mainly the Kerberos authentication info
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public static KuduClient getKuduClient(KuduConfig config, Map<String,Object> hadoopConfig) throws IOException,InterruptedException {
+        if(AUTHENTICATION_TYPE.equals(config.getAuthentication()) && FileSystemUtil.isOpenKerberos(hadoopConfig)){
+            UserGroupInformation ugi = FileSystemUtil.getUGI(hadoopConfig, null);
+            return ugi.doAs(new PrivilegedExceptionAction<KuduClient>() {
                 @Override
                 public KuduClient run() throws Exception {
                     return getKuduClientInternal(config);
@@ -81,9 +91,9 @@ private static KuduClient getKuduClientInternal(KuduConfig config) {
                 .syncClient();
     }

-    public static List<KuduScanToken> getKuduScanToken(KuduConfig config, List<MetaColumn> columns, String filterString) throws IOException{
+    public static List<KuduScanToken> getKuduScanToken(KuduConfig config, List<MetaColumn> columns, String filterString, Map<String,Object> hadoopConfig) throws IOException{
         try (
-                KuduClient client = getKuduClient(config)
+                KuduClient client = getKuduClient(config, hadoopConfig)
         ) {
             KuduTable kuduTable = client.openTable(config.getTable());

@@ -157,6 +167,7 @@ public static Type getType(String columnType){
             case "varchar":
             case "text":
             case "string" : return Type.STRING;
+            case "unixtime_micros":
             case "timestamp" : return Type.UNIXTIME_MICROS;
             default:
                 throw new IllegalArgumentException("Not support column type:" + columnType);
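
With this change the Kerberos login no longer comes from dedicated principal/keytab fields on KuduConfig; the caller passes the shared hadoopConfig map, and FileSystemUtil decides whether Kerberos is enabled and builds the UserGroupInformation. A hedged usage sketch — the map keys below are the conventional Hadoop ones and are an assumption here, since the exact keys FileSystemUtil.isOpenKerberos/getUGI read are outside this diff:

    import java.util.HashMap;
    import java.util.Map;

    import com.dtstack.flinkx.kudu.core.KuduConfig;
    import com.dtstack.flinkx.kudu.core.KuduUtil;
    import org.apache.kudu.client.KuduClient;

    public class KuduClientSketch {
        public static KuduClient connect(KuduConfig kuduConfig) throws Exception {
            // kuduConfig.getAuthentication() must also be "Kerberos" for this path.
            Map<String, Object> hadoopConfig = new HashMap<>();
            // Assumed key names -- not confirmed by this diff:
            hadoopConfig.put("hadoop.security.authentication", "kerberos");
            hadoopConfig.put("principal", "flinkx/host@EXAMPLE.COM");
            hadoopConfig.put("keytab", "/etc/security/flinkx.keytab");
            // Falls back to a plain, unauthenticated client when Kerberos is off.
            return KuduUtil.getKuduClient(kuduConfig, hadoopConfig);
        }
    }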

flinkx-kudu/flinkx-kudu-reader/src/main/java/com/dtstack/flinkx/kudu/reader/KuduInputFormat.java

Lines changed: 13 additions & 2 deletions
@@ -34,6 +34,7 @@

 import java.io.IOException;
 import java.util.List;
+import java.util.Map;

 /**
  * @author jiangbo
@@ -45,6 +46,8 @@ public class KuduInputFormat extends BaseRichInputFormat {

     protected KuduConfig kuduConfig;

+    protected Map<String,Object> hadoopConfig;
+
     private transient KuduClient client;

     private transient KuduScanner scanner;
@@ -57,7 +60,7 @@ public void openInputFormat() throws IOException {
         super.openInputFormat();

         try {
-            client = KuduUtil.getKuduClient(kuduConfig);
+            client = KuduUtil.getKuduClient(kuduConfig, hadoopConfig);
         } catch (IOException | InterruptedException e) {
             throw new RuntimeException("Get KuduClient error", e);
         }
@@ -122,7 +125,7 @@ private Object getValue(Type type, RowResult rowResult, String name) {
     @Override
     public InputSplit[] createInputSplitsInternal(int minNumSplits) throws IOException {
         LOG.info("execute createInputSplits,minNumSplits:{}", minNumSplits);
-        List<KuduScanToken> scanTokens = KuduUtil.getKuduScanToken(kuduConfig, columns, kuduConfig.getFilterString());
+        List<KuduScanToken> scanTokens = KuduUtil.getKuduScanToken(kuduConfig, columns, kuduConfig.getFilterString(), hadoopConfig);
         KuduTableSplit[] inputSplits = new KuduTableSplit[scanTokens.size()];
         for (int i = 0; i < scanTokens.size(); i++) {
             inputSplits[i] = new KuduTableSplit(scanTokens.get(i).serialize(), i);
@@ -168,4 +171,12 @@ public void closeInputFormat() throws IOException {
             client = null;
         }
     }
+
+    public Map<String, Object> getHadoopConfig() {
+        return hadoopConfig;
+    }
+
+    public void setHadoopConfig(Map<String, Object> hadoopConfig) {
+        this.hadoopConfig = hadoopConfig;
+    }
 }
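
For context, createInputSplitsInternal serializes one Kudu scan token per split; on the task side a scanner is rebuilt from those bytes. A sketch of that consuming step — KuduScanToken.deserializeIntoScanner is the standard Kudu client call, while the split's getter name is an assumption here:

    // Rebuild a scanner from the bytes stored in a KuduTableSplit.
    byte[] token = split.getScanToken();  // getter name assumed
    KuduScanner scanner = KuduScanToken.deserializeIntoScanner(token, client);
    while (scanner.hasMoreRows()) {
        RowResultIterator rows = scanner.nextRows();
        while (rows.hasNext()) {
            RowResult result = rows.next();
            // map each column of result into a flink Row ...
        }
    }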

flinkx-kudu/flinkx-kudu-reader/src/main/java/com/dtstack/flinkx/kudu/reader/KuduInputFormatBuilder.java

Lines changed: 5 additions & 0 deletions
@@ -25,6 +25,7 @@
 import com.dtstack.flinkx.reader.MetaColumn;

 import java.util.List;
+import java.util.Map;

 /**
  * @author jiangbo
@@ -46,6 +47,10 @@ public void setKuduConfig(KuduConfig kuduConfig){
         format.kuduConfig = kuduConfig;
     }

+    public void setHadoopConfig(Map<String,Object> hadoopConfig) {
+        format.setHadoopConfig(hadoopConfig);
+    }
+
     @Override
     protected void checkFormat() {
         if (format.columns == null || format.columns.size() == 0){

flinkx-kudu/flinkx-kudu-reader/src/main/java/com/dtstack/flinkx/kudu/reader/KuduReader.java

Lines changed: 6 additions & 0 deletions
@@ -31,6 +31,7 @@
 import org.apache.kudu.client.AsyncKuduClient;

 import java.util.List;
+import java.util.Map;

 import static com.dtstack.flinkx.kudu.core.KuduConfigKeys.KEY_ADMIN_OPERATION_TIMEOUT;
 import static com.dtstack.flinkx.kudu.core.KuduConfigKeys.KEY_AUTHENTICATION;
@@ -56,6 +57,8 @@ public class KuduReader extends BaseDataReader {

     private KuduConfig kuduConfig;

+    protected Map<String,Object> hadoopConfig;
+
     public KuduReader(DataTransferConfig config, StreamExecutionEnvironment env) {
         super(config, env);

@@ -78,6 +81,8 @@ public KuduReader(DataTransferConfig config, StreamExecutionEnvironment env) {
                 .withBatchSizeBytes(parameterConfig.getIntVal(KEY_BATCH_SIZE_BYTES, 1024*1024))
                 .withFilter(parameterConfig.getStringVal(KEY_FILTER))
                 .build();
+
+        hadoopConfig = (Map<String, Object>) readerConfig.getParameter().getVal("hadoopConfig");
     }

     @Override
@@ -90,6 +95,7 @@ public DataStream<Row> readData() {
         builder.setKuduConfig(kuduConfig);
         builder.setTestConfig(testConfig);
         builder.setLogConfig(logConfig);
+        builder.setHadoopConfig(hadoopConfig);

         return createInput(builder.finish());
     }
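
Reader-side, the map is read from the job file's reader parameter block under the key "hadoopConfig" (the key is literal in this diff) and handed through the builder to the input format. A condensed sketch of the flow these hunks add; readerConfig and kuduConfig come from the surrounding constructor:

    @SuppressWarnings("unchecked")
    Map<String, Object> hadoopConfig =
            (Map<String, Object>) readerConfig.getParameter().getVal("hadoopConfig");

    KuduInputFormatBuilder builder = new KuduInputFormatBuilder();
    builder.setKuduConfig(kuduConfig);
    builder.setHadoopConfig(hadoopConfig); // new hook added by this commit
    // builder.finish() yields the KuduInputFormat; its openInputFormat()
    // then calls KuduUtil.getKuduClient(kuduConfig, hadoopConfig).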

flinkx-kudu/flinkx-kudu-writer/src/main/java/com/dtstack/flinkx/kudu/writer/KuduOutputFormat.java

Lines changed: 12 additions & 1 deletion
@@ -43,6 +43,7 @@
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.List;
+import java.util.Map;

 /**
  * @author jiangbo
@@ -58,14 +59,16 @@ public class KuduOutputFormat extends BaseRichOutputFormat {

     private transient KuduClient client;

+    protected Map<String,Object> hadoopConfig;
+
     private transient KuduSession session;

     private transient KuduTable kuduTable;

     @Override
     protected void openInternal(int taskNumber, int numTasks) throws IOException {
         try{
-            client = KuduUtil.getKuduClient(kuduConfig);
+            client = KuduUtil.getKuduClient(kuduConfig, hadoopConfig);
         } catch (Exception e){
             throw new RuntimeException("Get KuduClient error", e);
         }
@@ -197,4 +200,12 @@ public void closeInternal() throws IOException {
             client.close();
         }
     }
+
+    public Map<String, Object> getHadoopConfig() {
+        return hadoopConfig;
+    }
+
+    public void setHadoopConfig(Map<String, Object> hadoopConfig) {
+        this.hadoopConfig = hadoopConfig;
+    }
 }

flinkx-kudu/flinkx-kudu-writer/src/main/java/com/dtstack/flinkx/kudu/writer/KuduOutputFormatBuilder.java

Lines changed: 4 additions & 0 deletions
@@ -24,6 +24,7 @@
 import com.dtstack.flinkx.reader.MetaColumn;

 import java.util.List;
+import java.util.Map;

 /**
  * @author jiangbo
@@ -49,6 +50,9 @@ public void setWriteMode(String writeMode){
         format.writeMode = writeMode;
     }

+    public void setHadoopConfig(Map<String,Object> hadoopConfig) {
+        format.setHadoopConfig(hadoopConfig);
+    }
     @Override
     protected void checkFormat() {
         if (format.columns == null || format.columns.size() == 0){

flinkx-kudu/flinkx-kudu-writer/src/main/java/com/dtstack/flinkx/kudu/writer/KuduWriter.java

Lines changed: 6 additions & 0 deletions
@@ -31,6 +31,7 @@
 import org.apache.kudu.client.AsyncKuduClient;

 import java.util.List;
+import java.util.Map;

 import static com.dtstack.flinkx.kudu.core.KuduConfigKeys.KEY_ADMIN_OPERATION_TIMEOUT;
 import static com.dtstack.flinkx.kudu.core.KuduConfigKeys.KEY_AUTHENTICATION;
@@ -57,6 +58,8 @@ public class KuduWriter extends BaseDataWriter {

     private int batchInterval;

+    protected Map<String,Object> hadoopConfig;
+
     public KuduWriter(DataTransferConfig config) {
         super(config);

@@ -77,6 +80,8 @@ public KuduWriter(DataTransferConfig config) {
                 .withTable(parameterConfig.getStringVal(KEY_TABLE))
                 .withFlushMode(parameterConfig.getStringVal(KEY_FLUSH_MODE))
                 .build();
+
+        hadoopConfig = (Map<String, Object>) parameterConfig.getVal("hadoopConfig");
     }

     @Override
@@ -89,6 +94,7 @@ public DataStreamSink<?> writeData(DataStream<Row> dataSet) {
         builder.setBatchInterval(batchInterval);
         builder.setErrors(errors);
         builder.setErrorRatio(errorRatio);
+        builder.setHadoopConfig(hadoopConfig);
         return createOutput(dataSet,builder.finish());
     }
 }

flinkx-postgresql/flinkx-postgresql-core/src/main/java/com/dtstack/flinkx/postgresql/PostgresqlTypeConverter.java

Lines changed: 8 additions & 0 deletions
@@ -25,6 +25,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;

 /**
  * The type converter for PostgreSQL database
@@ -44,6 +45,9 @@ public class PostgresqlTypeConverter implements TypeConverterInterface {

     private List<String> intTypes = Arrays.asList("int","int2","int4","int8");

+    protected static List<String> STRING_TYPES = Arrays.asList("CHAR", "VARCHAR","TINYBLOB","TINYTEXT","BLOB","TEXT", "MEDIUMBLOB", "MEDIUMTEXT", "LONGBLOB", "LONGTEXT");
+
+
     @Override
     public Object convert(Object data,String typeName) {
         if (data == null){
@@ -54,6 +58,10 @@ public Object convert(Object data,String typeName) {
             return dataValue;
         }
         if(StringUtils.isBlank(dataValue)){
+            // For string types, return the empty string rather than null
+            if(STRING_TYPES.contains(typeName.toUpperCase(Locale.ENGLISH))){
+                return dataValue;
+            }
             return null;
         }
         if(doubleTypes.contains(typeName)){
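
The net effect of the guard, sketched below for a default-constructed converter (assuming typeName arrives in lowercase, which the Locale.ENGLISH upper-casing suggests):

    PostgresqlTypeConverter converter = new PostgresqlTypeConverter();

    Object a = converter.convert("", "varchar"); // now returns "" (was null)
    Object b = converter.convert("", "int4");    // still null: blank non-string value
    Object c = converter.convert(null, "text");  // still null: the null check runs first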
