Skip to content

Commit 41b2b8a

Browse files
committed
Merge branch '1.8_release_4.0.x' into temp_1.10_4.0.x_merge
# Conflicts: # flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java
2 parents 03d5cea + 015df15 commit 41b2b8a

File tree

8 files changed

+320
-226
lines changed

8 files changed

+320
-226
lines changed

flinkx-core/src/main/java/com/dtstack/flinkx/config/DataTransferConfig.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
package com.dtstack.flinkx.config;
2020

21+
import com.dtstack.flinkx.util.GsonUtil;
2122
import com.dtstack.flinkx.util.MapUtil;
22-
import com.google.gson.Gson;
2323
import org.apache.flink.util.Preconditions;
2424

2525
import java.io.Reader;
@@ -110,17 +110,15 @@ private static void checkConfig(DataTransferConfig config) {
110110
}
111111

112112
public static DataTransferConfig parse(String json) {
113-
Gson gson = new Gson();
114-
Map<String,Object> map = gson.fromJson(json, Map.class);
113+
Map<String,Object> map = GsonUtil.GSON.fromJson(json, GsonUtil.gsonMapTypeToken);
115114
map = MapUtil.convertToHashMap(map);
116115
DataTransferConfig config = new DataTransferConfig(map);
117116
checkConfig(config);
118117
return config;
119118
}
120119

121120
public static DataTransferConfig parse(Reader reader) {
122-
Gson gson = new Gson();
123-
DataTransferConfig config = gson.fromJson(reader, DataTransferConfig.class);
121+
DataTransferConfig config = GsonUtil.GSON.fromJson(reader, DataTransferConfig.class);
124122
checkConfig(config);
125123
return config;
126124
}

flinkx-core/src/main/java/com/dtstack/flinkx/enums/ColumnType.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@ public enum ColumnType {
3535
/**
3636
* string type
3737
*/
38-
STRING, VARCHAR,VARCHAR2, CHAR,NVARCHAR,TEXT,KEYWORD,BINARY,
38+
STRING, VARCHAR, VARCHAR2, CHAR, NVARCHAR, TEXT, KEYWORD, BINARY,
3939

4040
/**
4141
* number type
4242
*/
43-
INT, INT32, MEDIUMINT, TINYINT, DATETIME, SMALLINT, BIGINT,LONG, INT64 ,SHORT,INTEGER,
43+
INT, INT32, MEDIUMINT, TINYINT, DATETIME, SMALLINT, BIGINT, LONG, INT64 , SHORT, INTEGER, NUMBER,
4444

4545
/**
4646
* double type
@@ -51,19 +51,19 @@ public enum ColumnType {
5151
/**
5252
* date type
5353
*/
54-
DATE, TIMESTAMP,TIME,
55-
DECIMAL,YEAR,BIT;
54+
DATE, TIMESTAMP, TIME,
55+
DECIMAL, YEAR, BIT;
5656

5757
public static List<ColumnType> TIME_TYPE = Arrays.asList(
58-
DATE,DATETIME,TIME,TIMESTAMP
58+
DATE, DATETIME, TIME, TIMESTAMP
5959
);
6060

6161
public static List<ColumnType> NUMBER_TYPE = Arrays.asList(
62-
INT,INTEGER,MEDIUMINT,TINYINT,SMALLINT, BIGINT,LONG,SHORT,DOUBLE, FLOAT,DECIMAL
62+
INT, INTEGER, MEDIUMINT, TINYINT, SMALLINT, BIGINT, LONG, SHORT, DOUBLE, FLOAT, DECIMAL, NUMBER
6363
);
6464

6565
public static List<ColumnType> STRING_TYPE = Arrays.asList(
66-
STRING, VARCHAR,VARCHAR2, CHAR,NVARCHAR,TEXT,KEYWORD,BINARY
66+
STRING, VARCHAR, VARCHAR2, CHAR, NVARCHAR, TEXT, KEYWORD, BINARY
6767
);
6868

6969
public static ColumnType fromString(String type) {

flinkx-core/src/main/java/com/dtstack/flinkx/inputformat/BaseRichInputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ private void showConfig(){
188188
writerParameter.put(KEY_PASSWORD, KEY_CONFUSED_PASSWORD);
189189
}
190190
}
191-
LOG.info("configInfo : {}", GsonUtil.GSON.toJson(map));
191+
LOG.info("configInfo : \n{}", GsonUtil.GSON.toJson(map));
192192
}
193193

