Skip to content

Commit 78d546f

Browse files
HeartSaVioR authored and zsxwing committed
[SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
## What changes were proposed in this pull request?

This patch proposes ManifestFileCommitProtocol to clean up incomplete output files at the task level if the task aborts. Please note that this works as "best-effort", not as a guarantee, the same as in HadoopMapReduceCommitProtocol.

## How was this patch tested?

Added UT.

Closes apache#24154 from HeartSaVioR/SPARK-27210.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 8efc5ec commit 78d546f

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -114,7 +114,10 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
114114
}
115115

116116
override def abortTask(taskContext: TaskAttemptContext): Unit = {
117-
// Do nothing
118-
// TODO: we can also try delete the addedFiles as a best-effort cleanup.
117+
// best effort cleanup of incomplete files
118+
if (addedFiles.nonEmpty) {
119+
val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
120+
addedFiles.foreach { file => fs.delete(new Path(file), false) }
121+
}
119122
}
120123
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,8 +18,11 @@
1818
package org.apache.spark.sql.streaming
1919

2020
import java.io.File
21+
import java.nio.file.Files
2122
import java.util.Locale
2223

24+
import scala.collection.JavaConverters._
25+
2326
import org.apache.hadoop.fs.Path
2427

2528
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
@@ -478,4 +481,30 @@ class FileStreamSinkSuite extends StreamTest {
478481
checkDatasetUnorderly(outputDf, 1, 2, 3)
479482
}
480483
}
484+
485+
testQuietly("cleanup incomplete output for aborted task") {
486+
withTempDir { tempDir =>
487+
val checkpointDir = new File(tempDir, "chk")
488+
val outputDir = new File(tempDir, "output")
489+
val inputData = MemoryStream[Int]
490+
inputData.addData(1, 2, 3)
491+
val q = inputData.toDS().map(_ / 0)
492+
.writeStream
493+
.option("checkpointLocation", checkpointDir.getCanonicalPath)
494+
.format("parquet")
495+
.start(outputDir.getCanonicalPath)
496+
497+
intercept[StreamingQueryException] {
498+
try {
499+
q.processAllAvailable()
500+
} finally {
501+
q.stop()
502+
}
503+
}
504+
505+
val outputFiles = Files.walk(outputDir.toPath).iterator().asScala
506+
.filter(_.toString.endsWith(".parquet"))
507+
assert(outputFiles.toList.isEmpty, "Incomplete files should be cleaned up.")
508+
}
509+
}
481510
}

0 commit comments

Comments (0)