Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.

Commit 0b9c4ac

Browse files
authored
[Feature][connector-file-local]localfile supports splitting and parallel reading of large files of csv, text, and json (apache#10142)
1 parent 0b14223 commit 0b9c4ac

File tree

30 files changed

+1014
-61
lines changed

30 files changed

+1014
-61
lines changed

docs/en/connector-v2/source/LocalFile.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
6464
| skip_header_row_number | long | no | 0 |
6565
| schema | config | no | - |
6666
| sheet_name | string | no | - |
67-
| excel_engine | string | no | POI |
67+
| excel_engine | string | no | POI |
6868
| xml_row_tag | string | no | - |
6969
| xml_use_attr_format | boolean | no | - |
7070
| csv_use_header_line | boolean | no | false |
@@ -80,6 +80,8 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
8080
| tables_configs | list | no | used to define a multiple table task |
8181
| file_filter_modified_start | string | no | - |
8282
| file_filter_modified_end | string | no | - |
83+
| enable_file_split | boolean | no | false |
84+
| file_split_size | long | no | 134217728 |
8385

8486
### path [string]
8587

@@ -415,6 +417,14 @@ File modification time filter. The connector will filter some files base on the
415417

416418
File modification time filter. The connector will filter some files base on the last modification end time (not include end time). The default data format is `yyyy-MM-dd HH:mm:ss`.
417419

420+
### enable_file_split [boolean]
421+
422+
Turn on the file splitting function; the default is false. It can be enabled when the file type is csv, text, or json and the file is not compressed.
423+
424+
### file_split_size [long]
425+
426+
File split size, effective only when the enable_file_split parameter is true. The unit is bytes. The default value is 134217728 bytes (128 MB).
427+
418428
### common options
419429

420430
Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details

docs/zh/connector-v2/source/LocalFile.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ import ChangeLog from '../changelog/connector-file-local.md';
8080
| tables_configs | list || 用于定义多表任务 |
8181
| file_filter_modified_start | string || - |
8282
| file_filter_modified_end | string || - |
83+
| enable_file_split | boolean || false |
84+
| file_split_size | long || 134217728 |
8385

8486
### path [string]
8587

@@ -415,6 +417,14 @@ null_format 定义哪些字符串可以表示为 null。
415417

416418
按照最后修改时间过滤文件。 要过滤的结束时间(不包括该时间),时间格式是:`yyyy-MM-dd HH:mm:ss`
417419

420+
### enable_file_split [boolean]
421+
422+
开启文件分割功能,默认为false。文件类型为csv、text、json、非压缩格式时可选择。
423+
424+
### file_split_size [long]
425+
426+
文件分割大小,enable_file_split参数为true时可以填写。单位是字节数。默认值为128MB的字节数,即134217728。
427+
418428
### 通用选项
419429

420430
数据源插件通用参数,请参阅 [数据源通用选项](../source-common-options.md) 了解详情

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,17 @@ public class FileBaseOptions extends ConnectorCommonOptions {
150150
.enumType(ArchiveCompressFormat.class)
151151
.defaultValue(ArchiveCompressFormat.NONE)
152152
.withDescription("Archive compression codec");
153+
154+
public static final Option<Boolean> ENABLE_FILE_SPLIT =
155+
Options.key("enable_file_split")
156+
.booleanType()
157+
.defaultValue(false)
158+
.withDescription("Turn on the file splitting function, the default is false");
159+
160+
public static final Option<Long> FILE_SPLIT_SIZE =
161+
Options.key("file_split_size")
162+
.longType()
163+
.defaultValue(128 * 1024 * 1024L)
164+
.withDescription(
165+
"File split size, which can be filled in when the enable_file_split parameter is true. The unit is the number of bytes. The default value is the number of bytes of 128MB, which is 128*1024*1024.");
153166
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
2929
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
3030
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTableFileSourceReader;
31+
import org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
3132
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
33+
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
3234
import org.apache.seatunnel.connectors.seatunnel.file.source.split.MultipleTableFileSourceSplitEnumerator;
3335
import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
3436

@@ -41,10 +43,19 @@ public abstract class BaseMultipleTableFileSource
4143
SupportColumnProjection {
4244

4345
private final BaseMultipleTableFileSourceConfig baseMultipleTableFileSourceConfig;
46+
private final FileSplitStrategy fileSplitStrategy;
4447

4548
public BaseMultipleTableFileSource(
4649
BaseMultipleTableFileSourceConfig baseMultipleTableFileSourceConfig) {
4750
this.baseMultipleTableFileSourceConfig = baseMultipleTableFileSourceConfig;
51+
this.fileSplitStrategy = new DefaultFileSplitStrategy();
52+
}
53+
54+
public BaseMultipleTableFileSource(
55+
BaseMultipleTableFileSourceConfig baseMultipleTableFileSourceConfig,
56+
FileSplitStrategy fileSplitStrategy) {
57+
this.baseMultipleTableFileSourceConfig = baseMultipleTableFileSourceConfig;
58+
this.fileSplitStrategy = fileSplitStrategy;
4859
}
4960

5061
@Override
@@ -72,7 +83,7 @@ public SourceReader<SeaTunnelRow, FileSourceSplit> createReader(
7283
public SourceSplitEnumerator<FileSourceSplit, FileSourceState> createEnumerator(
7384
SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext) {
7485
return new MultipleTableFileSourceSplitEnumerator(
75-
enumeratorContext, baseMultipleTableFileSourceConfig);
86+
enumeratorContext, baseMultipleTableFileSourceConfig, fileSplitStrategy);
7687
}
7788

7889
@Override

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,19 @@
2626
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2727
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2828
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
29+
import org.apache.seatunnel.common.utils.SeaTunnelException;
2930
import org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
3031
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
3132
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
3233
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
3334
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
35+
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
3436

3537
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
3638
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
3739
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
3840
import org.apache.commons.compress.compressors.gzip.GzipParameters;
41+
import org.apache.commons.io.input.BoundedInputStream;
3942
import org.apache.hadoop.fs.FileStatus;
4043

4144
import lombok.extern.slf4j.Slf4j;
@@ -94,6 +97,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
9497
protected Date fileModifiedEndDate;
9598
protected String fileBasePath;
9699

100+
protected boolean enableSplitFile;
101+
97102
@Override
98103
public void init(HadoopConf conf) {
99104
this.hadoopConf = conf;
@@ -242,6 +247,10 @@ public void setPluginConfig(Config pluginConfig) {
242247
pluginConfig.getString(
243248
FileBaseSourceOptions.FILE_FILTER_MODIFIED_END.key()));
244249
}
250+
if (pluginConfig.hasPath(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key())) {
251+
enableSplitFile =
252+
pluginConfig.getBoolean(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key());
253+
}
245254
}
246255

247256
@Override
@@ -250,12 +259,13 @@ public SeaTunnelRowType getActualSeaTunnelRowTypeInfo() {
250259
}
251260

252261
protected void resolveArchiveCompressedInputStream(
253-
String path,
254-
String tableId,
262+
FileSourceSplit split,
255263
Collector<SeaTunnelRow> output,
256264
Map<String, String> partitionsMap,
257265
FileFormat fileFormat)
258266
throws IOException {
267+
String path = split.getFilePath();
268+
String tableId = split.getTableId();
259269
switch (archiveCompressFormat) {
260270
case ZIP:
261271
try (ZipInputStream zis =
@@ -264,8 +274,7 @@ protected void resolveArchiveCompressedInputStream(
264274
while ((entry = zis.getNextEntry()) != null) {
265275
if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
266276
readProcess(
267-
path,
268-
tableId,
277+
split,
269278
output,
270279
copyInputStream(zis),
271280
partitionsMap,
@@ -282,8 +291,7 @@ protected void resolveArchiveCompressedInputStream(
282291
while ((entry = tarInput.getNextTarEntry()) != null) {
283292
if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
284293
readProcess(
285-
path,
286-
tableId,
294+
split,
287295
output,
288296
copyInputStream(tarInput),
289297
partitionsMap,
@@ -302,8 +310,7 @@ protected void resolveArchiveCompressedInputStream(
302310
while ((entry = tarIn.getNextTarEntry()) != null) {
303311
if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
304312
readProcess(
305-
path,
306-
tableId,
313+
split,
307314
output,
308315
copyInputStream(tarIn),
309316
partitionsMap,
@@ -331,13 +338,11 @@ protected void resolveArchiveCompressedInputStream(
331338
fileName = path;
332339
}
333340
}
334-
readProcess(
335-
path, tableId, output, copyInputStream(gzipIn), partitionsMap, fileName);
341+
readProcess(split, output, copyInputStream(gzipIn), partitionsMap, fileName);
336342
break;
337343
case NONE:
338344
readProcess(
339-
path,
340-
tableId,
345+
split,
341346
output,
342347
hadoopFileSystemProxy.getInputStream(path),
343348
partitionsMap,
@@ -348,8 +353,7 @@ protected void resolveArchiveCompressedInputStream(
348353
"The file does not support this archive compress type: {}",
349354
archiveCompressFormat);
350355
readProcess(
351-
path,
352-
tableId,
356+
split,
353357
output,
354358
hadoopFileSystemProxy.getInputStream(path),
355359
partitionsMap,
@@ -358,8 +362,7 @@ protected void resolveArchiveCompressedInputStream(
358362
}
359363

360364
protected void readProcess(
361-
String path,
362-
String tableId,
365+
FileSourceSplit split,
363366
Collector<SeaTunnelRow> output,
364367
InputStream inputStream,
365368
Map<String, String> partitionsMap,
@@ -451,6 +454,19 @@ protected boolean checkFileType(String fileName, FileFormat fileFormat) {
451454
return false;
452455
}
453456

457+
protected static InputStream safeSlice(InputStream in, long start, long length)
458+
throws IOException {
459+
long toSkip = start;
460+
while (toSkip > 0) {
461+
long skipped = in.skip(toSkip);
462+
if (skipped <= 0) {
463+
throw new SeaTunnelException("skipped error");
464+
}
465+
toSkip -= skipped;
466+
}
467+
return new BoundedInputStream(in, length);
468+
}
469+
454470
@Override
455471
public void close() throws IOException {
456472
try {

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
3434
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
3535
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
36+
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
3637
import org.apache.seatunnel.format.csv.CsvDeserializationSchema;
3738
import org.apache.seatunnel.format.csv.processor.CsvLineProcessor;
3839
import org.apache.seatunnel.format.csv.processor.DefaultCsvLineProcessor;
@@ -75,13 +76,20 @@ public class CsvReadStrategy extends AbstractReadStrategy {
7576
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
7677
throws FileConnectorException, IOException {
7778
Map<String, String> partitionsMap = parsePartitionsByPath(path);
78-
resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.CSV);
79+
resolveArchiveCompressedInputStream(
80+
new FileSourceSplit(tableId, path), output, partitionsMap, FileFormat.CSV);
81+
}
82+
83+
@Override
84+
public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
85+
throws IOException, FileConnectorException {
86+
Map<String, String> partitionsMap = parsePartitionsByPath(split.getFilePath());
87+
resolveArchiveCompressedInputStream(split, output, partitionsMap, FileFormat.CSV);
7988
}
8089

8190
@Override
8291
public void readProcess(
83-
String path,
84-
String tableId,
92+
FileSourceSplit split,
8593
Collector<SeaTunnelRow> output,
8694
InputStream inputStream,
8795
Map<String, String> partitionsMap,
@@ -103,11 +111,18 @@ public void readProcess(
103111
actualInputStream = inputStream;
104112
break;
105113
}
114+
// rebuild inputStream
115+
if (enableSplitFile && split.getLength() > -1) {
116+
actualInputStream = safeSlice(inputStream, split.getStart(), split.getLength());
117+
}
106118
Builder builder =
107119
CSVFormat.EXCEL.builder().setIgnoreEmptyLines(true).setDelimiter(getDelimiter());
108120
CSVFormat csvFormat = builder.build();
109-
if (firstLineAsHeader) {
110-
csvFormat = csvFormat.withFirstRecordAsHeader();
121+
// if enableSplitFile is true,no need to skip
122+
if (!enableSplitFile) {
123+
if (firstLineAsHeader) {
124+
csvFormat = csvFormat.withFirstRecordAsHeader();
125+
}
111126
}
112127
try (BufferedReader reader =
113128
new BufferedReader(new InputStreamReader(actualInputStream, encoding));
@@ -119,12 +134,15 @@ public void readProcess(
119134
reader.reset();
120135
}
121136
// skip lines
122-
for (int i = 0; i < skipHeaderNumber; i++) {
123-
if (reader.readLine() == null) {
124-
throw new IOException(
125-
String.format(
126-
"File [%s] has fewer lines than expected to skip.",
127-
currentFileName));
137+
// if enableSplitFile is true,no need to skip
138+
if (!enableSplitFile) {
139+
for (int i = 0; i < skipHeaderNumber; i++) {
140+
if (reader.readLine() == null) {
141+
throw new IOException(
142+
String.format(
143+
"File [%s] has fewer lines than expected to skip.",
144+
currentFileName));
145+
}
128146
}
129147
}
130148
// read lines
@@ -161,7 +179,7 @@ public void readProcess(
161179
seaTunnelRow.setField(index++, value);
162180
}
163181
}
164-
seaTunnelRow.setTableId(tableId);
182+
seaTunnelRow.setTableId(split.getTableId());
165183
output.collect(seaTunnelRow);
166184
}
167185
} catch (IOException e) {

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.seatunnel.connectors.seatunnel.file.excel.ExcelCellUtils;
3333
import org.apache.seatunnel.connectors.seatunnel.file.excel.ExcelReaderListener;
3434
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
35+
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
3536

3637
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
3738
import org.apache.poi.ss.usermodel.Cell;
@@ -77,19 +78,19 @@ public class ExcelReadStrategy extends AbstractReadStrategy {
7778
@Override
7879
public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
7980
Map<String, String> partitionsMap = parsePartitionsByPath(path);
80-
resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.EXCEL);
81+
resolveArchiveCompressedInputStream(
82+
new FileSourceSplit(tableId, path), output, partitionsMap, FileFormat.EXCEL);
8183
}
8284

8385
@Override
8486
protected void readProcess(
85-
String path,
86-
String tableId,
87+
FileSourceSplit split,
8788
Collector<SeaTunnelRow> output,
8889
InputStream inputStream,
8990
Map<String, String> partitionsMap,
9091
String currentFileName)
9192
throws IOException {
92-
93+
String tableId = split.getTableId();
9394
if (skipHeaderNumber > Integer.MAX_VALUE || skipHeaderNumber < Integer.MIN_VALUE) {
9495
throw new FileConnectorException(
9596
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,

0 commit comments

Comments
 (0)