Skip to content

Commit 93e4fbb

Browse files
pwoodyRobert Kruszewski
authored andcommitted
Fix parquet split calculation to avoid O(file*block) lookups (apache-spark-on-k8s#380)
1 parent 95646b5 commit 93e4fbb

File tree

1 file changed

+10
-8
lines changed

1 file changed

+10
-8
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ import org.apache.spark.util.ThreadUtils
4040
abstract class ParquetFileSplitter {
4141
def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit])
4242

43-
def singleFileSplit(stat: FileStatus): Seq[FileSplit] = {
44-
Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
43+
def singleFileSplit(path: Path, length: Long): Seq[FileSplit] = {
44+
Seq(new FileSplit(path, 0, length, Array.empty))
4545
}
4646
}
4747

4848
object ParquetDefaultFileSplitter extends ParquetFileSplitter {
4949
override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
50-
stat => singleFileSplit(stat)
50+
stat => singleFileSplit(stat.getPath, stat.getLen)
5151
}
5252
}
5353

@@ -84,18 +84,20 @@ class ParquetMetadataFileSplitter(
8484
(applied, unapplied, filteredBlocks)
8585
}
8686

87+
// Group eligible splits by file Path.
8788
val eligible = applyParquetFilter(unapplied, filteredBlocks).map { bmd =>
8889
val blockPath = new Path(root, bmd.getPath)
8990
new FileSplit(blockPath, bmd.getStartingPos, bmd.getCompressedSize, Array.empty)
90-
}
91+
}.groupBy(_.getPath)
9192

9293
val statFilter: (FileStatus => Seq[FileSplit]) = { stat =>
93-
if (referencedFiles.contains(stat.getPath)) {
94-
eligible.filter(_.getPath == stat.getPath)
94+
val filePath = stat.getPath
95+
if (referencedFiles.contains(filePath)) {
96+
eligible.getOrElse(filePath, Nil)
9597
} else {
9698
log.warn(s"Found _metadata file for $root," +
97-
s" but no entries for blocks in ${stat.getPath}. Retaining whole file.")
98-
singleFileSplit(stat)
99+
s" but no entries for blocks in ${filePath}. Retaining whole file.")
100+
singleFileSplit(filePath, stat.getLen)
99101
}
100102
}
101103
statFilter

0 commit comments

Comments
 (0)