Skip to content

Commit 9d06b7d

Browse files
authored
[feat-#1470][ftp] improvement ftp read/write performance (#1483)
1 parent df98c32 commit 9d06b7d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1162
-573
lines changed

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/Data.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,33 @@
1818

1919
package com.dtstack.chunjun.connector.ftp.client;
2020

21+
import com.dtstack.chunjun.connector.ftp.extend.ftp.FtpParseException;
2122
import com.dtstack.chunjun.connector.ftp.handler.Position;
2223

23-
/** Value returned from ftpSeqBufferedReader; contains the line and its position. */
2424
public class Data {
2525
private String[] data;
2626
private Position position;
27+
private FtpParseException exception;
2728

2829
public Data(String[] data, Position position) {
2930
this.data = data;
3031
this.position = position;
3132
}
3233

34+
public Data(String[] data, Position position, FtpParseException exception) {
35+
this.data = data;
36+
this.position = position;
37+
this.exception = exception;
38+
}
39+
3340
public String[] getData() {
3441
return data;
3542
}
3643

44+
public FtpParseException getException() {
45+
return exception;
46+
}
47+
3748
public Position getPosition() {
3849
return position;
3950
}

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/FileUtil.java

Lines changed: 0 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -18,146 +18,14 @@
1818

1919
package com.dtstack.chunjun.connector.ftp.client;
2020

21-
import com.dtstack.chunjun.connector.ftp.conf.FtpConfig;
22-
import com.dtstack.chunjun.connector.ftp.handler.FtpHandler;
23-
import com.dtstack.chunjun.connector.ftp.handler.IFtpHandler;
24-
import com.dtstack.chunjun.connector.ftp.source.FtpFileSplit;
25-
2621
import org.slf4j.Logger;
2722
import org.slf4j.LoggerFactory;
2823

29-
import java.io.IOException;
30-
import java.io.InputStream;
31-
import java.nio.charset.Charset;
32-
import java.util.List;
33-
import java.util.Locale;
34-
import java.util.zip.ZipEntry;
35-
import java.util.zip.ZipInputStream;
36-
3724
public class FileUtil {
3825
private static final Logger LOG = LoggerFactory.getLogger(FileUtil.class);
3926

40-
public static void addCompressFile(
41-
IFtpHandler ftpHandler,
42-
String filePath,
43-
FtpConfig ftpConfig,
44-
List<FtpFileSplit> fileList)
45-
throws IOException {
46-
if ("ZIP".equals(ftpConfig.getCompressType().toUpperCase(Locale.ENGLISH))) {
47-
try (ZipInputStream zipInputStream =
48-
new ZipInputStream(
49-
ftpHandler.getInputStream(filePath),
50-
Charset.forName(ftpConfig.encoding))) {
51-
ZipEntry zipEntry;
52-
while ((zipEntry = zipInputStream.getNextEntry()) != null) {
53-
fileList.add(
54-
new FtpFileSplit(
55-
0,
56-
zipEntry.getSize(),
57-
filePath,
58-
zipEntry.getName(),
59-
ftpConfig.getCompressType()));
60-
}
61-
closeWithFtpHandler(ftpHandler, LOG);
62-
}
63-
} else {
64-
throw new RuntimeException("not support compressType " + ftpConfig.getCompressType());
65-
}
66-
}
67-
68-
public static void closeWithFtpHandler(IFtpHandler ftpHandler, Logger log) {
69-
if (ftpHandler instanceof FtpHandler) {
70-
try {
71-
((FtpHandler) ftpHandler).getFtpClient().completePendingCommand();
72-
} catch (Exception e) {
73-
log.warn("FTPClient completePendingCommand has error ->", e);
74-
try {
75-
ftpHandler.logoutFtpServer();
76-
} catch (Exception exception) {
77-
log.warn("FTPClient logout has error ->", exception);
78-
}
79-
}
80-
}
81-
}
82-
8327
public static String getFilename(String filepath) {
8428
String[] paths = filepath.split("/");
8529
return paths[paths.length - 1];
8630
}
87-
88-
/** analyse file */
89-
public static void addFile(
90-
IFtpHandler ftpHandler,
91-
String filePath,
92-
FtpConfig ftpConfig,
93-
List<FtpFileSplit> fileList)
94-
throws Exception {
95-
long maxFetchSize = ftpConfig.getMaxFetchSize();
96-
97-
// fetchSize should be at least 1 MB
98-
maxFetchSize = Math.max(maxFetchSize, 1024 * 1024);
99-
100-
long currentFileSize = ftpHandler.getFileSize(filePath);
101-
int parallelism = ftpConfig.getParallelism();
102-
103-
String filename = getFilename(filePath);
104-
105-
// do not split excel
106-
if (ftpConfig.getFileType() == null
107-
|| ftpConfig.getFileType().equals("excel")
108-
|| ftpConfig.getFileType().equals("custom")) {
109-
FtpFileSplit ftpFileSplit = new FtpFileSplit(0, currentFileSize, filePath, filename);
110-
fileList.add(ftpFileSplit);
111-
return;
112-
}
113-
114-
// split file
115-
if (maxFetchSize < currentFileSize) {
116-
int perSplit = Math.min((int) currentFileSize / parallelism, Integer.MAX_VALUE);
117-
long startPosition = 0;
118-
long endPosition = startPosition + perSplit;
119-
120-
while (endPosition <= currentFileSize) {
121-
if (endPosition == currentFileSize) {
122-
FtpFileSplit ftpFileSplit =
123-
new FtpFileSplit(startPosition, endPosition, filePath, filename);
124-
fileList.add(ftpFileSplit);
125-
break;
126-
}
127-
128-
InputStream input = ftpHandler.getInputStreamByPosition(filePath, endPosition);
129-
char c = ' ';
130-
131-
while (c != '\n') {
132-
c = (char) input.read();
133-
endPosition += 1;
134-
}
135-
FtpFileSplit ftpFileSplit =
136-
new FtpFileSplit(startPosition, endPosition, filePath, filename);
137-
fileList.add(ftpFileSplit);
138-
139-
LOG.info(
140-
String.format(
141-
"build file split, filename: %s, startPosition: %d, endPosition: %d",
142-
filePath, startPosition, endPosition));
143-
144-
startPosition = endPosition;
145-
endPosition = startPosition + perSplit;
146-
}
147-
148-
if (startPosition != currentFileSize) {
149-
FtpFileSplit ftpFileSplit =
150-
new FtpFileSplit(startPosition, currentFileSize, filePath, filename);
151-
fileList.add(ftpFileSplit);
152-
153-
LOG.info(
154-
String.format(
155-
"build file split, filename: %s, startPosition: %d, endPosition: %d",
156-
filePath, startPosition, currentFileSize));
157-
}
158-
} else {
159-
FtpFileSplit ftpFileSplit = new FtpFileSplit(0, currentFileSize, filePath, filename);
160-
fileList.add(ftpFileSplit);
161-
}
162-
}
16331
}

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/ZipInputStream.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,12 @@
2929
import java.util.List;
3030
import java.util.zip.ZipEntry;
3131

32-
/** Zip file stream. If fileNameList is not empty, only the files listed in fileNameList are read. */
3332
public class ZipInputStream extends InputStream {
3433
private static final Logger LOG = LoggerFactory.getLogger(ZipInputStream.class);
3534

36-
private java.util.zip.ZipInputStream zipInputStream;
35+
private final java.util.zip.ZipInputStream zipInputStream;
36+
private final List<String> fileNameList;
3737
private ZipEntry currentZipEntry;
38-
private List<String> fileNameList;
3938

4039
public ZipInputStream(InputStream in) {
4140
this.zipInputStream = new java.util.zip.ZipInputStream(in);

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReaderExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
public class ExcelReaderExecutor implements Runnable {
2424

2525
private final ExcelReader reader;
26-
private final ExcelSubExceptionCarrier ec;
26+
private ExcelSubExceptionCarrier ec;
2727

2828
public ExcelReaderExecutor(ExcelReader reader, ExcelSubExceptionCarrier ec) {
2929
this.reader = reader;

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelSubExceptionCarrier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
* "License"); you may not use this file except in compliance
88
* with the License. You may obtain a copy of the License at
99
*
10-
* http://www.apache.org/licenses/LICENSE-2.0
10+
* http://www.apache.org/licenses/LICENSE-2.0
1111
*
1212
* Unless required by applicable law or agreed to in writing, software
1313
* distributed under the License is distributed on an "AS IS" BASIS,

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/conf/FtpConfig.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.chunjun.connector.ftp.conf;
2020

2121
import com.dtstack.chunjun.conf.BaseFileConf;
22+
import com.dtstack.chunjun.connector.ftp.enums.FileType;
2223
import com.dtstack.chunjun.constants.ConstantValue;
2324

2425
import java.util.Map;
@@ -47,6 +48,9 @@ public class FtpConfig extends BaseFileConf {
4748

4849
private String ftpFileName;
4950

51+
/** Writing too much data in one batch causes FTP protocol buffer errors, so the batch-write default is kept small. */
52+
private long nextCheckRows = 100;
53+
5054
public String encoding = "UTF-8";
5155

5256
/** Replacement for null values */
@@ -58,12 +62,34 @@ public class FtpConfig extends BaseFileConf {
5862
/** User defined format class name */
5963
private String customFormatClassName;
6064

65+
/** User defined split class name */
66+
private String customConcurrentFileSplitClassName;
67+
68+
/* Line (record) separator */
69+
private String columnDelimiter = "\n";
70+
6171
/** Get the specified fileReadClient according to the filetype * */
62-
public String fileType;
72+
public String fileType = FileType.TXT.name();
6373

6474
/** Compression format */
6575
public String compressType;
6676

77+
public String getColumnDelimiter() {
78+
return columnDelimiter;
79+
}
80+
81+
public void setColumnDelimiter(String columnDelimiter) {
82+
this.columnDelimiter = columnDelimiter;
83+
}
84+
85+
public String getCustomConcurrentFileSplitClassName() {
86+
return customConcurrentFileSplitClassName;
87+
}
88+
89+
public void setCustomConcurrentFileSplitClassName(String customConcurrentFileSplitClassName) {
90+
this.customConcurrentFileSplitClassName = customConcurrentFileSplitClassName;
91+
}
92+
6793
public String getCustomFormatClassName() {
6894
return customFormatClassName;
6995
}
@@ -238,6 +264,16 @@ public void setMaxFetchSize(long fetchSize) {
238264
this.maxFetchSize = fetchSize;
239265
}
240266

267+
@Override
268+
public long getNextCheckRows() {
269+
return nextCheckRows;
270+
}
271+
272+
@Override
273+
public void setNextCheckRows(long nextCheckRows) {
274+
this.nextCheckRows = nextCheckRows;
275+
}
276+
241277
public long getMaxFetchSize() {
242278
return this.maxFetchSize;
243279
}
@@ -259,10 +295,16 @@ public String toString() {
259295
.add("listHiddenFiles=" + listHiddenFiles)
260296
.add("maxFetchSize=" + maxFetchSize)
261297
.add("ftpFileName='" + ftpFileName + "'")
298+
.add("nextCheckRows=" + nextCheckRows)
262299
.add("encoding='" + encoding + "'")
263300
.add("nullIsReplacedWithValue=" + nullIsReplacedWithValue)
264301
.add("fileConfig=" + fileConfig)
265302
.add("customFormatClassName='" + customFormatClassName + "'")
303+
.add(
304+
"customConcurrentFileSplitClassName='"
305+
+ customConcurrentFileSplitClassName
306+
+ "'")
307+
.add("columnDelimiter='" + columnDelimiter + "'")
266308
.add("fileType='" + fileType + "'")
267309
.add("compressType='" + compressType + "'")
268310
.toString();

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpColumnConverter.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package com.dtstack.chunjun.connector.ftp.converter;
2020

21-
import com.dtstack.chunjun.conf.FieldConf;
2221
import com.dtstack.chunjun.connector.ftp.conf.FtpConfig;
2322
import com.dtstack.chunjun.converter.AbstractRowConverter;
2423
import com.dtstack.chunjun.converter.IDeserializationConverter;
@@ -47,26 +46,21 @@
4746
import java.util.ArrayList;
4847
import java.util.List;
4948

50-
/**
51-
* @program chunjun
52-
* @author: xiuzhu
53-
* @create: 2021/06/19
54-
*/
55-
public class FtpColumnConverter extends AbstractRowConverter<RowData, RowData, String, FieldConf> {
49+
public class FtpColumnConverter
50+
extends AbstractRowConverter<RowData, RowData, String, LogicalType> {
5651

5752
private final FtpConfig ftpConfig;
5853

5954
public FtpColumnConverter(RowType rowType, FtpConfig ftpConfig) {
60-
super(rowType);
55+
super(rowType, ftpConfig);
6156
this.ftpConfig = ftpConfig;
6257
for (int i = 0; i < rowType.getFieldCount(); i++) {
63-
FieldConf fieldConf = ftpConfig.getColumn().get(i);
6458
toInternalConverters.add(
6559
wrapIntoNullableInternalConverter(
6660
createInternalConverter(rowType.getTypeAt(i))));
6761
toExternalConverters.add(
6862
wrapIntoNullableExternalConverter(
69-
createExternalConverter(fieldConf), fieldConf));
63+
createExternalConverter(rowType.getTypeAt(i)), rowType.getTypeAt(i)));
7064
}
7165
}
7266

@@ -96,7 +90,7 @@ public String toExternal(RowData rowData, String output) throws Exception {
9690
StringBuilder sb = new StringBuilder(128);
9791

9892
List<String> columnData = new ArrayList<>(ftpConfig.getColumn().size());
99-
for (int index = 0; index < rowData.getArity(); index++) {
93+
for (int index = 0; index < toExternalConverters.size(); index++) {
10094
toExternalConverters.get(index).serialize(rowData, index, columnData);
10195
if (index != 0) {
10296
sb.append(ftpConfig.getFieldDelimiter());
@@ -109,7 +103,7 @@ public String toExternal(RowData rowData, String output) throws Exception {
109103
@Override
110104
@SuppressWarnings("unchecked")
111105
protected ISerializationConverter<List<String>> wrapIntoNullableExternalConverter(
112-
ISerializationConverter serializationConverter, FieldConf fieldConf) {
106+
ISerializationConverter serializationConverter, LogicalType logicalType) {
113107
return (rowData, index, list) -> {
114108
if (rowData == null || rowData.isNullAt(index)) {
115109
list.add(index, null);
@@ -119,6 +113,7 @@ protected ISerializationConverter<List<String>> wrapIntoNullableExternalConverte
119113
};
120114
}
121115

116+
@Override
122117
protected IDeserializationConverter createInternalConverter(LogicalType type) {
123118
switch (type.getTypeRoot()) {
124119
case BOOLEAN:
@@ -159,8 +154,27 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) {
159154
}
160155

161156
@Override
162-
protected ISerializationConverter<List<String>> createExternalConverter(FieldConf fieldConf) {
163-
return (rowData, index, list) ->
164-
list.add(index, ((ColumnRowData) rowData).getField(index).asString());
157+
protected ISerializationConverter<List<String>> createExternalConverter(
158+
LogicalType logicalType) {
159+
switch (logicalType.getTypeRoot()) {
160+
case DATE:
161+
return (rowData, index, list) -> {
162+
if (rowData instanceof ColumnRowData) {
163+
list.add(
164+
index,
165+
((ColumnRowData) rowData).getField(index).asSqlDate().toString());
166+
} else {
167+
list.add(index, ((GenericRowData) rowData).getField(index).toString());
168+
}
169+
};
170+
default:
171+
return (rowData, index, list) -> {
172+
if (rowData instanceof ColumnRowData) {
173+
list.add(index, ((ColumnRowData) rowData).getField(index).asString());
174+
} else {
175+
list.add(index, ((GenericRowData) rowData).getField(index).toString());
176+
}
177+
};
178+
}
165179
}
166180
}

0 commit comments

Comments
 (0)