Skip to content

Commit 389d7fc

Browse files
committed
Merge & fix compile problem
1 parent 405eb9a commit 389d7fc

File tree

4 files changed

+4
-136
lines changed

4 files changed

+4
-136
lines changed

flinkx-gbase/flinkx-gbase-reader/src/main/java/com/dtstack/flinkx/gbase/format/GbaseInputFormat.java

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,9 @@
1818
package com.dtstack.flinkx.gbase.format;
1919

2020
import com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat;
21-
import com.dtstack.flinkx.rdb.inputformat.JdbcInputSplit;
22-
import com.dtstack.flinkx.rdb.util.DbUtil;
23-
import com.dtstack.flinkx.reader.MetaColumn;
24-
import com.dtstack.flinkx.util.ClassUtil;
25-
import org.apache.commons.lang3.StringUtils;
26-
import org.apache.flink.core.io.InputSplit;
2721
import org.apache.flink.types.Row;
2822

2923
import java.io.IOException;
30-
import java.math.BigInteger;
31-
import java.sql.SQLException;
32-
import java.util.ArrayList;
3324

3425
import static com.dtstack.flinkx.rdb.util.DbUtil.clobToString;
3526

@@ -41,60 +32,6 @@
4132
*/
4233
public class GbaseInputFormat extends JdbcInputFormat {
4334

44-
@Override
45-
public void openInternal(InputSplit inputSplit) throws IOException {
46-
try {
47-
LOG.info(inputSplit.toString());
48-
49-
ClassUtil.forName(driverName, getClass().getClassLoader());
50-
initMetric(inputSplit);
51-
52-
String startLocation = incrementConfig.getStartLocation();
53-
if (incrementConfig.isPolling()) {
54-
if (StringUtils.isNotEmpty(startLocation)) {
55-
endLocationAccumulator.add(new BigInteger(startLocation));
56-
}
57-
isTimestamp = "timestamp".equalsIgnoreCase(incrementConfig.getColumnType());
58-
} else if ((incrementConfig.isIncrement() && incrementConfig.isUseMaxFunc())) {
59-
getMaxValue(inputSplit);
60-
}
61-
62-
if(!canReadData(inputSplit)){
63-
LOG.warn("Not read data when the start location are equal to end location");
64-
hasNext = false;
65-
return;
66-
}
67-
68-
fetchSize = Integer.MIN_VALUE;
69-
querySql = buildQuerySql(inputSplit);
70-
JdbcInputSplit jdbcInputSplit = (JdbcInputSplit) inputSplit;
71-
if (null != jdbcInputSplit.getStartLocation()) {
72-
startLocation = jdbcInputSplit.getStartLocation();
73-
}
74-
executeQuery(startLocation);
75-
columnCount = resultSet.getMetaData().getColumnCount();
76-
77-
boolean splitWithRowCol = numPartitions > 1 && StringUtils.isNotEmpty(splitKey) && splitKey.contains("(");
78-
if(splitWithRowCol){
79-
columnCount = columnCount-1;
80-
}
81-
82-
if (StringUtils.isEmpty(customSql)){
83-
descColumnTypeList = DbUtil.analyzeTable(dbUrl, username, password,databaseInterface,table,metaColumns);
84-
} else {
85-
descColumnTypeList = new ArrayList<>();
86-
for (MetaColumn metaColumn : metaColumns) {
87-
descColumnTypeList.add(metaColumn.getName());
88-
}
89-
}
90-
91-
} catch (SQLException se) {
92-
throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
93-
}
94-
95-
LOG.info("JdbcInputFormat[{}]open: end", jobName);
96-
}
97-
9835
@Override
9936
public Row nextRecordInternal(Row row) throws IOException {
10037
if (!hasNext) {

flinkx-mysql/flinkx-mysql-reader/src/main/java/com/dtstack/flinkx/mysql/format/MysqlInputFormat.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,6 @@
2525
import org.apache.flink.types.Row;
2626

2727
import java.io.IOException;
28-
import java.math.BigInteger;
29-
import java.sql.SQLException;
30-
import java.util.ArrayList;
3128

3229
import static com.dtstack.flinkx.rdb.util.DbUtil.clobToString;
3330

flinkx-postgresql/flinkx-postgresql-reader/src/main/java/com/dtstack/flinkx/postgresql/format/PostgresqlInputFormat.java

Lines changed: 0 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,10 @@
1919
package com.dtstack.flinkx.postgresql.format;
2020

2121
import com.dtstack.flinkx.rdb.inputformat.JdbcInputFormat;
22-
import com.dtstack.flinkx.rdb.inputformat.JdbcInputSplit;
23-
import com.dtstack.flinkx.rdb.util.DbUtil;
24-
import com.dtstack.flinkx.reader.MetaColumn;
25-
import com.dtstack.flinkx.util.ClassUtil;
2622
import org.apache.commons.collections.CollectionUtils;
27-
import org.apache.commons.lang3.StringUtils;
28-
import org.apache.flink.core.io.InputSplit;
2923
import org.apache.flink.types.Row;
3024

3125
import java.io.IOException;
32-
import java.math.BigInteger;
33-
import java.sql.SQLException;
34-
import java.util.ArrayList;
3526

3627
import static com.dtstack.flinkx.rdb.util.DbUtil.clobToString;
3728

@@ -43,59 +34,6 @@
4334
*/
4435
public class PostgresqlInputFormat extends JdbcInputFormat {
4536

46-
@Override
47-
public void openInternal(InputSplit inputSplit) throws IOException {
48-
try {
49-
LOG.info(inputSplit.toString());
50-
51-
ClassUtil.forName(driverName, getClass().getClassLoader());
52-
initMetric(inputSplit);
53-
54-
String startLocation = incrementConfig.getStartLocation();
55-
if (incrementConfig.isPolling()) {
56-
if (StringUtils.isNotEmpty(startLocation)) {
57-
endLocationAccumulator.add(new BigInteger(startLocation));
58-
}
59-
isTimestamp = "timestamp".equalsIgnoreCase(incrementConfig.getColumnType());
60-
} else if ((incrementConfig.isIncrement() && incrementConfig.isUseMaxFunc())) {
61-
getMaxValue(inputSplit);
62-
}
63-
64-
if(!canReadData(inputSplit)){
65-
LOG.warn("Not read data when the start location are equal to end location");
66-
hasNext = false;
67-
return;
68-
}
69-
70-
querySql = buildQuerySql(inputSplit);
71-
JdbcInputSplit jdbcInputSplit = (JdbcInputSplit) inputSplit;
72-
if (null != jdbcInputSplit.getStartLocation()) {
73-
startLocation = jdbcInputSplit.getStartLocation();
74-
}
75-
executeQuery(startLocation);
76-
columnCount = resultSet.getMetaData().getColumnCount();
77-
78-
boolean splitWithRowCol = numPartitions > 1 && StringUtils.isNotEmpty(splitKey) && splitKey.contains("(");
79-
if(splitWithRowCol){
80-
columnCount = columnCount-1;
81-
}
82-
83-
if (StringUtils.isEmpty(customSql)){
84-
descColumnTypeList = DbUtil.analyzeTable(dbUrl, username, password,databaseInterface,table,metaColumns);
85-
} else {
86-
descColumnTypeList = new ArrayList<>();
87-
for (MetaColumn metaColumn : metaColumns) {
88-
descColumnTypeList.add(metaColumn.getName());
89-
}
90-
}
91-
92-
} catch (SQLException se) {
93-
throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
94-
}
95-
96-
LOG.info("JdbcInputFormat[{}]open: end", jobName);
97-
}
98-
9937
@Override
10038
public Row nextRecordInternal(Row row) throws IOException {
10139
if (!hasNext) {

flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -865,20 +865,16 @@ private void queryStartLocation() throws SQLException{
865865
}
866866
}
867867

868-
protected Long getLocation() {
868+
protected String getLocation() {
869869
switch (type) {
870870
case TIMESTAMP: {
871-
return Timestamp.valueOf(generalStartLocation).getTime();
871+
return String.valueOf(Timestamp.valueOf(generalStartLocation).getTime());
872872
}
873873
case DATE: {
874-
return Date.valueOf(generalStartLocation).getTime();
874+
return String.valueOf(Date.valueOf(generalStartLocation).getTime());
875875
}
876876
default: {
877-
// 超长字符截取前十八位传值
878-
if(generalStartLocation.length() > 18){
879-
return Long.parseLong(StringUtils.substring(generalStartLocation, 0, 18));
880-
}
881-
return Long.parseLong(generalStartLocation);
877+
return generalStartLocation;
882878
}
883879
}
884880
}

0 commit comments

Comments
 (0)