Skip to content

Commit 4c3cf1c

Browse files
viiryagatorsmile
authored andcommitted
[SPARK-21721][SQL] Clear FileSystem deleteOnExit cache when paths are successfully removed
## What changes were proposed in this pull request? We put staging path to delete into the deleteOnExit cache of `FileSystem` in case of the path can't be successfully removed. But when we successfully remove the path, we don't remove it from the cache. We should do it to avoid continuing grow the cache size. ## How was this patch tested? Added a test. Author: Liang-Chi Hsieh <[email protected]> Closes apache#18934 from viirya/SPARK-21721.
1 parent 282f00b commit 4c3cf1c

File tree

2 files changed

+27
-3
lines changed

2 files changed

+27
-3
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,13 @@ case class InsertIntoHiveTable(
423423
// Attempt to delete the staging directory and the inclusive files. If failed, the files are
424424
// expected to be dropped at the normal termination of VM since deleteOnExit is used.
425425
try {
426-
createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
426+
createdTempDir.foreach { path =>
427+
val fs = path.getFileSystem(hadoopConf)
428+
if (fs.delete(path, true)) {
429+
// If we successfully delete the staging directory, remove it from FileSystem's cache.
430+
fs.cancelDeleteOnExit(path)
431+
}
432+
}
427433
} catch {
428434
case NonFatal(e) =>
429435
logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution
2020
import java.io.File
2121
import java.nio.charset.StandardCharsets
2222
import java.sql.{Date, Timestamp}
23-
import java.util.Locale
23+
import java.util.{Locale, Set}
2424

2525
import com.google.common.io.Files
26-
import org.apache.hadoop.fs.Path
26+
import org.apache.hadoop.fs.{FileSystem, Path}
2727

2828
import org.apache.spark.TestUtils
2929
import org.apache.spark.sql._
@@ -2021,4 +2021,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
20212021
checkAnswer(table.filter($"p" === "p1\" and q=\"q1").select($"a"), Row(4))
20222022
}
20232023
}
2024+
2025+
test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed") {
2026+
withTable("test21721") {
2027+
val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit")
2028+
deleteOnExitField.setAccessible(true)
2029+
2030+
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
2031+
val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]]
2032+
2033+
val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
2034+
sql("CREATE TABLE test21721 (key INT, value STRING)")
2035+
val pathSizeToDeleteOnExit = setOfPath.size()
2036+
2037+
(0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test1"))
2038+
2039+
assert(setOfPath.size() == pathSizeToDeleteOnExit)
2040+
}
2041+
}
20242042
}

0 commit comments

Comments
 (0)