Skip to content

Commit 8a85162

Browse files
committed
Merge remote-tracking branch 'origin/1.8_release_3.10.x' into tmp_1.8_4.0_x_merge
# Conflicts: # flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java
2 parents 9d41bef + af32a3b commit 8a85162

File tree

1 file changed

+23
-4
lines changed

1 file changed

+23
-4
lines changed

flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.HashMap;
5858
import java.util.List;
5959
import java.util.Map;
60+
import java.util.Objects;
6061
import java.util.concurrent.TimeUnit;
6162

6263
/**
@@ -173,7 +174,9 @@ public void openInternal(InputSplit inputSplit) throws IOException {
173174
querySql = buildQuerySql(inputSplit);
174175
try {
175176
executeQuery(((JdbcInputSplit) inputSplit).getStartLocation());
176-
columnCount = resultSet.getMetaData().getColumnCount();
177+
if(!resultSet.isClosed()){
178+
columnCount = resultSet.getMetaData().getColumnCount();
179+
}
177180
} catch (SQLException se) {
178181
throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
179182
}
@@ -213,11 +216,11 @@ public boolean reachedEnd() {
213216
if (incrementConfig.isPolling()) {
214217
try {
215218
TimeUnit.MILLISECONDS.sleep(incrementConfig.getPollingInterval());
216-
//间隔轮询检测数据库连接是否断开,超时时间三秒,断开后自动重连
217-
if(!dbConn.isValid(3)){
219+
//间隔轮询检测数据库连接是否断开,断开重新连接
220+
if(Objects.isNull(dbConn) || dbConn.isClosed()){
218221
dbConn = DbUtil.getConnection(dbUrl, username, password);
219222
//重新连接后还是不可用则认为数据库异常,任务失败
220-
if(!dbConn.isValid(3)){
223+
if(Objects.isNull(dbConn) || dbConn.isClosed()){
221224
String message = String.format("cannot connect to %s, username = %s, please check %s is available.", dbUrl, username, databaseInterface.getDatabaseType());
222225
LOG.error(message);
223226
throw new RuntimeException(message);
@@ -246,6 +249,7 @@ public boolean reachedEnd() {
246249
@Override
247250
public Row nextRecordInternal(Row row) throws IOException {
248251
try {
252+
updateColumnCount();
249253
if (!ConstantValue.STAR_SYMBOL.equals(metaColumns.get(0).getName())) {
250254
for (int i = 0; i < columnCount; i++) {
251255
Object val = row.getField(i);
@@ -888,4 +892,19 @@ protected List<String> analyzeTable(String dbUrl, String username, String passwo
888892
String table, List<MetaColumn> metaColumns) {
889893
return DbUtil.analyzeTable(dbUrl, username, password, databaseInterface, table, metaColumns);
890894
}
895+
896+
/**
897+
* 兼容db2 在间隔轮训场景 且第一次读取时没有任何数据
898+
* 在openInternal方法调用时 由于数据库没有数据,db2会自动关闭resultSet,因此只有在间隔轮训中某次读取到数据之后,进行更新columnCount
899+
* @throws SQLException
900+
*/
901+
private void updateColumnCount() throws SQLException {
902+
if(columnCount == 0){
903+
columnCount =resultSet.getMetaData().getColumnCount();
904+
boolean splitWithRowCol = numPartitions > 1 && StringUtils.isNotEmpty(splitKey) && splitKey.contains("(");
905+
if (splitWithRowCol) {
906+
columnCount = columnCount - 1;
907+
}
908+
}
909+
}
891910
}

0 commit comments

Comments
 (0)