194194
private void checkIfCreateSplitFailed(InputSplit inputSplit){

flinkx-core/src/main/java/com/dtstack/flinkx/util/DateUtil.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package com.dtstack.flinkx.util;
2020

21+
import org.apache.commons.lang3.StringUtils;
22+
2123
import java.sql.Timestamp;
2224
import java.text.ParseException;
2325
import java.text.SimpleDateFormat;
@@ -26,6 +28,7 @@
2628
import java.util.HashMap;
2729
import java.util.Map;
2830
import java.util.TimeZone;
31+
import java.util.regex.Pattern;
2932

3033
/**
3134
* Date Utilities
@@ -250,4 +253,67 @@ public static SimpleDateFormat buildDateFormatter(String timeFormat){
250253
sdf.setTimeZone(TimeZone.getTimeZone(TIME_ZONE));
251254
return sdf;
252255
}
256+
257+
/**
258+
* 常规自动日期格式识别
259+
* @param str 时间字符串
260+
* @return String DateFormat字符串如:yyyy-MM-dd HH:mm:ss
261+
*/
262+
public static String getDateFormat(String str) {
263+
if(StringUtils.isBlank(str)){
264+
return null;
265+
}
266+
boolean year = false;
267+
Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
268+
if(pattern.matcher(str.substring(0, 4)).matches()) {
269+
year = true;
270+
}
271+
StringBuilder sb = new StringBuilder();
272+
int index = 0;
273+
if(!year) {
274+
if(str.contains("月") || str.contains("-") || str.contains("/")) {
275+
if(Character.isDigit(str.charAt(0))) {
276+
index = 1;
277+
}
278+
}else {
279+
index = 3;
280+
}
281+
}
282+
for (int i = 0; i < str.length(); i++) {
283+
char chr = str.charAt(i);
284+
if(Character.isDigit(chr)) {
285+
if(index==0) {
286+
sb.append("y");
287+
}
288+
if(index==1) {
289+
sb.append("M");
290+
}
291+
if(index==2) {
292+
sb.append("d");
293+
}
294+
if(index==3) {
295+
sb.append("H");
296+
}
297+
if(index==4) {
298+
sb.append("m");
299+
}
300+
if(index==5) {
301+
sb.append("s");
302+
}
303+
if(index==6) {
304+
sb.append("S");
305+
}
306+
}else {
307+
if(i>0) {
308+
char lastChar = str.charAt(i-1);
309+
if(Character.isDigit(lastChar)) {
310+
index++;
311+
}
312+
}
313+
sb.append(chr);
314+
}
315+
}
316+
return sb.toString();
317+
}
318+
253319
}

flinkx-core/src/main/java/com/dtstack/flinkx/util/GsonUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ public class GsonUtil {
5050

5151
@SuppressWarnings("unchecked")
5252
private static Gson getGson() {
53-
GSON = new GsonBuilder().create();
53+
GSON = new GsonBuilder()
54+
.setPrettyPrinting()
55+
.create();
5456
try {
5557
Field factories = Gson.class.getDeclaredField("factories");
5658
factories.setAccessible(true);

flinkx-core/src/main/java/com/dtstack/flinkx/util/ResultPrintUtil.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ public static void printResult(JobExecutionResult result){
4949
}
5050
maxLength += 5;
5151

52-
StringBuilder builder = new StringBuilder();
52+
StringBuilder builder = new StringBuilder(128);
53+
builder.append("\n*********************************************\n");
5354
for (int i = 0; i < names.size(); i++) {
5455
String name = names.get(i);
5556
builder.append(name + StringUtils.repeat(" ", maxLength - name.length()));
@@ -59,9 +60,7 @@ public static void printResult(JobExecutionResult result){
5960
builder.append("\n");
6061
}
6162
}
62-
63-
LOG.info("---------------------------------");
63+
builder.append("\n*********************************************\n");
6464
LOG.info(builder.toString());
65-
LOG.info("---------------------------------");
6665
}
6766
}

flinkx-core/src/main/java/com/dtstack/flinkx/util/StringUtil.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020

2121
import com.dtstack.flinkx.enums.ColumnType;
2222
import com.dtstack.flinkx.exception.WriteRecordException;
23+
import org.apache.commons.lang3.StringUtils;
24+
import org.apache.commons.lang3.time.DateUtils;
2325
import org.apache.flink.types.Row;
2426

2527
import java.math.BigDecimal;
2628
import java.sql.Date;
2729
import java.sql.Timestamp;
30+
import java.text.ParseException;
2831
import java.text.SimpleDateFormat;
2932
import java.util.ArrayList;
3033
import java.util.List;
@@ -296,4 +299,26 @@ public static List<String> splitIgnoreQuota(String str, char delimiter){
296299

297300
return tokensList;
298301
}
302+
303+
/**
304+
* 字符串转换成对应时间戳字符串
305+
* @param location
306+
* @return
307+
*/
308+
public static String stringToTimestampStr(String location, ColumnType type){
309+
//若为空字符串或本身就是时间戳则不需要转换
310+
if(StringUtils.isBlank(location) || StringUtils.isNumeric(location)){
311+
return location;
312+
}
313+
try {
314+
switch (type) {
315+
case TIMESTAMP: return String.valueOf(Timestamp.valueOf(location).getTime());
316+
case DATE: return String.valueOf(DateUtils.parseDate(location, DateUtil.getDateFormat(location)).getTime());
317+
default: return location;
318+
}
319+
}catch (ParseException e){
320+
String message = String.format("cannot transform 【%s】to 【%s】, e = %s", location, type, ExceptionUtil.getErrorMessage(e));
321+
throw new RuntimeException(message);
322+
}
323+
}
299324
}

0 commit comments

Comments
 (0)