Skip to content

Commit 2b01a06

Browse files
committed
Merge branch '1.8_release_3.9.x' into hotfix_3.9.x_jingdongfang
2 parents 59f4a32 + 92bd85c commit 2b01a06

File tree

7 files changed

+20
-12
lines changed

7 files changed

+20
-12
lines changed

flinkx-hdfs/flinkx-hdfs-core/src/main/java/com/dtstack/flinkx/hdfs/HdfsUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class HdfsUtil {
4040
public static final String NULL_VALUE = "\\N";
4141

4242
public static Object string2col(String str, String type, SimpleDateFormat customDateFormat) {
43-
if (str == null || str.length() == 0){
43+
if (str == null){
4444
return null;
4545
}
4646

flinkx-hdfs/flinkx-hdfs-reader/src/main/java/com/dtstack/flinkx/hdfs/reader/HdfsTextInputFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.dtstack.flinkx.reader.MetaColumn;
2323
import jodd.util.StringUtil;
2424
import org.apache.commons.io.output.ByteArrayOutputStream;
25+
import org.apache.commons.lang3.StringUtils;
2526
import org.apache.flink.core.io.InputSplit;
2627
import org.apache.flink.types.Row;
2728
import org.apache.hadoop.fs.Path;
@@ -85,7 +86,7 @@ public void openInternal(InputSplit inputSplit) throws IOException {
8586
@Override
8687
public Row nextRecordInternal(Row row) throws IOException {
8788
String line = new String(((Text)value).getBytes(), 0, ((Text)value).getLength(), charsetName);
88-
String[] fields = line.split(delimiter);
89+
String[] fields = StringUtils.splitPreserveAllTokens(line, delimiter);
8990

9091
if (metaColumns.size() == 1 && "*".equals(metaColumns.get(0).getName())){
9192
row = new Row(fields.length);

flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbConfigKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,6 @@ public class MongodbConfigKeys {
5757
public final static String KEY_MAX_WAIT_TIME = "maxWaitTime";
5858

5959
public final static String KEY_SOCKET_TIMEOUT = "socketTimeout";
60+
61+
public final static String KEY_SERVER_SELECTION_TIMEOUT = "serverSelectionTimeout";
6062
}

flinkx-mongodb/flinkx-mongodb-core/src/main/java/com/dtstack/flinkx/mongodb/MongodbUtil.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ public class MongodbUtil {
7070

7171
private static final Integer DEFAULT_SOCKET_TIMEOUT = 0;
7272

73+
private static final Integer DEFAULT_SERVER_SELECTION_TIMEOUT = 5 * 1000;
74+
7375
/**
7476
* Get mongo client
7577
* @param mongodbConfig
@@ -96,7 +98,8 @@ public static MongoClient getMongoClient(Map<String,Object> mongodbConfig){
9698

9799
mongoClient = new MongoClient(serverAddress,credentials,options);
98100
}
99-
101+
//验证client是否连接成功
102+
mongoClient.listDatabases();
100103
LOG.info("Get mongodb client successful");
101104
return mongoClient;
102105
}catch (Exception e){
@@ -196,6 +199,10 @@ private static MongoClientOptions getOption(Map<String,Object> mongodbConfig){
196199
LOG.info("Mongodb config -- socketTimeout:" + socketTimeout);
197200
build.maxWaitTime(socketTimeout);
198201

202+
int serverSelectionTimeout = MapUtils.getIntValue(mongodbConfig, KEY_SERVER_SELECTION_TIMEOUT, DEFAULT_SERVER_SELECTION_TIMEOUT);
203+
LOG.info("Mongodb config -- serverSelectionTimeout:" + serverSelectionTimeout);
204+
build.serverSelectionTimeout(serverSelectionTimeout);
205+
199206
build.writeConcern(WriteConcern.UNACKNOWLEDGED);
200207
return build.build();
201208
}

flinkx-mongodb/flinkx-mongodb-reader/src/main/java/com/dtstack/flinkx/mongodb/reader/MongodbInputFormat.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.dtstack.flinkx.mongodb.MongodbConfigKeys;
2323
import com.dtstack.flinkx.mongodb.MongodbUtil;
2424
import com.dtstack.flinkx.reader.MetaColumn;
25+
import com.dtstack.flinkx.util.ExceptionUtil;
2526
import com.dtstack.flinkx.util.StringUtil;
2627
import com.mongodb.BasicDBObject;
2728
import com.mongodb.MongoClient;
@@ -38,7 +39,9 @@
3839
import org.bson.conversions.Bson;
3940

4041
import java.io.IOException;
41-
import java.util.*;
42+
import java.util.ArrayList;
43+
import java.util.List;
44+
import java.util.Map;
4245

4346
/**
4447
* Read plugin for reading static data
@@ -171,7 +174,8 @@ public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
171174
splits.add(new MongodbInputSplit((int)(size * minNumSplits), (int)(docNum - size * minNumSplits)));
172175
}
173176
} catch (Exception e){
174-
LOG.error("{}", e);
177+
LOG.error("error to create inputSplits, e = {}", ExceptionUtil.getErrorMessage(e));
178+
throw e;
175179
} finally {
176180
MongodbUtil.close(client, null);
177181
}

flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/DistributedJdbcInputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ protected void closeCurrentSource(){
206206

207207
@Override
208208
protected void closeInternal() throws IOException {
209-
209+
closeCurrentSource();
210210
}
211211

212212
@Override

flinkx-test/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,6 @@
3030
<version>19.0</version>
3131
</dependency>
3232

33-
<dependency>
34-
<groupId>ch.qos.logback</groupId>
35-
<artifactId>logback-classic</artifactId>
36-
<version>1.1.7</version>
37-
</dependency>
38-
3933
<dependency>
4034
<groupId>com.google.code.gson</groupId>
4135
<artifactId>gson</artifactId>

0 commit comments

Comments
 (0)