Skip to content

Commit 282f00b

Browse files
zsxwingtdas
authored andcommitted
[SPARK-21696][SS] Fix a potential issue that may generate partial snapshot files
## What changes were proposed in this pull request? Directly writing a snapshot file may generate a partial file. This PR changes it to write to a temp file then rename to the target file. ## How was this patch tested? Jenkins. Author: Shixiong Zhu <[email protected]> Closes apache#18928 from zsxwing/SPARK-21696.
1 parent fbc2692 commit 282f00b

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,9 +386,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
386386

387387
private def writeSnapshotFile(version: Long, map: MapType): Unit = {
388388
val fileToWrite = snapshotFile(version)
389+
val tempFile =
390+
new Path(fileToWrite.getParent, s"${fileToWrite.getName}.temp-${Random.nextLong}")
389391
var output: DataOutputStream = null
390392
Utils.tryWithSafeFinally {
391-
output = compressStream(fs.create(fileToWrite, false))
393+
output = compressStream(fs.create(tempFile, false))
392394
val iter = map.entrySet().iterator()
393395
while(iter.hasNext) {
394396
val entry = iter.next()
@@ -403,6 +405,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
403405
} {
404406
if (output != null) output.close()
405407
}
408+
if (fs.exists(fileToWrite)) {
409+
// Skip rename if the file is alreayd created.
410+
fs.delete(tempFile, true)
411+
} else if (!fs.rename(tempFile, fileToWrite)) {
412+
throw new IOException(s"Failed to rename $tempFile to $fileToWrite")
413+
}
406414
logInfo(s"Written snapshot file for version $version of $this at $fileToWrite")
407415
}
408416

0 commit comments

Comments
 (0)