
Commit b276788

WeichenXu123 authored and cloud-fan committed
[SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource
## What changes were proposed in this pull request?

Provide a way to recursively load data from a datasource. This adds a "recursiveFileLookup" option. When the "recursiveFileLookup" option is turned on, partition inferring is turned off and all files from the directory are loaded recursively. If a datasource explicitly specifies a partitionSpec and the user turns the "recursiveFileLookup" option on, an exception is thrown.

## How was this patch tested?

Unit tests.

Please review https://spark.apache.org/contributing.html before opening a pull request.

Closes apache#24830 from WeichenXu123/recursive_ds.

Authored-by: WeichenXu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
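A minimal usage sketch of the new option (the directory path below is made up for illustration and is not part of this change):

```scala
// Hypothetical nested layout, e.g. /tmp/events/2019/06/part-0.bin, /tmp/events/archive/old.bin
val df = spark.read
  .format("binaryFile")
  .option("recursiveFileLookup", "true") // walk all sub-directories; partition inference is skipped
  .load("/tmp/events")

// Loading a path whose partitions are explicitly specified while recursiveFileLookup is enabled
// fails with an IllegalArgumentException (see the guard added in PartitioningAwareFileIndex below).
```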
1 parent ec032ce commit b276788

File tree

3 files changed, +101 -18 lines changed


mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   // Single column of images named "image"
   private lazy val imagePath = "../data/mllib/images/partitioned"
+  private lazy val recursiveImagePath = "../data/mllib/images"
 
   test("image datasource count test") {
     val df1 = spark.read.format("image").load(imagePath)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 30 additions & 18 deletions
@@ -62,6 +62,10 @@ abstract class PartitioningAwareFileIndex(
     pathGlobFilter.forall(_.accept(file.getPath))
   }
 
+  protected lazy val recursiveFileLookup = {
+    parameters.getOrElse("recursiveFileLookup", "false").toBoolean
+  }
+
   override def listFiles(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     def isNonEmptyFile(f: FileStatus): Boolean = {
@@ -70,6 +74,10 @@ abstract class PartitioningAwareFileIndex(
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
       PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
     } else {
+      if (recursiveFileLookup) {
+        throw new IllegalArgumentException(
+          "Datasource with partition do not allow recursive file loading.")
+      }
       prunePartitions(partitionFilters, partitionSpec()).map {
         case PartitionPath(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
@@ -95,7 +103,7 @@ abstract class PartitioningAwareFileIndex(
   override def sizeInBytes: Long = allFiles().map(_.getLen).sum
 
   def allFiles(): Seq[FileStatus] = {
-    val files = if (partitionSpec().partitionColumns.isEmpty) {
+    val files = if (partitionSpec().partitionColumns.isEmpty && !recursiveFileLookup) {
       // For each of the root input paths, get the list of files inside them
       rootPaths.flatMap { path =>
         // Make the path qualified (consistent with listLeafFiles and bulkListLeafFiles).
@@ -128,23 +136,27 @@ abstract class PartitioningAwareFileIndex(
   }
 
   protected def inferPartitioning(): PartitionSpec = {
-    // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
-      files.exists(f => isDataPath(f.getPath))
-    }.keys.toSeq
-
-    val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
-    val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
-      .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    PartitioningUtils.parsePartitions(
-      leafDirs,
-      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
-      basePaths = basePaths,
-      userSpecifiedSchema = userSpecifiedSchema,
-      caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
-      validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns,
-      timeZoneId = timeZoneId)
+    if (recursiveFileLookup) {
+      PartitionSpec.emptySpec
+    } else {
+      // We use leaf dirs containing data files to discover the schema.
+      val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
+        files.exists(f => isDataPath(f.getPath))
+      }.keys.toSeq
+
+      val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
+      val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
+        .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
+
+      PartitioningUtils.parsePartitions(
+        leafDirs,
+        typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+        basePaths = basePaths,
+        userSpecifiedSchema = userSpecifiedSchema,
+        caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
+        validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns,
+        timeZoneId = timeZoneId)
+    }
   }
 
   private def prunePartitions(
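To make the control flow above easier to follow, here is a small standalone sketch of how the option gates the three paths (simplified, not Spark's real API; `isPartitioned` stands in for `partitionSpec().partitionColumns.nonEmpty`, and the return value is just a description):

```scala
// Simplified model of the gating added in this file; not the actual Spark implementation.
def resolveListingMode(parameters: Map[String, String], isPartitioned: Boolean): String = {
  // Same parsing pattern as the new lazy val: a missing key defaults to "false".
  val recursiveFileLookup = parameters.getOrElse("recursiveFileLookup", "false").toBoolean

  if (recursiveFileLookup && isPartitioned) {
    // Mirrors the guard added to listFiles(): recursive lookup and partitions are incompatible.
    throw new IllegalArgumentException(
      "Datasource with partition do not allow recursive file loading.")
  } else if (recursiveFileLookup) {
    // inferPartitioning() returns PartitionSpec.emptySpec and allFiles() lists recursively.
    "recursive listing, no partition inference"
  } else {
    "normal listing with partition inference"
  }
}
```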

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 70 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.io.{File, FilenameFilter, FileNotFoundException}
+import java.nio.file.{Files, StandardOpenOption}
 import java.util.Locale
 
 import scala.collection.mutable
@@ -572,6 +573,75 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
+  test("Option recursiveFileLookup: recursive loading correctly") {
+
+    val expectedFileList = mutable.ListBuffer[String]()
+
+    def createFile(dir: File, fileName: String, format: String): Unit = {
+      val path = new File(dir, s"${fileName}.${format}")
+      Files.write(
+        path.toPath,
+        s"content of ${path.toString}".getBytes,
+        StandardOpenOption.CREATE, StandardOpenOption.WRITE
+      )
+      val fsPath = new Path(path.getAbsoluteFile.toURI).toString
+      expectedFileList.append(fsPath)
+    }
+
+    def createDir(path: File, dirName: String, level: Int): Unit = {
+      val dir = new File(path, s"dir${dirName}-${level}")
+      dir.mkdir()
+      createFile(dir, s"file${level}", "bin")
+      createFile(dir, s"file${level}", "text")
+
+      if (level < 4) {
+        // create sub-dir
+        createDir(dir, "sub0", level + 1)
+        createDir(dir, "sub1", level + 1)
+      }
+    }
+
+    withTempPath { path =>
+      path.mkdir()
+      createDir(path, "root", 0)
+
+      val dataPath = new File(path, "dirroot-0").getAbsolutePath
+      val fileList = spark.read.format("binaryFile")
+        .option("recursiveFileLookup", true)
+        .load(dataPath)
+        .select("path").collect().map(_.getString(0))
+
+      assert(fileList.toSet === expectedFileList.toSet)
+
+      val fileList2 = spark.read.format("binaryFile")
+        .option("recursiveFileLookup", true)
+        .option("pathGlobFilter", "*.bin")
+        .load(dataPath)
+        .select("path").collect().map(_.getString(0))
+
+      assert(fileList2.toSet === expectedFileList.filter(_.endsWith(".bin")).toSet)
+    }
+  }
+
+  test("Option recursiveFileLookup: disable partition inferring") {
+    val dataPath = Thread.currentThread().getContextClassLoader
+      .getResource("test-data/text-partitioned").toString
+
+    val df = spark.read.format("binaryFile")
+      .option("recursiveFileLookup", true)
+      .load(dataPath)
+
+    assert(!df.columns.contains("year"), "Expect partition inferring disabled")
+    val fileList = df.select("path").collect().map(_.getString(0))
+
+    val expectedFileList = Array(
+      dataPath + "/year=2014/data.txt",
+      dataPath + "/year=2015/data.txt"
+    ).map(path => new Path(path).toString)
+
+    assert(fileList.toSet === expectedFileList.toSet)
+  }
+
   test("Return correct results when data columns overlap with partition columns") {
     Seq("parquet", "orc", "json").foreach { format =>
       withTempPath { path =>
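For contrast with the second test above, a hedged sketch of the default behavior on the same partitioned layout (assuming the same test-data/text-partitioned resource; this assertion is not part of the commit):

```scala
// Without recursiveFileLookup, partition inference stays on, so directories such as
// year=2014/ and year=2015/ surface as a "year" partition column.
val dfDefault = spark.read.format("binaryFile").load(dataPath)
assert(dfDefault.columns.contains("year"))
```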
