Commit ded4022

feat: Encapsulate Parquet objects (#1920)

1 parent 469ee6e commit ded4022
5 files changed: +229 −4 lines changed

common/src/main/java/org/apache/comet/parquet/ColumnReader.java

Lines changed: 7 additions & 0 deletions
@@ -126,6 +126,13 @@ public void setPageReader(PageReader pageReader) throws IOException {
     }
   }
 
+  /** This method is called from Apache Iceberg. */
+  public void setRowGroupReader(RowGroupReader rowGroupReader, ParquetColumnSpec columnSpec)
+      throws IOException {
+    ColumnDescriptor descriptor = Utils.buildColumnDescriptor(columnSpec);
+    setPageReader(rowGroupReader.getPageReader(descriptor));
+  }
+
   @Override
   public void readBatch(int total) {
     LOG.debug("Start to batch of size = " + total);
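The new setRowGroupReader overload lets Iceberg hand Comet a row group plus a ParquetColumnSpec instead of a parquet-mr PageReader. A minimal sketch of a call site, assuming a FileReader fileReader, a ColumnReader columnReader, and a spec idSpec built as shown further below (all names here are illustrative, not from this commit):

RowGroupReader rowGroup = fileReader.readNextRowGroup();
if (rowGroup != null) {
  // Resolves the spec to a ColumnDescriptor internally, then wires up the page reader.
  columnReader.setRowGroupReader(rowGroup, idSpec);
  columnReader.readBatch(1024); // hypothetical batch size
}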

common/src/main/java/org/apache/comet/parquet/FileReader.java

Lines changed: 103 additions & 3 deletions
@@ -27,9 +27,11 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -40,6 +42,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
@@ -53,6 +58,7 @@
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.crypto.InternalColumnDecryptionSetup;
 import org.apache.parquet.crypto.InternalFileDecryptor;
@@ -67,6 +73,7 @@
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -128,6 +135,48 @@ public FileReader(InputFile file, ParquetReadOptions options, ReadOptions cometO
     this(file, null, options, cometOptions, null);
   }
 
+  /** This constructor is called from Apache Iceberg. */
+  public FileReader(
+      Path path,
+      Configuration conf,
+      ReadOptions cometOptions,
+      Map<String, String> properties,
+      Long start,
+      Long length,
+      byte[] fileEncryptionKey,
+      byte[] fileAADPrefix)
+      throws IOException {
+    ParquetReadOptions options =
+        buildParquetReadOptions(conf, properties, start, length, fileEncryptionKey, fileAADPrefix);
+    this.converter = new ParquetMetadataConverter(options);
+    this.file = CometInputFile.fromPath(path, conf);
+    this.f = file.newStream();
+    this.options = options;
+    this.cometOptions = cometOptions;
+    this.metrics = null;
+    try {
+      this.footer = readFooter(file, options, f, converter);
+    } catch (Exception e) {
+      // In case reading the footer throws an exception in the constructor, the new stream
+      // should be closed. Otherwise, there's no way to close it from outside.
+      f.close();
+      throw e;
+    }
+    this.fileMetaData = footer.getFileMetaData();
+    this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
+    if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
+      this.fileDecryptor = null; // Plaintext file. No need for a decryptor
+    }
+
+    this.blocks = footer.getBlocks(); // row groups are filtered in Iceberg
+    this.blockIndexStores = listWithNulls(this.blocks.size());
+    this.blockRowRanges = listWithNulls(this.blocks.size());
+    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
+      paths.put(ColumnPath.get(col.getPath()), col);
+    }
+    this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
+  }
+
   public FileReader(
       InputFile file,
       ParquetReadOptions options,
@@ -209,6 +258,57 @@ public void setRequestedSchema(List<ColumnDescriptor> projection) {
     }
   }
 
+  /** This method is called from Apache Iceberg. */
+  public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specList) {
+    paths.clear();
+    for (ParquetColumnSpec colSpec : specList) {
+      ColumnDescriptor descriptor = Utils.buildColumnDescriptor(colSpec);
+      paths.put(ColumnPath.get(colSpec.getPath()), descriptor);
+    }
+  }
+
+  private static ParquetReadOptions buildParquetReadOptions(
+      Configuration conf,
+      Map<String, String> properties,
+      Long start,
+      Long length,
+      byte[] fileEncryptionKey,
+      byte[] fileAADPrefix) {
+
+    // Iceberg removes these read properties when building the ParquetReadOptions.
+    // We want to build the exact same ParquetReadOptions as Iceberg's.
+    Collection<String> readPropertiesToRemove =
+        Set.of(
+            ParquetInputFormat.UNBOUND_RECORD_FILTER,
+            ParquetInputFormat.FILTER_PREDICATE,
+            ParquetInputFormat.READ_SUPPORT_CLASS,
+            EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME);
+
+    for (String property : readPropertiesToRemove) {
+      conf.unset(property);
+    }
+
+    ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(conf);
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      optionsBuilder.set(entry.getKey(), entry.getValue());
+    }
+
+    if (start != null && length != null) {
+      optionsBuilder.withRange(start, start + length);
+    }
+
+    if (fileEncryptionKey != null) {
+      FileDecryptionProperties fileDecryptionProperties =
+          FileDecryptionProperties.builder()
+              .withFooterKey(fileEncryptionKey)
+              .withAADPrefix(fileAADPrefix)
+              .build();
+      optionsBuilder.withDecryption(fileDecryptionProperties);
+    }
+
+    return optionsBuilder.build();
+  }
+
   /**
    * Gets the total number of records across all row groups (after applying row group filtering).
    */
@@ -245,15 +345,15 @@ public boolean skipNextRowGroup() {
    * Returns the next row group to read (after applying row group filtering), or null if there's no
    * more row group.
    */
-  public PageReadStore readNextRowGroup() throws IOException {
+  public RowGroupReader readNextRowGroup() throws IOException {
     if (currentBlock == blocks.size()) {
       return null;
     }
     BlockMetaData block = blocks.get(currentBlock);
     if (block.getRowCount() == 0) {
       throw new RuntimeException("Illegal row group of 0 rows");
     }
-    this.currentRowGroup = new RowGroupReader(block.getRowCount());
+    this.currentRowGroup = new RowGroupReader(block.getRowCount(), block.getRowIndexOffset());
     // prepare the list of consecutive parts to read them in one scan
     List<ConsecutivePartList> allParts = new ArrayList<>();
     ConsecutivePartList currentParts = null;
@@ -362,7 +462,7 @@ ColumnIndexReader getColumnIndexReader(int blockIndex) {
     return ciStore;
   }
 
-  private PageReadStore readChunks(
+  private RowGroupReader readChunks(
       BlockMetaData block, List<ConsecutivePartList> allParts, ChunkListBuilder builder)
       throws IOException {
     if (shouldReadParallel()) {
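Taken together, the new constructor and buildParquetReadOptions give Iceberg a self-contained way to open a Parquet file without assembling parquet-mr builder objects itself. A rough usage sketch under assumed inputs; the path, split bounds, and cometOptions would come from the caller, and every variable name here is hypothetical:

// Assumes `cometOptions` (Comet ReadOptions) is provided by the caller;
// null key material means the file is read as plaintext.
Configuration conf = new Configuration();
Map<String, String> readProperties = new HashMap<>();
long splitLength = 128L * 1024 * 1024; // made-up split length in bytes
FileReader fileReader =
    new FileReader(
        new Path("/data/warehouse/t/part-00000.parquet"), // hypothetical path
        conf,
        cometOptions,
        readProperties,
        0L,          // split start (byte offset)
        splitLength, // split length
        null,        // fileEncryptionKey: null skips withDecryption
        null);       // fileAADPrefix

fileReader.setRequestedSchemaFromSpecs(List.of(idSpec)); // spec built as shown below
RowGroupReader rowGroup;
while ((rowGroup = fileReader.readNextRowGroup()) != null) {
  long firstRowIndex = rowGroup.getRowIndexOffset().orElse(0L);
  // hand the row group to per-column ColumnReaders here
}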
common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+public class ParquetColumnSpec {
+
+  private final String[] path;
+  private final String physicalType;
+  private final int typeLength;
+  private final boolean isRepeated;
+  private final int maxDefinitionLevel;
+  private final int maxRepetitionLevel;
+
+  public ParquetColumnSpec(
+      String[] path,
+      String physicalType,
+      int typeLength,
+      boolean isRepeated,
+      int maxDefinitionLevel,
+      int maxRepetitionLevel) {
+    this.path = path;
+    this.physicalType = physicalType;
+    this.typeLength = typeLength;
+    this.isRepeated = isRepeated;
+    this.maxDefinitionLevel = maxDefinitionLevel;
+    this.maxRepetitionLevel = maxRepetitionLevel;
+  }
+
+  public String[] getPath() {
+    return path;
+  }
+
+  public String getPhysicalType() {
+    return physicalType;
+  }
+
+  public int getTypeLength() {
+    return typeLength;
+  }
+
+  public boolean isRepeated() {
+    return isRepeated;
+  }
+
+  public int getMaxRepetitionLevel() {
+    return maxRepetitionLevel;
+  }
+
+  public int getMaxDefinitionLevel() {
+    return maxDefinitionLevel;
+  }
+}
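ParquetColumnSpec is a plain value object, so Iceberg can describe a leaf column without depending on parquet-mr's ColumnDescriptor. A small illustrative sketch; the column name and values are made up:

// A required INT32 leaf column "id" at the top level of the schema.
// The physical type string must match a PrimitiveType.PrimitiveTypeName constant.
ParquetColumnSpec idSpec =
    new ParquetColumnSpec(
        new String[] {"id"}, // path from the message root to the leaf
        "INT32",             // physical type name
        0,                   // typeLength: only meaningful for FIXED_LEN_BYTE_ARRAY
        false,               // isRepeated
        0,                   // maxDefinitionLevel: 0 maps to REQUIRED in buildColumnDescriptor
        0);                  // maxRepetitionLevel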

common/src/main/java/org/apache/comet/parquet/RowGroupReader.java

Lines changed: 9 additions & 1 deletion
@@ -33,15 +33,18 @@ public class RowGroupReader implements PageReadStore {
   private final Map<ColumnDescriptor, PageReader> readers = new HashMap<>();
   private final long rowCount;
   private final RowRanges rowRanges;
+  private final long rowIndexOffset;
 
-  public RowGroupReader(long rowCount) {
+  public RowGroupReader(long rowCount, long rowIndexOffset) {
     this.rowCount = rowCount;
     this.rowRanges = null;
+    this.rowIndexOffset = rowIndexOffset;
   }
 
   RowGroupReader(RowRanges rowRanges) {
     this.rowRanges = rowRanges;
     this.rowCount = rowRanges.rowCount();
+    this.rowIndexOffset = -1;
   }
 
   @Override
@@ -64,6 +67,11 @@ public Optional<PrimitiveIterator.OfLong> getRowIndexes() {
     return rowRanges == null ? Optional.empty() : Optional.of(rowRanges.iterator());
   }
 
+  @Override
+  public Optional<Long> getRowIndexOffset() {
+    return this.rowIndexOffset < 0L ? Optional.empty() : Optional.of(this.rowIndexOffset);
+  }
+
   void addColumn(ColumnDescriptor path, ColumnPageReader reader) {
     if (readers.put(path, reader) != null) {
       throw new IllegalStateException(path + " was already added");
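The getRowIndexOffset override reports the file-wide index of a row group's first row, which callers such as Iceberg can use to compute absolute row positions (presumably for features like positional deletes; the diff itself does not say). A sketch of the observable behavior, with made-up row counts:

// readNextRowGroup() now passes block.getRowIndexOffset(); e.g. the second
// row group of a file whose first group holds 5000 rows:
RowGroupReader rg = new RowGroupReader(1000L, 5000L);
rg.getRowIndexOffset(); // Optional.of(5000L)
rg.getRowCount();       // 1000 (from the PageReadStore interface)

// The RowRanges-based constructor stores -1, so it reports Optional.empty().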

common/src/main/java/org/apache/comet/parquet/Utils.java

Lines changed: 41 additions & 0 deletions
@@ -21,14 +21,29 @@
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 import org.apache.spark.sql.types.*;
 
 import org.apache.comet.CometSchemaImporter;
 
 public class Utils {
 
   /** This method is called from Apache Iceberg. */
+  public static ColumnReader getColumnReader(
+      DataType type,
+      ParquetColumnSpec columnSpec,
+      CometSchemaImporter importer,
+      int batchSize,
+      boolean useDecimal128,
+      boolean useLazyMaterialization) {
+
+    ColumnDescriptor descriptor = buildColumnDescriptor(columnSpec);
+    return getColumnReader(
+        type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
+  }
+
   public static ColumnReader getColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
@@ -260,4 +275,30 @@ static int getTimeUnitId(LogicalTypeAnnotation.TimeUnit tu) {
         throw new UnsupportedOperationException("Unsupported TimeUnit " + tu);
     }
   }
+
+  public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) {
+    PrimitiveType.PrimitiveTypeName primType =
+        PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType());
+
+    Type.Repetition repetition;
+    if (columnSpec.getMaxRepetitionLevel() > 0) {
+      repetition = Type.Repetition.REPEATED;
+    } else if (columnSpec.getMaxDefinitionLevel() > 0) {
+      repetition = Type.Repetition.OPTIONAL;
+    } else {
+      repetition = Type.Repetition.REQUIRED;
+    }
+
+    String name = columnSpec.getPath()[columnSpec.getPath().length - 1];
+
+    PrimitiveType primitiveType;
+    if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+      primitiveType = new PrimitiveType(repetition, primType, columnSpec.getTypeLength(), name);
+    } else {
+      primitiveType = new PrimitiveType(repetition, primType, name);
+    }
+
+    MessageType schema = new MessageType("root", primitiveType);
+    return schema.getColumnDescription(columnSpec.getPath());
+  }
 }
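buildColumnDescriptor wraps the spec's single leaf in a synthetic "root" MessageType so that parquet-mr computes the descriptor, and the new getColumnReader overload builds on it. A usage sketch reusing the hypothetical idSpec from above, and assuming a CometSchemaImporter importer already exists:

// Resolve the spec to a parquet-mr descriptor (a REQUIRED INT32 named "id").
ColumnDescriptor descriptor = Utils.buildColumnDescriptor(idSpec);

// Or go straight to a ColumnReader; the boolean arguments mirror the
// existing descriptor-based overload.
ColumnReader reader =
    Utils.getColumnReader(
        DataTypes.IntegerType, // Spark-side type for the column
        idSpec,
        importer,              // assumed CometSchemaImporter
        4096,                  // batch size (illustrative)
        false,                 // useDecimal128
        false);                // useLazyMaterialization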
