
Commit d83f84a

JoshRosen and joshrosen-stripe authored and committed
[SPARK-27676][SQL][SS] InMemoryFileIndex should respect spark.sql.files.ignoreMissingFiles
## What changes were proposed in this pull request?

Spark's `InMemoryFileIndex` contains two places where `FileNotFound` exceptions are caught and logged as warnings (during [directory listing](https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L274) and [block location lookup](https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L333)). This logic was added in apache#15153 and apache#21408.

I think this is a dangerous default behavior because it can mask bugs caused by race conditions (e.g. overwriting a table while it's being read) or S3 consistency issues (there's more discussion of this in the [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-27676)). Failing fast when we detect missing files is not sufficient to make concurrent table reads/writes or S3 listing safe (there are other classes of eventual-consistency issues to worry about), but it's still beneficial to throw exceptions and fail fast on the subset of inconsistencies / races that we _can_ detect, because that increases the likelihood that an end user will notice the problem and investigate further.

There may be some cases where users _do_ want to ignore missing files, but I think that should be an opt-in behavior via the existing `spark.sql.files.ignoreMissingFiles` flag (the current behavior is itself race-prone because a file might be deleted between catalog listing and query execution time, triggering FileNotFoundExceptions on executors, which are handled in a way that _does_ respect `ignoreMissingFiles`).

This PR updates `InMemoryFileIndex` to guard the log-and-ignore-FileNotFoundException behavior behind the existing `spark.sql.files.ignoreMissingFiles` flag.

**Note**: this is a change of default behavior, so I think it needs to be mentioned in release notes.

## How was this patch tested?

New unit tests that simulate file-deletion race conditions, run with both values of the `ignoreMissingFiles` flag.

Closes apache#24668 from JoshRosen/SPARK-27676.

Lead-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Josh Rosen <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
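For users who relied on the pre-3.0 lenient listing, the opt-in is the existing flag; the following is a minimal sketch, in which the session setup and the `/data/events` path are illustrative and not part of this change:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ignore-missing-files-example")
  .master("local[*]")
  .getOrCreate()

// With this set to "true", files or directories that disappear between listing phases
// (or between planning and execution) are logged and skipped rather than failing the query.
// The default remains "false", i.e. fail fast on missing files.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

// Hypothetical example path; any file-based data source is affected the same way.
val df = spark.read.parquet("/data/events")
println(df.count())
```

Leaving the flag at its default (`false`) keeps the new fail-fast behavior described above during both table file listing and query execution.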
1 parent 38263f6 commit d83f84a

4 files changed: +188 additions, -13 deletions


docs/sql-migration-guide-upgrade.md

Lines changed: 2 additions & 0 deletions
@@ -145,6 +145,8 @@ license: |
 
   - Since Spark 3.0, a higher-order function `exists` follows the three-valued boolean logic, i.e., if the `predicate` returns any `null`s and no `true` is obtained, then `exists` will return `null` instead of `false`. For example, `exists(array(1, null, 3), x -> x % 2 == 0)` will be `null`. The previous behaviour can be restored by setting `spark.sql.legacy.arrayExistsFollowsThreeValuedLogic` to `false`.
 
+  - Since Spark 3.0, if files or subdirectories disappear during recursive directory listing (i.e. they appear in an intermediate listing but then cannot be read or listed during later phases of the recursive directory listing, due to either concurrent file deletions or object store consistency issues) then the listing will fail with an exception unless `spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous versions, these missing files or subdirectories would be ignored. Note that this change of behavior only applies during initial table file listing (or during `REFRESH TABLE`), not during query execution: the net change is that `spark.sql.files.ignoreMissingFiles` is now obeyed during table file listing / query planning, not only at query execution time.
+
 ## Upgrading from Spark SQL 2.4 to 2.4.1
 
   - The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was

sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ object CommandUtils extends Logging {
           override def accept(path: Path): Boolean = isDataPath(path, stagingDir)
         }
         val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
-          paths, sessionState.newHadoopConf(), pathFilter, spark)
+          paths, sessionState.newHadoopConf(), pathFilter, spark, areRootPaths = true)
         fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
       } else {
         partitions.map { p =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala

Lines changed: 61 additions & 10 deletions
@@ -125,7 +125,7 @@ class InMemoryFileIndex(
     }
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(
-      pathsToFetch, hadoopConf, filter, sparkSession)
+      pathsToFetch, hadoopConf, filter, sparkSession, areRootPaths = true)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -167,12 +167,22 @@ object InMemoryFileIndex extends Logging {
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
-      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
+      sparkSession: SparkSession,
+      areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {
+
+    val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       return paths.map { path =>
-        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+        val leafFiles = listLeafFiles(
+          path,
+          hadoopConf,
+          filter,
+          Some(sparkSession),
+          ignoreMissingFiles = ignoreMissingFiles,
+          isRootPath = areRootPaths)
+        (path, leafFiles)
       }
     }
 
@@ -205,7 +215,14 @@
       .mapPartitions { pathStrings =>
         val hadoopConf = serializableConfiguration.value
         pathStrings.map(new Path(_)).toSeq.map { path =>
-          (path, listLeafFiles(path, hadoopConf, filter, None))
+          val leafFiles = listLeafFiles(
+            path,
+            hadoopConf,
+            filter,
+            None,
+            ignoreMissingFiles = ignoreMissingFiles,
+            isRootPath = areRootPaths)
+          (path, leafFiles)
         }.iterator
       }.map { case (path, statuses) =>
         val serializableStatuses = statuses.map { status =>
@@ -268,11 +285,12 @@ object InMemoryFileIndex extends Logging {
       path: Path,
       hadoopConf: Configuration,
       filter: PathFilter,
-      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
+      sessionOpt: Option[SparkSession],
+      ignoreMissingFiles: Boolean,
+      isRootPath: Boolean): Seq[FileStatus] = {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
 
-    // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
     // Note that statuses only include FileStatus for the files and dirs directly under path,
     // and does not include anything else recursively.
     val statuses: Array[FileStatus] = try {
@@ -290,7 +308,26 @@ object InMemoryFileIndex extends Logging {
         case _ => fs.listStatus(path)
       }
     } catch {
-      case _: FileNotFoundException =>
+      // If we are listing a root path (e.g. a top level directory of a table), we need to
+      // ignore FileNotFoundExceptions during this root level of the listing because
+      //
+      //  (a) certain code paths might construct an InMemoryFileIndex with root paths that
+      //      might not exist (i.e. not all callers are guaranteed to have checked
+      //      path existence prior to constructing InMemoryFileIndex) and,
+      //  (b) we need to ignore deleted root paths during REFRESH TABLE, otherwise we break
+      //      existing behavior and break the ability to drop SessionCatalog tables when
+      //      tables' root directories have been deleted (which breaks a number of Spark's
+      //      own tests).
+      //
+      // If we are NOT listing a root path then a FileNotFoundException here means that the
+      // directory was present in a previous level of file listing but is absent in this
+      // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3
+      // list inconsistency).
+      //
+      // The trade-off in supporting existing behaviors / use-cases is that we won't be
+      // able to detect race conditions involving root paths being deleted during
+      // InMemoryFileIndex construction. However, it's still a net improvement to detect and
+      // fail-fast on the non-root cases. For more info see the SPARK-27676 review discussion.
+      case _: FileNotFoundException if isRootPath || ignoreMissingFiles =>
         logWarning(s"The directory $path was not found. Was it deleted very recently?")
         Array.empty[FileStatus]
     }
@@ -301,9 +338,23 @@ object InMemoryFileIndex extends Logging {
     val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
     val nestedFiles: Seq[FileStatus] = sessionOpt match {
       case Some(session) =>
-        bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+        bulkListLeafFiles(
+          dirs.map(_.getPath),
+          hadoopConf,
+          filter,
+          session,
+          areRootPaths = false
+        ).flatMap(_._2)
       case _ =>
-        dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+        dirs.flatMap { dir =>
+          listLeafFiles(
+            dir.getPath,
+            hadoopConf,
+            filter,
+            sessionOpt,
+            ignoreMissingFiles = ignoreMissingFiles,
+            isRootPath = false)
+        }
     }
     val allFiles = topLevelFiles ++ nestedFiles
     if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
@@ -345,7 +396,7 @@ object InMemoryFileIndex extends Logging {
         }
         Some(lfs)
       } catch {
-        case _: FileNotFoundException =>
+        case _: FileNotFoundException if ignoreMissingFiles =>
           missingFiles += f.getPath.toString
           None
       }
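The essence of the change above is the guarded catch: a vanished directory is tolerated only for root paths or when the flag is on, and otherwise the exception propagates so the listing fails fast. A self-contained sketch of the same pattern against plain Hadoop `FileSystem` APIs follows; the helper name `listLeafFilesSketch` and its shape are illustrative, not Spark's implementation:

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Recursively lists leaf files, tolerating a vanished directory only at the root
// (root paths may legitimately not exist, e.g. REFRESH TABLE on a dropped location)
// or when the caller has explicitly opted into ignoring missing files.
def listLeafFilesSketch(
    fs: FileSystem,
    path: Path,
    ignoreMissingFiles: Boolean,
    isRootPath: Boolean = true): Seq[FileStatus] = {
  val statuses: Array[FileStatus] =
    try {
      fs.listStatus(path)
    } catch {
      // For non-root paths, a FileNotFoundException means the directory was seen by a
      // parent listing but is gone now (a race or consistency issue), so only swallow
      // it when ignoreMissingFiles is set; otherwise let it propagate and fail fast.
      case _: FileNotFoundException if isRootPath || ignoreMissingFiles =>
        Array.empty[FileStatus]
    }
  val (dirs, files) = statuses.partition(_.isDirectory)
  files.toSeq ++ dirs.flatMap { d =>
    listLeafFilesSketch(fs, d.getPath, ignoreMissingFiles, isRootPath = false)
  }
}
```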

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala

Lines changed: 124 additions & 2 deletions
@@ -17,13 +17,14 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.File
+import java.io.{File, FileNotFoundException}
 import java.net.URI
 
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem}
 
+import org.apache.spark.SparkException
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util._
@@ -167,7 +168,7 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
-  test("InMemoryFileIndex: folders that don't exist don't throw exceptions") {
+  test("InMemoryFileIndex: root folders that don't exist don't throw exceptions") {
     withTempDir { dir =>
       val deletedFolder = new File(dir, "deleted")
       assert(!deletedFolder.exists())
@@ -178,6 +179,67 @@
     }
   }
 
+  test("SPARK-27676: InMemoryFileIndex respects ignoreMissingFiles config for non-root paths") {
+    import DeletionRaceFileSystem._
+    for (
+      raceCondition <- Seq(
+        classOf[SubdirectoryDeletionRaceFileSystem],
+        classOf[FileDeletionRaceFileSystem]
+      );
+      ignoreMissingFiles <- Seq(true, false);
+      parDiscoveryThreshold <- Seq(0, 100)
+    ) {
+      withClue(s"raceCondition=$raceCondition, ignoreMissingFiles=$ignoreMissingFiles, " +
+          s"parDiscoveryThreshold=$parDiscoveryThreshold"
+      ) {
+        withSQLConf(
+          SQLConf.IGNORE_MISSING_FILES.key -> ignoreMissingFiles.toString,
+          SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> parDiscoveryThreshold.toString,
+          "fs.mockFs.impl" -> raceCondition.getName,
+          "fs.mockFs.impl.disable.cache" -> "true"
+        ) {
+          def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex(
+            spark, Seq(rootDirPath), Map.empty, None)
+          if (ignoreMissingFiles) {
+            // We're ignoring missing files, so catalog construction should succeed
+            val catalog = makeCatalog()
+            val leafFiles = catalog.listLeafFiles(catalog.rootPaths)
+            if (raceCondition == classOf[SubdirectoryDeletionRaceFileSystem]) {
+              // The only subdirectory was missing, so there should be no leaf files:
+              assert(leafFiles.isEmpty)
+            } else {
+              assert(raceCondition == classOf[FileDeletionRaceFileSystem])
+              // One of the two leaf files was missing, but we should still list the other:
+              assert(leafFiles.size == 1)
+              assert(leafFiles.head.getPath == nonDeletedLeafFilePath)
+            }
+          } else {
+            // We're NOT ignoring missing files, so catalog construction should fail
+            val e = intercept[Exception] {
+              makeCatalog()
+            }
+            // The exact exception depends on whether we're using parallel listing
            if (parDiscoveryThreshold == 0) {
+              // The FileNotFoundException occurs in a Spark executor (as part of a job)
+              assert(e.isInstanceOf[SparkException])
+              assert(e.getMessage.contains("FileNotFoundException"))
+            } else {
+              // The FileNotFoundException occurs directly on the driver
+              assert(e.isInstanceOf[FileNotFoundException])
+              // Test that the FileNotFoundException is triggered for the expected reason:
+              if (raceCondition == classOf[SubdirectoryDeletionRaceFileSystem]) {
+                assert(e.getMessage.contains(subDirPath.toString))
+              } else {
+                assert(raceCondition == classOf[FileDeletionRaceFileSystem])
+                assert(e.getMessage.contains(leafFilePath.toString))
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
   test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
     for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
       withTempDir { dir =>
@@ -356,6 +418,66 @@
 
   }
 
+object DeletionRaceFileSystem {
+  val rootDirPath: Path = new Path("mockFs:///rootDir/")
+  val subDirPath: Path = new Path(rootDirPath, "subDir")
+  val leafFilePath: Path = new Path(subDirPath, "leafFile")
+  val nonDeletedLeafFilePath: Path = new Path(subDirPath, "nonDeletedLeafFile")
+  val rootListing: Array[FileStatus] =
+    Array(new FileStatus(0, true, 0, 0, 0, subDirPath))
+  val subFolderListing: Array[FileStatus] =
+    Array(
+      new FileStatus(0, false, 0, 100, 0, leafFilePath),
+      new FileStatus(0, false, 0, 100, 0, nonDeletedLeafFilePath))
+}
+
+// Used in SPARK-27676 test to simulate a race where a subdirectory is deleted
+// between back-to-back listing calls.
+class SubdirectoryDeletionRaceFileSystem extends RawLocalFileSystem {
+  import DeletionRaceFileSystem._
+
+  override def getScheme: String = "mockFs"
+
+  override def listStatus(path: Path): Array[FileStatus] = {
+    if (path == rootDirPath) {
+      rootListing
+    } else if (path == subDirPath) {
+      throw new FileNotFoundException(subDirPath.toString)
+    } else {
+      throw new IllegalArgumentException()
+    }
+  }
+}
+
+// Used in SPARK-27676 test to simulate a race where a file is deleted between
+// being listed and having its size / file status checked.
+class FileDeletionRaceFileSystem extends RawLocalFileSystem {
+  import DeletionRaceFileSystem._
+
+  override def getScheme: String = "mockFs"
+
+  override def listStatus(path: Path): Array[FileStatus] = {
+    if (path == rootDirPath) {
+      rootListing
+    } else if (path == subDirPath) {
+      subFolderListing
+    } else {
+      throw new IllegalArgumentException()
+    }
+  }
+
+  override def getFileBlockLocations(
+      file: FileStatus,
+      start: Long,
+      len: Long): Array[BlockLocation] = {
+    if (file.getPath == leafFilePath) {
+      throw new FileNotFoundException(leafFilePath.toString)
+    } else {
+      Array.empty
+    }
+  }
+}
+
 class FakeParentPathFileSystem extends RawLocalFileSystem {
   override def getScheme: String = "mockFs"
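The suite above injects its race-simulating filesystems through Hadoop's standard scheme-to-implementation mapping rather than touching real storage. A minimal standalone sketch of that wiring follows; the `FlakyListingFileSystem` class and the `flakyFs` scheme are hypothetical and used only to show the mechanism:

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

// Hypothetical FileSystem that always reports the listed path as missing.
class FlakyListingFileSystem extends RawLocalFileSystem {
  override def getScheme: String = "flakyFs"

  override def listStatus(path: Path): Array[FileStatus] =
    throw new FileNotFoundException(path.toString)
}

val conf = new Configuration()
// fs.<scheme>.impl maps a URI scheme to a FileSystem class; disabling the cache makes
// Hadoop construct a fresh instance instead of reusing one cached for the scheme.
conf.set("fs.flakyFs.impl", classOf[FlakyListingFileSystem].getName)
conf.setBoolean("fs.flakyFs.impl.disable.cache", true)

// Any path with the registered scheme now resolves to the mock implementation.
val fs = new Path("flakyFs:///table").getFileSystem(conf)
```

In the suite itself, the analogous keys (`fs.mockFs.impl` and `fs.mockFs.impl.disable.cache`) are set through `withSQLConf`, so each test case gets a fresh instance of the race-simulating filesystem.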
