Skip to content

Commit 3f2ea78

Browse files
authored
[Feature][connectors-v2/connector-file]Support logical Parquet splits (#10239)
1 parent 84b039d commit 3f2ea78

File tree

15 files changed

+540
-32
lines changed

15 files changed

+540
-32
lines changed

docs/en/connector-v2/source/LocalFile.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ File modification time filter. The connector will filter some files base on the
420420

421421
### enable_file_split [string]
422422

423-
Turn on the file splitting function, the default is falseIt can be selected when the file type is csv, text, json and non-compressed format.
423+
Turn on the file splitting function, the default is false. It can be selected when the file type is csv, text, json, parquet and non-compressed format.
424424

425425
### file_split_size [long]
426426

docs/zh/connector-v2/source/LocalFile.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ null_format 定义哪些字符串可以表示为 null。
421421

422422
### enable_file_split [boolean]
423423

424-
开启文件分割功能,默认为false。文件类型为csv、text、json、非压缩格式时可选择
424+
开启文件分割功能,默认为false。文件类型为csv、text、json、parquet、非压缩格式时可选择
425425

426426
### file_split_size [long]
427427

seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,13 @@
185185
<artifactId>flexmark-all</artifactId>
186186
<version>${flexmark-all.version}</version>
187187
</dependency>
188+
189+
<dependency>
190+
<groupId>org.mockito</groupId>
191+
<artifactId>mockito-junit-jupiter</artifactId>
192+
<version>${mockito.version}</version>
193+
<scope>test</scope>
194+
</dependency>
188195
</dependencies>
189196

190197
<build>

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@
4141
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy;
4242
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.XmlReadStrategy;
4343

44+
import lombok.extern.slf4j.Slf4j;
45+
4446
import java.io.Serializable;
4547
import java.util.Arrays;
4648

49+
@Slf4j
4750
public enum FileFormat implements Serializable {
4851
CSV("csv") {
4952
@Override
@@ -207,4 +210,17 @@ public ReadStrategy getReadStrategy() {
207210
public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
208211
return null;
209212
}
213+
214+
public boolean supportFileSplit() {
215+
switch (this) {
216+
case CSV:
217+
case TEXT:
218+
case JSON:
219+
case PARQUET:
220+
return true;
221+
default:
222+
log.info("The {} file type does not support file split", this);
223+
return false;
224+
}
225+
}
210226
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
2828
FILE_READ_STRATEGY_NOT_SUPPORT("FILE-06", "File strategy not support"),
2929
FORMAT_NOT_SUPPORT("FILE-07", "Format not support"),
3030
FILE_READ_FAILED("FILE-08", "File read failed"),
31-
BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order abnormality");
31+
BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order abnormality"),
32+
FILE_SPLIT_SIZE_ILLEGAL("FILE-10", "SplitSizeBytes must be greater than 0"),
33+
FILE_SPLIT_FAIL("FILE-11", "File split fail");
3234

3335
private final String code;
3436
private final String description;

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
3434
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
3535
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
36+
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
3637

3738
import org.apache.avro.Conversions;
3839
import org.apache.avro.data.TimeConversions;
@@ -89,6 +90,14 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
8990
@Override
9091
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
9192
throws FileConnectorException, IOException {
93+
this.read(new FileSourceSplit(path), output);
94+
}
95+
96+
@Override
97+
public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
98+
throws IOException, FileConnectorException {
99+
String tableId = split.getTableId();
100+
String path = split.getFilePath();
92101
if (Boolean.FALSE.equals(checkFileType(path))) {
93102
String errorMsg =
94103
String.format(
@@ -107,11 +116,18 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
107116
dataModel.addLogicalTypeConversion(new Conversions.DecimalConversion());
108117
dataModel.addLogicalTypeConversion(new TimeConversions.DateConversion());
109118
dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
119+
final boolean useSplitRange =
120+
enableSplitFile && split.getStart() >= 0 && split.getLength() > 0;
110121
GenericRecord record;
111-
try (ParquetReader<GenericData.Record> reader =
122+
AvroParquetReader.Builder<GenericData.Record> builder =
112123
AvroParquetReader.<GenericData.Record>builder(hadoopInputFile)
113-
.withDataModel(dataModel)
114-
.build()) {
124+
.withDataModel(dataModel);
125+
if (useSplitRange) {
126+
long start = split.getStart();
127+
long end = start + split.getLength();
128+
builder.withFileRange(start, end);
129+
}
130+
try (ParquetReader<GenericData.Record> reader = builder.build()) {
115131
while ((record = reader.read()) != null) {
116132
Object[] fields;
117133
if (isMergePartition) {

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,18 @@
1919
import java.io.Serializable;
2020
import java.util.List;
2121

22+
/**
23+
* {@link FileSplitStrategy} defines the contract for splitting a file into one or more {@link
24+
* FileSourceSplit}s that can be processed in parallel by file-based sources.
25+
*
26+
* <p>The split strategy determines how a file is logically divided, such as by byte ranges, record
27+
* boundaries, or format-specific physical units. Implementations are responsible for ensuring that
28+
* each generated split is readable and does not violate the semantics of the underlying file
29+
* format.
30+
*
31+
* <p>The resulting {@link FileSourceSplit}s describe the portion of the file to be read, while the
32+
* actual data parsing and decoding are handled by the corresponding reader implementation.
33+
*/
2234
public interface FileSplitStrategy extends Serializable {
2335

2436
List<FileSourceSplit> split(String tableId, String filePath);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.source.split;
19+
20+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
21+
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
22+
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.fs.Path;
25+
import org.apache.parquet.hadoop.ParquetFileReader;
26+
import org.apache.parquet.hadoop.metadata.BlockMetaData;
27+
import org.apache.parquet.hadoop.util.HadoopInputFile;
28+
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
33+
/**
34+
* {@link ParquetFileSplitStrategy} defines a split strategy for Parquet files based on Parquet
35+
* physical storage units (RowGroups).
36+
*
37+
* <p>This strategy uses {@code RowGroup} as the minimum indivisible split unit and generates {@link
38+
* FileSourceSplit}s by merging one or more contiguous RowGroups according to the configured split
39+
* size. A split will never break a RowGroup, ensuring correctness and compatibility with Parquet
40+
* readers.
41+
*
42+
* <p>The generated split range ({@code start}, {@code length}) represents a byte range covering
43+
* complete RowGroups. The actual row-level reading and decoding are delegated to the Parquet reader
44+
* implementation.
45+
*
46+
* <p>This design enables efficient parallel reading of Parquet files while preserving Parquet
47+
* format semantics and avoiding invalid byte-level splits.
48+
*/
49+
public class ParquetFileSplitStrategy implements FileSplitStrategy {
50+
51+
private final long splitSizeBytes;
52+
53+
public ParquetFileSplitStrategy(long splitSizeBytes) {
54+
if (splitSizeBytes <= 0) {
55+
throw new SeaTunnelRuntimeException(
56+
FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
57+
"SplitSizeBytes must be greater than 0");
58+
}
59+
this.splitSizeBytes = splitSizeBytes;
60+
}
61+
62+
@Override
63+
public List<FileSourceSplit> split(String tableId, String filePath) {
64+
try {
65+
return splitByRowGroups(tableId, filePath, readRowGroups(filePath));
66+
} catch (IOException e) {
67+
throw new SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_SPLIT_FAIL, e);
68+
}
69+
}
70+
71+
/**
72+
* Core split logic based on row group metadata. This method is IO-free and unit-test friendly.
73+
*/
74+
List<FileSourceSplit> splitByRowGroups(
75+
String tableId, String filePath, List<BlockMetaData> rowGroups) {
76+
List<FileSourceSplit> splits = new ArrayList<>();
77+
if (rowGroups == null || rowGroups.isEmpty()) {
78+
return splits;
79+
}
80+
long currentStart = 0;
81+
long currentLength = 0;
82+
boolean hasOpenSplit = false;
83+
for (BlockMetaData block : rowGroups) {
84+
long rgStart = block.getStartingPos();
85+
long rgSize = block.getCompressedSize();
86+
// start a new split
87+
if (!hasOpenSplit) {
88+
currentStart = rgStart;
89+
currentLength = rgSize;
90+
hasOpenSplit = true;
91+
continue;
92+
}
93+
// exceeds threshold, close current split
94+
if (currentLength + rgSize > splitSizeBytes) {
95+
splits.add(new FileSourceSplit(tableId, filePath, currentStart, currentLength));
96+
// start next split
97+
currentStart = rgStart;
98+
currentLength = rgSize;
99+
} else {
100+
currentLength += rgSize;
101+
}
102+
}
103+
// last split
104+
if (hasOpenSplit && currentLength > 0) {
105+
splits.add(new FileSourceSplit(tableId, filePath, currentStart, currentLength));
106+
}
107+
return splits;
108+
}
109+
110+
private List<BlockMetaData> readRowGroups(String filePath) throws IOException {
111+
Path path = new Path(filePath);
112+
Configuration conf = new Configuration();
113+
try (ParquetFileReader reader =
114+
ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
115+
return reader.getFooter().getBlocks();
116+
}
117+
}
118+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seatunnel.connectors.seatunnel.file.source.split;
18+
19+
import org.apache.parquet.hadoop.metadata.BlockMetaData;
20+
21+
import org.junit.jupiter.api.Assertions;
22+
import org.junit.jupiter.api.Test;
23+
import org.mockito.Mockito;
24+
25+
import java.util.ArrayList;
26+
import java.util.Collections;
27+
import java.util.List;
28+
29+
import static org.mockito.Mockito.when;
30+
31+
public class ParquetFileSplitStrategyTest {
32+
33+
private static final String TABLE_ID = "test.test_table";
34+
private static final String FILE_PATH = "/tmp/test.parquet";
35+
36+
@Test
37+
void testSplitByRowGroupsEmpty() {
38+
ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(100);
39+
List<FileSourceSplit> splits =
40+
strategy.splitByRowGroups(TABLE_ID, FILE_PATH, Collections.emptyList());
41+
Assertions.assertTrue(splits.isEmpty());
42+
}
43+
44+
@Test
45+
void testSplitByRowGroupsSingleRowGroup() {
46+
ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(1000);
47+
List<BlockMetaData> blocks = new ArrayList<>();
48+
blocks.add(mockBlock(0, 200));
49+
List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID, FILE_PATH, blocks);
50+
Assertions.assertEquals(1, splits.size());
51+
FileSourceSplit split = splits.get(0);
52+
Assertions.assertEquals(0, split.getStart());
53+
Assertions.assertEquals(200, split.getLength());
54+
}
55+
56+
@Test
57+
void testSplitByRowGroupsMergeRowGroups() {
58+
ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(500);
59+
List<BlockMetaData> blocks = new ArrayList<>();
60+
blocks.add(mockBlock(0, 100));
61+
blocks.add(mockBlock(100, 150));
62+
blocks.add(mockBlock(250, 200));
63+
List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID, FILE_PATH, blocks);
64+
// 100 + 150 + 200 = 450 < 500
65+
Assertions.assertEquals(1, splits.size());
66+
FileSourceSplit split = splits.get(0);
67+
Assertions.assertEquals(0, split.getStart());
68+
Assertions.assertEquals(450, split.getLength());
69+
}
70+
71+
@Test
72+
void testSplitByRowGroupsSplitWhenExceedsThreshold() {
73+
ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(300);
74+
List<BlockMetaData> blocks = new ArrayList<>();
75+
blocks.add(mockBlock(0, 100));
76+
blocks.add(mockBlock(100, 150));
77+
blocks.add(mockBlock(250, 200));
78+
List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID, FILE_PATH, blocks);
79+
Assertions.assertEquals(2, splits.size());
80+
FileSourceSplit first = splits.get(0);
81+
Assertions.assertEquals(0, first.getStart());
82+
Assertions.assertEquals(250, first.getLength());
83+
FileSourceSplit second = splits.get(1);
84+
Assertions.assertEquals(250, second.getStart());
85+
Assertions.assertEquals(200, second.getLength());
86+
}
87+
88+
private BlockMetaData mockBlock(long start, long compressedSize) {
89+
BlockMetaData block = Mockito.mock(BlockMetaData.class);
90+
when(block.getStartingPos()).thenReturn(start);
91+
when(block.getCompressedSize()).thenReturn(compressedSize);
92+
return block;
93+
}
94+
}

seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,44 +18,21 @@
1818
package org.apache.seatunnel.connectors.seatunnel.file.local.source;
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21-
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
2221
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
2322
import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
24-
import org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
23+
import org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSplitStrategyFactory;
2524
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
26-
import org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
27-
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
28-
29-
import static org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;
3025

3126
public class LocalFileSource extends BaseMultipleTableFileSource {
3227

3328
public LocalFileSource(ReadonlyConfig readonlyConfig) {
3429
super(
3530
new MultipleTableLocalFileSourceConfig(readonlyConfig),
36-
initFileSplitStrategy(readonlyConfig));
31+
LocalFileSplitStrategyFactory.initFileSplitStrategy(readonlyConfig));
3732
}
3833

3934
@Override
4035
public String getPluginName() {
4136
return FileSystemType.LOCAL.getFileSystemPluginName();
4237
}
43-
44-
private static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig readonlyConfig) {
45-
if (readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
46-
return new DefaultFileSplitStrategy();
47-
}
48-
String rowDelimiter =
49-
!readonlyConfig.getOptional(FileBaseSourceOptions.ROW_DELIMITER).isPresent()
50-
? DEFAULT_ROW_DELIMITER
51-
: readonlyConfig.get(FileBaseSourceOptions.ROW_DELIMITER);
52-
long skipHeaderRowNumber =
53-
readonlyConfig.get(FileBaseSourceOptions.CSV_USE_HEADER_LINE)
54-
? 1L
55-
: readonlyConfig.get(FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER);
56-
String encodingName = readonlyConfig.get(FileBaseSourceOptions.ENCODING);
57-
long splitSize = readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
58-
return new LocalFileAccordingToSplitSizeSplitStrategy(
59-
rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
60-
}
6138
}

0 commit comments

Comments
 (0)