
Commit c36e236

Merge branch 'apache:master' into bloom_filter_length
2 parents 83a9777 + 330242b commit c36e236

13 files changed: +418 −29 lines changed


NOTICE

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 
-Apache Parquet MR (Incubating)
-Copyright 2014 The Apache Software Foundation
+Apache Parquet MR
+Copyright 2014-2024 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).

parquet-cli/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -211,7 +211,7 @@
     <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
-      <version>1.1.3</version>
+      <version>1.3.0</version>
       <scope>${deps.scope}</scope>
     </dependency>
   </dependencies>

parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java

Lines changed: 6 additions & 2 deletions
@@ -29,6 +29,7 @@
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
+import org.apache.parquet.hadoop.ParquetMetricsCallback;
 
 public class HadoopReadOptions extends ParquetReadOptions {
   private final Configuration conf;
@@ -49,7 +50,8 @@ private HadoopReadOptions(
       int maxAllocationSize,
       Map<String, String> properties,
       Configuration conf,
-      FileDecryptionProperties fileDecryptionProperties) {
+      FileDecryptionProperties fileDecryptionProperties,
+      ParquetMetricsCallback metricsCallback) {
     super(
         useSignedStringMinMax,
         useStatsFilter,
@@ -66,6 +68,7 @@ private HadoopReadOptions(
         maxAllocationSize,
         properties,
         fileDecryptionProperties,
+        metricsCallback,
         new HadoopParquetConfiguration(conf));
     this.conf = conf;
   }
@@ -127,7 +130,8 @@ public ParquetReadOptions build() {
         maxAllocationSize,
         properties,
         conf,
-        fileDecryptionProperties);
+        fileDecryptionProperties,
+        metricsCallback);
   }
 }
 

parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java

Lines changed: 19 additions & 1 deletion
@@ -43,6 +43,7 @@
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetMetricsCallback;
 import org.apache.parquet.hadoop.util.HadoopCodecs;
 
 // Internal use only
@@ -75,6 +76,7 @@ public class ParquetReadOptions {
   private final Map<String, String> properties;
   private final FileDecryptionProperties fileDecryptionProperties;
   private final ParquetConfiguration conf;
+  private final ParquetMetricsCallback metricsCallback;
 
   ParquetReadOptions(
       boolean useSignedStringMinMax,
@@ -91,7 +93,8 @@ public class ParquetReadOptions {
       ByteBufferAllocator allocator,
       int maxAllocationSize,
       Map<String, String> properties,
-      FileDecryptionProperties fileDecryptionProperties) {
+      FileDecryptionProperties fileDecryptionProperties,
+      ParquetMetricsCallback metricsCallback) {
     this(
         useSignedStringMinMax,
         useStatsFilter,
@@ -108,6 +111,7 @@ public class ParquetReadOptions {
         maxAllocationSize,
         properties,
         fileDecryptionProperties,
+        metricsCallback,
         new HadoopParquetConfiguration());
   }
 
@@ -127,6 +131,7 @@ public class ParquetReadOptions {
       int maxAllocationSize,
       Map<String, String> properties,
       FileDecryptionProperties fileDecryptionProperties,
+      ParquetMetricsCallback metricsCallback,
       ParquetConfiguration conf) {
     this.useSignedStringMinMax = useSignedStringMinMax;
     this.useStatsFilter = useStatsFilter;
@@ -143,6 +148,7 @@ public class ParquetReadOptions {
     this.maxAllocationSize = maxAllocationSize;
     this.properties = Collections.unmodifiableMap(properties);
     this.fileDecryptionProperties = fileDecryptionProperties;
+    this.metricsCallback = metricsCallback;
     this.conf = conf;
   }
 
@@ -210,6 +216,10 @@ public FileDecryptionProperties getDecryptionProperties() {
     return fileDecryptionProperties;
   }
 
+  public ParquetMetricsCallback getMetricsCallback() {
+    return metricsCallback;
+  }
+
   public boolean isEnabled(String property, boolean defaultValue) {
     Optional<String> propValue = Optional.ofNullable(properties.get(property));
     return propValue.map(Boolean::parseBoolean).orElse(defaultValue);
@@ -245,6 +255,7 @@ public static class Builder {
     protected Map<String, String> properties = new HashMap<>();
     protected FileDecryptionProperties fileDecryptionProperties = null;
     protected ParquetConfiguration conf;
+    protected ParquetMetricsCallback metricsCallback;
 
     public Builder() {
       this(new HadoopParquetConfiguration());
@@ -391,6 +402,11 @@ public Builder withDecryption(FileDecryptionProperties fileDecryptionProperties)
       return this;
     }
 
+    public Builder withMetricsCallback(ParquetMetricsCallback metricsCallback) {
+      this.metricsCallback = metricsCallback;
+      return this;
+    }
+
     public Builder set(String key, String value) {
       properties.put(key, value);
       return this;
@@ -407,6 +423,7 @@ public Builder copy(ParquetReadOptions options) {
       withAllocator(options.allocator);
       withPageChecksumVerification(options.usePageChecksumVerification);
       withDecryption(options.fileDecryptionProperties);
+      withMetricsCallback(options.metricsCallback);
       conf = options.conf;
       for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
         set(keyValue.getKey(), keyValue.getValue());
@@ -439,6 +456,7 @@ public ParquetReadOptions build() {
         maxAllocationSize,
         properties,
         fileDecryptionProperties,
+        metricsCallback,
         conf);
     }
   }

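Functionally, these ParquetReadOptions changes make the callback part of the options object: the builder gains withMetricsCallback(), copy() propagates it, and getMetricsCallback() exposes it to the reader. A minimal wiring sketch using only members visible in this diff; `myCallback` stands in for a caller-supplied ParquetMetricsCallback (a sketch of one follows the ParquetFileReaderMetrics enum at the end of this commit):

// Minimal sketch: attach a metrics callback to the read options.
// `myCallback` is a caller-supplied ParquetMetricsCallback implementation.
ParquetReadOptions options = new ParquetReadOptions.Builder()
    .withMetricsCallback(myCallback)
    .build();

// copy() now carries the callback along with the other options:
ParquetReadOptions copied = new ParquetReadOptions.Builder().copy(options).build();
assert copied.getMetricsCallback() == myCallback;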
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java

Lines changed: 29 additions & 0 deletions
@@ -156,11 +156,13 @@ public DataPage visit(DataPageV1 dataPageV1) {
 
               ByteBuffer decompressedBuffer =
                   options.getAllocator().allocate(dataPageV1.getUncompressedSize());
+              long start = System.nanoTime();
               decompressor.decompress(
                   byteBuffer,
                   (int) compressedSize,
                   decompressedBuffer,
                   dataPageV1.getUncompressedSize());
+              setDecompressMetrics(bytes, start);
 
               // HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
               // not reset.
@@ -172,7 +174,9 @@ public DataPage visit(DataPageV1 dataPageV1) {
             if (null != blockDecryptor) {
               bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
             }
+            long start = System.nanoTime();
             decompressed = decompressor.decompress(bytes, dataPageV1.getUncompressedSize());
+            setDecompressMetrics(bytes, start);
           }
 
           final DataPageV1 decompressedPage;
@@ -234,8 +238,10 @@ public DataPage visit(DataPageV2 dataPageV2) {
                       - dataPageV2.getRepetitionLevels().size());
               ByteBuffer decompressedBuffer =
                   options.getAllocator().allocate(uncompressedSize);
+              long start = System.nanoTime();
               decompressor.decompress(
                   byteBuffer, (int) compressedSize, decompressedBuffer, uncompressedSize);
+              setDecompressMetrics(pageBytes, start);
 
               // HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
               // not reset.
@@ -255,7 +261,9 @@ public DataPage visit(DataPageV2 dataPageV2) {
             int uncompressedSize = Math.toIntExact(dataPageV2.getUncompressedSize()
                 - dataPageV2.getDefinitionLevels().size()
                 - dataPageV2.getRepetitionLevels().size());
+            long start = System.nanoTime();
             pageBytes = decompressor.decompress(pageBytes, uncompressedSize);
+            setDecompressMetrics(pageBytes, start);
           }
         }
       } catch (IOException e) {
@@ -293,6 +301,23 @@ public DataPage visit(DataPageV2 dataPageV2) {
     });
   }
 
+  private void setDecompressMetrics(BytesInput bytes, long start) {
+    final ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
+    if (metricsCallback != null) {
+      long time = Math.max(System.nanoTime() - start, 0);
+      long len = bytes.size();
+      double throughput = ((double) len / time) * ((double) 1000_000_000L) / (1024 * 1024);
+      LOG.debug(
+          "Decompress block: Length: {} MB, Time: {} msecs, throughput: {} MB/s",
+          len / (1024 * 1024),
+          time / 1000_000L,
+          throughput);
+      metricsCallback.setDuration(ParquetFileReaderMetrics.DecompressTime.name(), time);
+      metricsCallback.setValueLong(ParquetFileReaderMetrics.DecompressSize.name(), len);
+      metricsCallback.setValueDouble(ParquetFileReaderMetrics.DecompressThroughput.name(), throughput);
+    }
+  }
+
   @Override
   public DictionaryPage readDictionaryPage() {
     if (compressedDictionaryPage == null) {
@@ -303,6 +328,10 @@ public DictionaryPage readDictionaryPage() {
       if (null != blockDecryptor) {
         bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dictionaryPageAAD));
       }
+      long start = System.nanoTime();
+      BytesInput decompressed =
+          decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize());
+      setDecompressMetrics(bytes, start);
       DictionaryPage decompressedPage = new DictionaryPage(
           decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize()),
           compressedDictionaryPage.getDictionarySize(),

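As a unit check on the arithmetic in setDecompressMetrics above: `time` is measured in nanoseconds, so bytes per nanosecond is scaled by 10^9 to get bytes per second and divided by 1024 * 1024 to get MB/s. A standalone restatement with hypothetical numbers (not part of the commit):

// Hypothetical values, reproducing the conversion used in setDecompressMetrics:
long len = 8L * 1024 * 1024;                 // 8 MiB of decompressed bytes
long time = 40_000_000L;                     // 40 ms, expressed in nanoseconds
double throughput = ((double) len / time) * 1_000_000_000L / (1024 * 1024);
// 8 MiB read in 0.04 s -> throughput evaluates to 200.0 MB/s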
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 53 additions & 1 deletion
@@ -824,6 +824,38 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer)
     this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
+  /**
+   * @param conf the Hadoop Configuration
+   * @param file Path to a parquet file
+   * @param footer a {@link ParquetMetadata} footer already read from the file
+   * @param options {@link ParquetReadOptions}
+   * @throws IOException if the file can not be opened
+   */
+  public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer, ParquetReadOptions options)
+      throws IOException {
+    this.converter = new ParquetMetadataConverter(conf);
+    this.file = HadoopInputFile.fromPath(file, conf);
+    this.f = this.file.newStream();
+    this.fileMetaData = footer.getFileMetaData();
+    this.fileDecryptor = fileMetaData.getFileDecryptor();
+    this.options = options;
+    this.footer = footer;
+    try {
+      this.blocks = filterRowGroups(footer.getBlocks());
+    } catch (Exception e) {
+      // In case that filterRowGroups throws an exception in the constructor, the new stream
+      // should be closed. Otherwise, there's no way to close this outside.
+      f.close();
+      throw e;
+    }
+    this.blockIndexStores = listWithNulls(this.blocks.size());
+    this.blockRowRanges = listWithNulls(this.blocks.size());
+    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
+      paths.put(ColumnPath.get(col.getPath()), col);
+    }
+    this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
+  }
+
   public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
     this.converter = new ParquetMetadataConverter(options);
     this.file = file;
@@ -1004,7 +1036,7 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
     ColumnChunkPageReadStore rowGroup =
         new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
     // prepare the list of consecutive parts to read them in one scan
-    List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
+    List<ConsecutivePartList> allParts = new ArrayList<>();
     ConsecutivePartList currentParts = null;
     for (ColumnChunkMetaData mc : block.getColumns()) {
       ColumnPath pathKey = mc.getPath();
@@ -1979,10 +2011,12 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
        buffers.add(options.getAllocator().allocate(lastAllocationSize));
      }
 
+     long readStart = System.nanoTime();
      for (ByteBuffer buffer : buffers) {
        f.readFully(buffer);
        buffer.flip();
      }
+     setReadMetrics(readStart);
 
      // report in a counter the data we just scanned
      BenchmarkCounter.incrementBytesRead(length);
@@ -1992,6 +2026,24 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
      }
    }
 
+   private void setReadMetrics(long startNs) {
+     ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
+     if (metricsCallback != null) {
+       long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0);
+       double sizeInMb = ((double) length) / (1024 * 1024);
+       double timeInSec = ((double) totalFileReadTimeNs) / 1000_0000_0000L;
+       double throughput = sizeInMb / timeInSec;
+       LOG.debug(
+           "Parquet: File Read stats: Length: {} MB, Time: {} secs, throughput: {} MB/sec ",
+           sizeInMb,
+           timeInSec,
+           throughput);
+       metricsCallback.setDuration(ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs);
+       metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadSize.name(), length);
+       metricsCallback.setValueDouble(ParquetFileReaderMetrics.ReadThroughput.name(), throughput);
+     }
+   }
+
    /**
     * @return the position following the last byte of these chunks
     */
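The new four-argument constructor lets callers that have already parsed the footer (for example via getFooter() on an earlier reader) open a ParquetFileReader without re-reading it, while still supplying custom ParquetReadOptions. A hedged usage sketch; `conf`, `path`, `footer`, and `myCallback` are assumed to be supplied by the caller, and the pre-existing HadoopReadOptions.builder(Configuration) factory is assumed:

// Sketch: open a reader against an already-parsed footer, with metrics enabled.
// `footer` is a ParquetMetadata obtained earlier; `myCallback` is a ParquetMetricsCallback.
ParquetReadOptions options = HadoopReadOptions.builder(conf)
    .withMetricsCallback(myCallback)
    .build();
try (ParquetFileReader reader = new ParquetFileReader(conf, path, footer, options)) {
  PageReadStore rowGroup;
  while ((rowGroup = reader.readNextRowGroup()) != null) {
    // read-time metrics are reported during readNextRowGroup();
    // decompression metrics are reported to myCallback as pages are decoded
  }
}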
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+public enum ParquetFileReaderMetrics {
+
+  // metrics
+  ReadTime("time spent in reading Parquet file from storage"),
+  SeekTime("time spent in seek when reading Parquet file from storage"),
+  ReadSize("read size when reading Parquet file from storage (MB)"),
+  ReadThroughput("read throughput when reading Parquet file from storage (MB/sec)"),
+  DecompressTime("time spent in block decompression"),
+  DecompressSize("decompressed data size (MB)"),
+  DecompressThroughput("block decompression throughput (MB/sec)");
+
+  private final String desc;
+
+  ParquetFileReaderMetrics(String desc) {
+    this.desc = desc;
+  }
+
+  public String description() {
+    return desc;
+  }
+}
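These enum names are the `metric` keys the reader passes to the callback (e.g. ParquetFileReaderMetrics.DecompressTime.name()). Below is a minimal sketch of a callback that accumulates them; the ParquetMetricsCallback interface itself is not shown in this commit excerpt, so the sketch assumes it is an interface declaring at least the three methods the reader invokes above, and it implements only those:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.parquet.hadoop.ParquetFileReaderMetrics;
import org.apache.parquet.hadoop.ParquetMetricsCallback;

// Sketch of a callback that accumulates the reader's metrics in memory.
// NOTE: if the real interface declares further methods (e.g. for int/float values),
// they would need implementations here as well.
public class InMemoryMetricsCallback implements ParquetMetricsCallback {

  private final Map<String, Long> longValues = new ConcurrentHashMap<>();
  private final Map<String, Double> doubleValues = new ConcurrentHashMap<>();

  @Override
  public void setDuration(String metric, long valueNs) {
    longValues.merge(metric, valueNs, Long::sum);   // accumulate total time per metric
  }

  @Override
  public void setValueLong(String metric, long value) {
    longValues.merge(metric, value, Long::sum);     // e.g. total bytes read or decompressed
  }

  @Override
  public void setValueDouble(String metric, double value) {
    doubleValues.put(metric, value);                // e.g. last observed throughput
  }

  public void report() {
    for (ParquetFileReaderMetrics metric : ParquetFileReaderMetrics.values()) {
      System.out.printf("%s (%s): long=%s double=%s%n",
          metric.name(),
          metric.description(),
          longValues.get(metric.name()),
          doubleValues.get(metric.name()));
    }
  }
}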
