
Commit 075ae1e

Ngone51 authored and cloud-fan committed
[SPARK-29537][SQL] throw exception when user defined a wrong base path
### What changes were proposed in this pull request? When user defined a base path which is not an ancestor directory for all the input paths, throw exception immediately. ### Why are the changes needed? Assuming that we have a DataFrame[c1, c2] be written out in parquet and partitioned by c1. When using `spark.read.parquet("/path/to/data/c1=1")` to read the data, we'll have a DataFrame with column c2 only. But if we use `spark.read.option("basePath", "/path/from").parquet("/path/to/data/c1=1")` to read the data, we'll have a DataFrame with column c1 and c2. This's happens because a wrong base path does not actually work in `parsePartition()`, so paring would continue until it reaches a directory without "=". And I think the result of the second read way doesn't make sense. ### Does this PR introduce any user-facing change? Yes, with this change, user would hit `IllegalArgumentException ` when given a wrong base path while previous behavior doesn't. ### How was this patch tested? Added UT. Closes apache#26195 from Ngone51/dev-wrong-basePath. Lead-authored-by: wuyi <[email protected]> Co-authored-by: wuyi <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 4021354 commit 075ae1e
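For context, a minimal spark-shell sketch of the behavior described in the commit message, using hypothetical paths (`/path/to/data` and `/path/from` are placeholders; `spark` is the usual `SparkSession`):

```scala
// Hypothetical layout written earlier by something like:
//   df.write.partitionBy("c1").parquet("/path/to/data")
// so files live under /path/to/data/c1=1/, /path/to/data/c1=2/, ...

// Reading a single partition directory drops the partition column:
val noPartCol = spark.read.parquet("/path/to/data/c1=1")        // schema: c2

// A correct basePath (an ancestor of every input path) restores it:
val withPartCol = spark.read
  .option("basePath", "/path/to/data")
  .parquet("/path/to/data/c1=1")                                 // schema: c1, c2

// With this change, a basePath that is NOT an ancestor of the input path
// fails fast instead of silently yielding a confusing schema:
spark.read
  .option("basePath", "/path/from")
  .parquet("/path/to/data/c1=1")
// => java.lang.IllegalArgumentException:
//    Wrong basePath /path/from for the root path: <qualified /path/to/data/c1=1>
```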

3 files changed: +44 -1 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 9 additions & 1 deletion
```diff
@@ -221,7 +221,15 @@ abstract class PartitioningAwareFileIndex(
         if (!fs.isDirectory(userDefinedBasePath)) {
           throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
         }
-        Set(fs.makeQualified(userDefinedBasePath))
+        val qualifiedBasePath = fs.makeQualified(userDefinedBasePath)
+        val qualifiedBasePathStr = qualifiedBasePath.toString
+        rootPaths
+          .find(!fs.makeQualified(_).toString.startsWith(qualifiedBasePathStr))
+          .foreach { rp =>
+            throw new IllegalArgumentException(
+              s"Wrong basePath $userDefinedBasePath for the root path: $rp")
+          }
+        Set(qualifiedBasePath)

       case None =>
         rootPaths.map { path =>
```
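The check added above boils down to a string-prefix comparison between the qualified base path and each qualified root path. A minimal standalone sketch of that logic, using plain path strings instead of Hadoop `Path`/`FileSystem` (the helper name `validateBasePath` is illustrative, not part of the patch):

```scala
// Sketch: reject the first root path that does not live under the base path.
// The real code qualifies both sides with fs.makeQualified before comparing;
// here the strings are assumed to be already qualified.
def validateBasePath(qualifiedBasePath: String, qualifiedRootPaths: Seq[String]): Unit = {
  qualifiedRootPaths
    .find(rp => !rp.startsWith(qualifiedBasePath))
    .foreach { rp =>
      throw new IllegalArgumentException(
        s"Wrong basePath $qualifiedBasePath for the root path: $rp")
    }
}

validateBasePath("file:/data", Seq("file:/data/c1=1", "file:/data/c1=2"))  // passes
// validateBasePath("file:/data/unknown", Seq("file:/data/c1=1"))          // throws
```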

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala

Lines changed: 20 additions & 0 deletions
```diff
@@ -352,6 +352,26 @@ class FileIndexSuite extends SharedSparkSession {
         "driver side must not be negative"))
   }

+  test("SPARK-29537: throw exception when user defined a wrong base path") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      val path = new Path(dir.getCanonicalPath)
+      val wrongBasePath = new File(dir, "unknown")
+      // basePath must be a directory
+      wrongBasePath.mkdir()
+      val parameters = Map("basePath" -> wrongBasePath.getCanonicalPath)
+      val fileIndex = new InMemoryFileIndex(spark, Seq(path), parameters, None)
+      val msg = intercept[IllegalArgumentException] {
+        // trigger inferPartitioning()
+        fileIndex.partitionSpec()
+      }.getMessage
+      assert(msg === s"Wrong basePath ${wrongBasePath.getCanonicalPath} for the root path: $path")
+    }
+  }
+
   test("refresh for InMemoryFileIndex with FileStatusCache") {
     withTempDir { dir =>
       val fileStatusCache = FileStatusCache.getOrCreate(spark)
```

sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala

Lines changed: 15 additions & 0 deletions
```diff
@@ -234,6 +234,21 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     assert(DataSourceUtils.decodePartitioningColumns(partColumns) === Seq("col1", "col2"))
   }

+  test("SPARK-29537: throw exception when user defined a wrong base path") {
+    withTempPath { p =>
+      val path = new Path(p.toURI).toString
+      Seq((1, 1), (2, 2)).toDF("c1", "c2")
+        .write.partitionBy("c1").mode(SaveMode.Overwrite).parquet(path)
+      val wrongBasePath = new File(p, "unknown")
+      // basePath must be a directory
+      wrongBasePath.mkdir()
+      val msg = intercept[IllegalArgumentException] {
+        spark.read.option("basePath", wrongBasePath.getCanonicalPath).parquet(path)
+      }.getMessage
+      assert(msg === s"Wrong basePath ${wrongBasePath.getCanonicalPath} for the root path: $path")
+    }
+  }
+
   test("save mode") {
     spark.range(10).write
       .format("org.apache.spark.sql.test")
```
