Skip to content

Commit 952c9a4

Browse files
#30: Added sorting and merging of overlapping chunks to be safe (#31)
Co-authored-by: chπ <christoph.pirkl@gmail.com>
1 parent f781cd4 commit 952c9a4

File tree

9 files changed

+264
-108
lines changed

9 files changed

+264
-108
lines changed

doc/changes/changes_1.1.0.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Parquet for Java 1.1.0, released 2021-09-17
1+
# Parquet for Java 1.1.0, released 2021-09-21
22

33
Code name: New Chunk Based Reader
44

@@ -13,6 +13,7 @@ This release brings new Parquet reader that can read chunks of a file containing
1313
## Refactoring
1414

1515
* #26: Replaced foreach iterations with loops
16+
* #30: Added sorting and merging of overlapping chunks to be safe.
1617

1718
## Dependency Updates
1819

src/main/java/com/exasol/parquetio/data/ChunkIntervalImpl.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.exasol.parquetio.data;
22

3+
import java.util.Objects;
4+
35
/**
46
* An implementation of {@link ChunkInterval} that holds {@code start} and {@code end} position of row group chunks.
57
*/
@@ -12,7 +14,7 @@ public class ChunkIntervalImpl implements ChunkInterval {
1214
* Creates a new instance of {@link ChunkInterval}.
1315
*
1416
* @param start a starting row group position
15-
* @param end an ending row group position
17+
* @param end an ending row group position
1618
*/
1719
public ChunkIntervalImpl(final long start, final long end) {
1820
this.start = start;
@@ -29,4 +31,33 @@ public long getEndPosition() {
2931
return end;
3032
}
3133

34+
@Override
35+
public boolean equals(final Object other) {
36+
if (this == other) {
37+
return true;
38+
}
39+
if (!(other instanceof ChunkIntervalImpl)) {
40+
return false;
41+
}
42+
final ChunkIntervalImpl otherChunk = (ChunkIntervalImpl) other;
43+
return this.start == otherChunk.start && this.end == otherChunk.end;
44+
}
45+
46+
@Override
47+
public int hashCode() {
48+
return Objects.hash(this.start, this.end);
49+
}
50+
51+
@Override
52+
public String toString() {
53+
final var stringBuilder = new StringBuilder();
54+
stringBuilder//
55+
.append("ChunkInterval(start = ")//
56+
.append(this.start)//
57+
.append(", end = ")//
58+
.append(this.end)//
59+
.append(")");
60+
return stringBuilder.toString();
61+
}
62+
3263
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
package com.exasol.parquetio.merger;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import com.exasol.parquetio.data.ChunkInterval;
import com.exasol.parquetio.data.ChunkIntervalImpl;

/**
 * Sorts a list of {@link ChunkInterval}s by start position and merges the intervals that overlap or touch.
 */
public class ChunkIntervalMerger {

    /**
     * Sorts and merges overlapping chunks.
     *
     * @param chunks a list of {@link ChunkInterval} chunks; may be {@code null} or empty
     * @return a sorted list with overlapping chunks merged; the input itself when it holds fewer than two chunks
     */
    public List<ChunkInterval> sortAndMerge(final List<ChunkInterval> chunks) {
        if ((chunks == null) || (chunks.size() <= 1)) {
            return chunks; // nothing to sort or merge
        }
        // Copy first so the caller's (possibly unmodifiable) list is never mutated.
        final List<ChunkInterval> sortedChunks = new ArrayList<>(chunks);
        sortedChunks.sort(Comparator.comparingLong(ChunkInterval::getStartPosition));
        return mergeSortedChunks(sortedChunks);
    }

    // Collapses overlapping or touching intervals. Assumes the input is sorted by start position.
    private List<ChunkInterval> mergeSortedChunks(final List<ChunkInterval> sortedChunks) {
        final List<ChunkInterval> merged = new ArrayList<>();
        long currentStart = sortedChunks.get(0).getStartPosition();
        long currentEnd = sortedChunks.get(0).getEndPosition();
        for (final ChunkInterval chunk : sortedChunks) {
            if (chunk.getStartPosition() > currentEnd) {
                // Gap found: close the interval accumulated so far and start a new one.
                merged.add(new ChunkIntervalImpl(currentStart, currentEnd));
                currentStart = chunk.getStartPosition();
                currentEnd = chunk.getEndPosition();
            } else {
                // Overlapping or adjacent: extend the current interval if the chunk reaches further.
                currentEnd = Math.max(currentEnd, chunk.getEndPosition());
            }
        }
        merged.add(new ChunkIntervalImpl(currentStart, currentEnd));
        return merged;
    }

}

src/main/java/com/exasol/parquetio/reader/RowParquetChunkReader.java

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,29 @@
11
package com.exasol.parquetio.reader;
22

3+
import java.io.IOException;
4+
import java.io.UncheckedIOException;
5+
import java.util.Collections;
6+
import java.util.List;
7+
import java.util.function.Consumer;
8+
39
import com.exasol.errorreporting.ExaError;
410
import com.exasol.parquetio.data.ChunkInterval;
511
import com.exasol.parquetio.data.ChunkIntervalImpl;
612
import com.exasol.parquetio.data.Row;
13+
import com.exasol.parquetio.merger.ChunkIntervalMerger;
14+
715
import org.apache.hadoop.conf.Configuration;
816
import org.apache.parquet.column.page.PageReadStore;
917
import org.apache.parquet.filter2.compat.FilterCompat;
1018
import org.apache.parquet.hadoop.ParquetFileReader;
1119
import org.apache.parquet.hadoop.util.HadoopInputFile;
12-
import org.apache.parquet.io.*;
20+
import org.apache.parquet.io.ColumnIOFactory;
21+
import org.apache.parquet.io.InputFile;
22+
import org.apache.parquet.io.MessageColumnIO;
23+
import org.apache.parquet.io.ParquetDecodingException;
24+
import org.apache.parquet.io.RecordReader;
1325
import org.apache.parquet.io.api.RecordMaterializer;
1426

15-
import java.io.IOException;
16-
import java.io.UncheckedIOException;
17-
import java.util.Collections;
18-
import java.util.List;
19-
import java.util.function.Consumer;
20-
2127
/**
2228
* A Parquet file reader that reads only provided row groups.
2329
*/
@@ -35,7 +41,7 @@ public class RowParquetChunkReader {
3541
*
3642
* Since no chunks are provided it reads all row groups of given file.
3743
*
38-
* @param file a Parquet file
44+
* @param file a Parquet file
3945
*/
4046
public RowParquetChunkReader(final InputFile file) {
4147
this(file, List.of(new ChunkIntervalImpl(0L, getRowGroupSize(file))));
@@ -60,15 +66,20 @@ public RowParquetChunkReader(final InputFile file, final long start, final long
6066
*/
6167
public RowParquetChunkReader(final InputFile file, final List<ChunkInterval> chunks) {
6268
this.file = file;
63-
this.chunks = chunks;
69+
if (chunks == null || chunks.isEmpty()) {
70+
throw new IllegalArgumentException(
71+
ExaError.messageBuilder("E-PIOJ-5").message("Chunk intervals list is empty.")
72+
.mitigation("Please provide a valid list of Parquet file chunks.").toString());
73+
}
74+
this.chunks = new ChunkIntervalMerger().sortAndMerge(chunks);
6475
final var readSupport = new RowReadSupport();
6576
try (final var reader = ParquetFileReader.open(file)) {
6677
final var conf = getConfiguration(file);
6778
final var schema = reader.getFooter().getFileMetaData().getSchema();
6879
final var readContext = readSupport.init(conf, Collections.emptyMap(), schema);
6980
this.recordMaterializer = readSupport.prepareForRead(conf, Collections.emptyMap(), schema, readContext);
7081
this.messageIO = new ColumnIOFactory(reader.getFooter().getFileMetaData().getCreatedBy())//
71-
.getColumnIO(readContext.getRequestedSchema(), schema, true);
82+
.getColumnIO(readContext.getRequestedSchema(), schema, true);
7283
} catch (IOException exception) {
7384
throw new UncheckedIOException(getFileReadingErrorMessage(file), exception);
7485
} catch (RuntimeException exception) {
@@ -90,12 +101,9 @@ private static long getRowGroupSize(final InputFile file) {
90101
} catch (IOException exception) {
91102
throw new UncheckedIOException(getFileReadingErrorMessage(file), exception);
92103
} catch (RuntimeException exception) {
93-
throw new IllegalStateException(ExaError
94-
.messageBuilder("E-PIOJ-3")
95-
.message("Error getting row group size from a Parquet {{FILE}} file.", file.toString())
96-
.mitigation(CHECK_FILE_MITIGATION).toString(),
97-
exception
98-
);
104+
throw new IllegalStateException(ExaError.messageBuilder("E-PIOJ-3")
105+
.message("Error getting row group size from a Parquet {{FILE}} file.", file.toString())
106+
.mitigation(CHECK_FILE_MITIGATION).toString(), exception);
99107
}
100108
}
101109

@@ -122,7 +130,8 @@ public void read(final Consumer<Row> rowConsumer) {
122130
}
123131
}
124132

125-
private long moveToRowGroupPosition(final ParquetFileReader reader, final long currentPosition, final long startPosition) {
133+
private long moveToRowGroupPosition(final ParquetFileReader reader, final long currentPosition,
134+
final long startPosition) {
126135
long position = currentPosition;
127136
while (position < startPosition) {
128137
reader.skipNextRowGroup();
@@ -132,27 +141,25 @@ private long moveToRowGroupPosition(final ParquetFileReader reader, final long c
132141
}
133142

134143
private void consumeRows(final PageReadStore pageStore, final Consumer<Row> rowConsumer) {
135-
final RecordReader<Row> recordReader = messageIO.getRecordReader(pageStore, recordMaterializer, FilterCompat.NOOP);
144+
final RecordReader<Row> recordReader = messageIO.getRecordReader(pageStore, recordMaterializer,
145+
FilterCompat.NOOP);
136146
consumeRecords(recordReader, pageStore.getRowCount(), rowConsumer);
137147
}
138148

139149
// This similar how Parquet reads records underneath,
140150
// https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java#L217
141-
protected void consumeRecords(final RecordReader<Row> recordReader, final long totalRows, final Consumer<Row> rowConsumer) {
151+
protected void consumeRecords(final RecordReader<Row> recordReader, final long totalRows,
152+
final Consumer<Row> rowConsumer) {
142153
long currentRow = 0;
143154
Row row;
144155
while (currentRow < totalRows) {
145156
currentRow += 1;
146157
try {
147158
row = recordReader.read();
148159
} catch (RecordMaterializer.RecordMaterializationException exception) {
149-
throw new ParquetDecodingException(ExaError
150-
.messageBuilder("F-PIOJ-2")
151-
.message("Failed to materialize a record from the Parquet file {{FILE}}.", this.file.toString())
152-
.mitigation(CHECK_FILE_MITIGATION)
153-
.toString(),
154-
exception
155-
);
160+
throw new ParquetDecodingException(ExaError.messageBuilder("F-PIOJ-2")
161+
.message("Failed to materialize a record from the Parquet file {{FILE}}.", this.file.toString())
162+
.mitigation(CHECK_FILE_MITIGATION).toString(), exception);
156163
}
157164
if (row == null) { // Only happens with FilteredRecordReader at end of block
158165
break;
@@ -164,11 +171,8 @@ protected void consumeRecords(final RecordReader<Row> recordReader, final long t
164171
}
165172

166173
private static String getFileReadingErrorMessage(final InputFile file) {
167-
return ExaError
168-
.messageBuilder("E-PIOJ-1")
169-
.message("Failed to read Parquet file {{FILE}}.", file.toString())
170-
.mitigation(CHECK_FILE_MITIGATION)
171-
.toString();
174+
return ExaError.messageBuilder("E-PIOJ-1").message("Failed to read Parquet file {{FILE}}.", file.toString())
175+
.mitigation(CHECK_FILE_MITIGATION).toString();
172176
}
173177

174178
}

src/main/java/com/exasol/parquetio/splitter/ParquetFileSplitter.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public ParquetFileSplitter(final InputFile file) {
3535
/**
3636
* Creates a new instance of {@link ParquetFileSplitter}.
3737
*
38-
* @param file a Parquet file
38+
* @param file a Parquet file
3939
* @param chunkSize a chunk size in bytes
4040
*/
4141
public ParquetFileSplitter(final InputFile file, final long chunkSize) {
@@ -48,11 +48,9 @@ public List<ChunkInterval> getSplits() {
4848
try (final var reader = ParquetFileReader.open(file)) {
4949
return getRowGroupSplits(reader.getRowGroups());
5050
} catch (Exception exception) {
51-
throw new IllegalStateException(ExaError
52-
.messageBuilder("E-PIOJ-4")
53-
.message("Failed to open a Parquet file {{FILE}} for splitting.", this.file.toString()).toString(),
54-
exception
55-
);
51+
throw new IllegalStateException(ExaError.messageBuilder("E-PIOJ-4")
52+
.message("Failed to open a Parquet file {{FILE}} for splitting.", this.file.toString()).toString(),
53+
exception);
5654
}
5755
}
5856

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
package com.exasol.parquetio.merger;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import com.exasol.parquetio.data.ChunkInterval;
import com.exasol.parquetio.data.ChunkIntervalImpl;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class ChunkIntervalMergerTest {

    // Each argument pair is (input, expected result of sortAndMerge).
    static Stream<Arguments> getChunks() {
        return Stream.of(//
                // null and empty inputs are passed through unchanged
                Arguments.of(null, null), //
                Arguments.of(Collections.emptyList(), Collections.emptyList()),
                // a single chunk needs no merging
                Arguments.of(List.of(new ChunkIntervalImpl(0, 1)), List.of(new ChunkIntervalImpl(0, 1))),
                // touching chunks given out of order are sorted and merged into one
                Arguments.of(List.of(new ChunkIntervalImpl(1, 2), new ChunkIntervalImpl(0, 1)),
                        List.of(new ChunkIntervalImpl(0, 2))),
                // a contained chunk is absorbed while a disjoint chunk stays separate
                Arguments.of(
                        List.of(new ChunkIntervalImpl(1, 2), new ChunkIntervalImpl(0, 3), new ChunkIntervalImpl(4, 5)),
                        List.of(new ChunkIntervalImpl(0, 3), new ChunkIntervalImpl(4, 5)))//
        );
    }

    @ParameterizedTest
    @MethodSource("getChunks")
    void testSortAndMerge(final List<ChunkInterval> input, final List<ChunkInterval> expected) {
        assertThat(new ChunkIntervalMerger().sortAndMerge(input), equalTo(expected));
    }

}

0 commit comments

Comments
 (0)