
Commit e95f12b

pan3793 authored and dongjoon-hyun committed
[SPARK-53633][SQL] Reuse InputStream in vectorized Parquet reader
### What changes were proposed in this pull request?

Reuse the InputStream in the vectorized Parquet reader between reading the footer and reading the row groups, on the executor side. This PR is part of SPARK-52011; see #50765 for more details.

### Why are the changes needed?

Reduces unnecessary NameNode RPCs, improving performance and stability on large Hadoop clusters.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

See #50765.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52384 from pan3793/SPARK-53633.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 1841dd2 commit e95f12b

9 files changed (+446, -288 lines)
OpenedParquetFooter.java (new file in package org.apache.spark.sql.execution.datasources.parquet)

Lines changed: 34 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet;

import java.util.Optional;

import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.SeekableInputStream;

public record OpenedParquetFooter(
    ParquetMetadata footer,
    HadoopInputFile inputFile,
    Optional<SeekableInputStream> inputStreamOpt) {

  public SeekableInputStream inputStream() {
    return inputStreamOpt.get();
  }
}
```
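A brief, hedged sketch of how this record is meant to be consumed; the sketch class and the `footer`, `inputFile`, and `stream` values are illustrative placeholders, not part of the commit:

```java
package org.apache.spark.sql.execution.datasources.parquet;

import java.util.Optional;

import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.SeekableInputStream;

// Illustrative only: shows the two shapes an OpenedParquetFooter can take.
class OpenedParquetFooterSketch {
  static void sketch(ParquetMetadata footer, HadoopInputFile inputFile,
      SeekableInputStream stream) {
    // Vectorized path: the still-open stream travels with the footer.
    OpenedParquetFooter withStream =
        new OpenedParquetFooter(footer, inputFile, Optional.of(stream));
    SeekableInputStream reusable = withStream.inputStream();  // unwraps the Optional

    // Non-vectorized path: no stream is retained; inputStream() would throw
    // NoSuchElementException here, so callers should check inputStreamOpt().
    OpenedParquetFooter withoutStream =
        new OpenedParquetFooter(footer, inputFile, Optional.empty());
    boolean hasStream = withoutStream.inputStreamOpt().isPresent();
  }
}
```

The `inputStream()` accessor simply unwraps the Optional, so it is only safe to call on the vectorized path, where the stream was actually retained.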

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java

Lines changed: 55 additions & 32 deletions
```diff
@@ -18,10 +18,9 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -37,53 +36,77 @@
  */
 public class ParquetFooterReader {
 
-  public static final boolean SKIP_ROW_GROUPS = true;
-  public static final boolean WITH_ROW_GROUPS = false;
-
   /**
-   * Reads footer for the input Parquet file 'split'. If 'skipRowGroup' is true,
-   * this will skip reading the Parquet row group metadata.
+   * Build a filter for reading the footer of the input Parquet file 'split'.
+   * If 'skipRowGroup' is true, this will skip reading the Parquet row group metadata.
    *
    * @param file a part (i.e. "block") of a single file that should be read
-   * @param configuration hadoop configuration of file
+   * @param hadoopConf hadoop configuration of file
    * @param skipRowGroup If true, skip reading row groups;
    *                     if false, read row groups according to the file split range
    */
-  public static ParquetMetadata readFooter(
-      Configuration configuration,
-      PartitionedFile file,
-      boolean skipRowGroup) throws IOException {
-    long fileStart = file.start();
-    ParquetMetadataConverter.MetadataFilter filter;
+  public static ParquetMetadataConverter.MetadataFilter buildFilter(
+      Configuration hadoopConf, PartitionedFile file, boolean skipRowGroup) {
     if (skipRowGroup) {
-      filter = ParquetMetadataConverter.SKIP_ROW_GROUPS;
+      return ParquetMetadataConverter.SKIP_ROW_GROUPS;
     } else {
-      filter = HadoopReadOptions.builder(configuration, file.toPath())
+      long fileStart = file.start();
+      return HadoopReadOptions.builder(hadoopConf, file.toPath())
         .withRange(fileStart, fileStart + file.length())
         .build()
         .getMetadataFilter();
     }
-    return readFooter(configuration, file.toPath(), filter);
-  }
-
-  public static ParquetMetadata readFooter(Configuration configuration,
-      Path file, ParquetMetadataConverter.MetadataFilter filter) throws IOException {
-    return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
-  }
-
-  public static ParquetMetadata readFooter(Configuration configuration,
-      FileStatus fileStatus, ParquetMetadataConverter.MetadataFilter filter) throws IOException {
-    return readFooter(HadoopInputFile.fromStatus(fileStatus, configuration), filter);
   }
 
-  private static ParquetMetadata readFooter(HadoopInputFile inputFile,
+  public static ParquetMetadata readFooter(
+      HadoopInputFile inputFile,
       ParquetMetadataConverter.MetadataFilter filter) throws IOException {
-    ParquetReadOptions readOptions =
-      HadoopReadOptions.builder(inputFile.getConfiguration(), inputFile.getPath())
+    ParquetReadOptions readOptions = HadoopReadOptions
+      .builder(inputFile.getConfiguration(), inputFile.getPath())
       .withMetadataFilter(filter).build();
-    // Use try-with-resources to ensure fd is closed.
-    try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, readOptions)) {
+    try (var fileReader = ParquetFileReader.open(inputFile, readOptions)) {
      return fileReader.getFooter();
    }
  }
+
+  /**
+   * Decoding Parquet files generally involves two steps:
+   * 1. read and resolve the metadata (footer),
+   * 2. read and decode the row groups/column chunks.
+   * <p>
+   * It is possible to avoid opening the file twice by reusing the SeekableInputStream.
+   * When keepInputStreamOpen is true, the caller takes responsibility for closing the
+   * SeekableInputStream. Currently, this is only supported by the Parquet vectorized reader.
+   *
+   * @param hadoopConf hadoop configuration of file
+   * @param file a part (i.e. "block") of a single file that should be read
+   * @param keepInputStreamOpen when true, keep the SeekableInputStream of the file open
+   * @return if keepInputStreamOpen is true, the returned OpenedParquetFooter carries a
+   *         present Optional of the SeekableInputStream, otherwise an empty Optional.
+   */
+  public static OpenedParquetFooter openFileAndReadFooter(
+      Configuration hadoopConf,
+      PartitionedFile file,
+      boolean keepInputStreamOpen) throws IOException {
+    var readOptions = HadoopReadOptions.builder(hadoopConf, file.toPath())
+      // `keepInputStreamOpen` is true only when the Parquet vectorized reader is used
+      // on the caller side; in that case, the footer will be reused later when reading
+      // row groups, so the row group metadata must be read ahead here.
+      // When false, the caller uses parquet-mr to read the file; only the file metadata
+      // is required in the planning phase, and parquet-mr will read the footer again
+      // when reading row groups.
+      .withMetadataFilter(buildFilter(hadoopConf, file, !keepInputStreamOpen))
+      .build();
+    var inputFile = HadoopInputFile.fromPath(file.toPath(), hadoopConf);
+    var inputStream = inputFile.newStream();
+    try (var fileReader = ParquetFileReader.open(inputFile, readOptions, inputStream)) {
+      var footer = fileReader.getFooter();
+      if (keepInputStreamOpen) {
+        fileReader.detachFileInputStream();
+        return new OpenedParquetFooter(footer, inputFile, Optional.of(inputStream));
+      } else {
+        return new OpenedParquetFooter(footer, inputFile, Optional.empty());
+      }
+    }
+  }
 }
```
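A hedged sketch of the executor-side call pattern the new `openFileAndReadFooter` enables; the wrapper class and its methods are illustrative, and a real `Configuration` plus `PartitionedFile` are assumed to be supplied by the caller:

```java
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import org.apache.spark.sql.execution.datasources.PartitionedFile;

// Illustrative only: open the file once and keep the stream for the row-group reads.
class FooterReuseSketch {
  static void readForVectorizedPath(Configuration hadoopConf, PartitionedFile file)
      throws IOException {
    OpenedParquetFooter opened =
        ParquetFooterReader.openFileAndReadFooter(hadoopConf, file, true);
    try {
      ParquetMetadata footer = opened.footer();  // includes row-group metadata
      // ... hand footer, opened.inputFile(), and opened.inputStream() to the
      // vectorized reader (see VectorizedParquetRecordReader below) ...
    } finally {
      // keepInputStreamOpen was true, so the caller owns the stream and must close it.
      opened.inputStream().close();
    }
  }

  static ParquetMetadata readForPlanningOnly(Configuration hadoopConf, PartitionedFile file)
      throws IOException {
    // keepInputStreamOpen = false: only file-level metadata is read and the stream is
    // closed before returning, so there is nothing for the caller to close.
    return ParquetFooterReader.openFileAndReadFooter(hadoopConf, file, false).footer();
  }
}
```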

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java

Lines changed: 11 additions & 7 deletions
```diff
@@ -49,6 +49,7 @@
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
 
@@ -89,24 +90,27 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
-    initialize(inputSplit, taskAttemptContext, Option.empty());
+    initialize(inputSplit, taskAttemptContext, Option.empty(), Option.empty(), Option.empty());
   }
 
   public void initialize(
       InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext,
+      Option<HadoopInputFile> inputFile,
+      Option<SeekableInputStream> inputStream,
       Option<ParquetMetadata> fileFooter) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     FileSplit split = (FileSplit) inputSplit;
     this.file = split.getPath();
+    ParquetReadOptions options = HadoopReadOptions
+      .builder(configuration, file)
+      .withRange(split.getStart(), split.getStart() + split.getLength())
+      .build();
     ParquetFileReader fileReader;
-    if (fileFooter.isDefined()) {
-      fileReader = new ParquetFileReader(configuration, file, fileFooter.get());
+    if (inputFile.isDefined() && fileFooter.isDefined() && inputStream.isDefined()) {
+      fileReader = new ParquetFileReader(
+        inputFile.get(), fileFooter.get(), options, inputStream.get());
     } else {
-      ParquetReadOptions options = HadoopReadOptions
-        .builder(configuration, file)
-        .withRange(split.getStart(), split.getStart() + split.getLength())
-        .build();
       fileReader = new ParquetFileReader(
         HadoopInputFile.fromPath(file, configuration), options);
     }
```
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

Lines changed: 7 additions & 3 deletions
```diff
@@ -24,8 +24,6 @@
 import java.util.List;
 import java.util.Set;
 
-import org.apache.spark.SparkUnsupportedOperationException;
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
 import scala.Option;
 import scala.jdk.javaapi.CollectionConverters;
 
@@ -35,11 +33,15 @@
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
@@ -190,9 +192,11 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
   public void initialize(
       InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext,
+      Option<HadoopInputFile> inputFile,
+      Option<SeekableInputStream> inputStream,
       Option<ParquetMetadata> fileFooter)
       throws IOException, InterruptedException, UnsupportedOperationException {
-    super.initialize(inputSplit, taskAttemptContext, fileFooter);
+    super.initialize(inputSplit, taskAttemptContext, inputFile, inputStream, fileFooter);
     initializeInternal();
   }
 
```
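Tying the pieces together, a hedged sketch of how the reused stream flows into the vectorized reader; the sketch class, the already-constructed `reader`, and the split/context arguments are illustrative, while `Option.apply`/`Option.empty` are the same Scala helpers already used in the diffs above:

```java
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;

import scala.Option;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.spark.sql.execution.datasources.PartitionedFile;

// Illustrative only: the file is opened once; the footer, file handle, and stream
// are then passed to the vectorized reader instead of opening the file again.
class StreamReuseFlowSketch {
  static void initializeWithReusedStream(
      VectorizedParquetRecordReader reader,
      InputSplit inputSplit,
      TaskAttemptContext taskAttemptContext,
      Configuration hadoopConf,
      PartitionedFile file) throws IOException, InterruptedException {
    OpenedParquetFooter opened =
        ParquetFooterReader.openFileAndReadFooter(hadoopConf, file, true);
    reader.initialize(
        inputSplit,
        taskAttemptContext,
        Option.apply(opened.inputFile()),    // reuse the already-opened file handle
        Option.apply(opened.inputStream()),  // ... and its SeekableInputStream
        Option.apply(opened.footer()));      // ... and the footer that was just read
    // With all three Options defined, SpecificParquetRecordReaderBase builds its
    // ParquetFileReader from them, so the row-group reads go through the same stream
    // that served the footer read; passing Option.empty() for any of them falls back
    // to re-opening the file as before.
  }
}
```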