Skip to content

Commit a9c28cd

Browse files
committed
字符串增量字段
1 parent 78878c3 commit a9c28cd

File tree

1 file changed

+21
-3
lines changed

1 file changed

+21
-3
lines changed

flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
import java.io.IOException;
5252
import java.sql.*;
53+
import java.text.SimpleDateFormat;
5354
import java.util.Date;
5455
import java.util.*;
5556

@@ -119,6 +120,8 @@ public class JdbcInputFormat extends RichInputFormat {
119120

120121
protected Row lastRow = null;
121122

123+
protected transient final static ThreadLocal<SimpleDateFormat> sdf = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmmss"));
124+
122125
/**
123126
* The hadoop config for metric
124127
*/
@@ -576,7 +579,12 @@ protected String getLocationSql(String incrementColType, String incrementCol, St
576579
} else if(ColumnType.isNumberType(incrementColType)){
577580
endLocationSql = incrementCol + operator + location;
578581
} else {
579-
endTimeStr = String.format("'%s'",location);
582+
// FIXME 京东方特殊逻辑
583+
String part1 = location.substring(0, 13);
584+
String part2 = location.substring(13);
585+
String timeStr = sdf.get().format(new Date(Long.parseLong(part1))) + part2;
586+
587+
endTimeStr = String.format("'%s'",timeStr);
580588
endLocationSql = incrementCol + operator + endTimeStr;
581589
}
582590

@@ -655,7 +663,7 @@ private String getMaxValueFromDb() {
655663
* @return
656664
*/
657665
private String getLocation(String columnType, Object columnVal){
658-
String location;
666+
String location = null;
659667
if (columnVal == null){
660668
return null;
661669
}
@@ -682,7 +690,17 @@ private String getLocation(String columnType, Object columnVal){
682690
} else if(ColumnType.isNumberType(columnType)){
683691
location = String.valueOf(columnVal);
684692
} else {
685-
location = String.valueOf(columnVal);
693+
// FIXME 京东方逻辑
694+
String timeStr = columnVal.toString();
695+
String part1 = timeStr.substring(0, 14);
696+
String part2 = timeStr.substring(14);
697+
698+
try {
699+
Date date = sdf.get().parse(part1);
700+
location = date.getTime() + part2;
701+
} catch (Exception e) {
702+
LOG.warn("Parse increment column error:", e);
703+
}
686704
}
687705

688706
return location;

0 commit comments

Comments
 (0)