Commit b571ca5
Meehauhntd187 authored and committed
fix(job-server): Fix for issue spark-jobserver#786 - Concurrent access to the jars w… (spark-jobserver#872)
* fix(job-server): Fix for issue spark-jobserver#786 - Concurrent access to the jars when caching on the file system resulting in ClassNotFoundError
* Modify/extend the FileCacher.cacheBinary(...) method to store the JAR/binary data in a temporary file first and, only after the write completes, try to rename it to the expected full original name (with a new timestamp and so on). If the rename succeeds, everything is fine. If the rename fails (e.g. there is already a file with the same target full filename), it means that the other process was faster than us, and we're OK with this (but we have to delete our temporary file, since it is no longer needed).
1 parent 5d525e0 commit b571ca5

1 file changed: +22 -7 lines changed


job-server/src/main/scala/spark/jobserver/io/FileCacher.scala

Lines changed: 22 additions & 7 deletions
@@ -29,19 +29,34 @@ trait FileCacher {
 
   // Cache the jar file into local file system.
   protected def cacheBinary(appName: String,
-                            binaryType: BinaryType,
-                            uploadTime: DateTime,
-                            binBytes: Array[Byte]) {
-    val outFile =
-      new File(rootDir, createBinaryName(appName, binaryType, uploadTime))
-    val bos = new BufferedOutputStream(new FileOutputStream(outFile))
+                            binaryType: BinaryType,
+                            uploadTime: DateTime,
+                            binBytes: Array[Byte]) {
+    val targetFullBinaryName = createBinaryName(appName, binaryType, uploadTime)
+    val tempSuffix = ".tmp"
+    val tempOutFile = File.createTempFile(targetFullBinaryName + "-", tempSuffix, new File(rootDir))
+    val tempOutFileName = tempOutFile.getName
+    val bos = new BufferedOutputStream(new FileOutputStream(tempOutFile))
+
     try {
-      logger.debug("Writing {} bytes to file {}", binBytes.length, outFile.getPath)
+      logger.debug("Writing {} bytes to a temporary file {}", binBytes.length, tempOutFile.getPath)
       bos.write(binBytes)
       bos.flush()
     } finally {
       bos.close()
     }
+
+    logger.debug("Renaming the temporary file {} to the target full binary name {}",
+      tempOutFileName, targetFullBinaryName: Any)
+
+    val tempFile = new File(rootDir, tempOutFileName)
+    if( ! tempFile.renameTo(new File(rootDir, targetFullBinaryName))) {
+      logger.debug("Renaming the temporary file {} failed, another process has probably already updated " +
+        "the target file - deleting the redundant temp file", tempOutFileName)
+      if( ! tempFile.delete()) {
+        logger.warn("Could not delete the temporary file {}", tempOutFileName)
+      }
+    }
   }
 
   protected def cleanCacheBinaries(appName: String): Unit = {
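
For reference, the write-to-a-temporary-file-then-rename approach described in the commit message can be tried in isolation. The following is a minimal sketch, not the project's code: the object name `TempThenRename`, the method `cacheBytes`, and the file name `demo-app.jar` are hypothetical, and logging is left out. Note that the `java.io.File.renameTo` documentation describes its behavior as platform-dependent, including whether it fails when the destination already exists, so the "rename failed" branch may not be reached on every platform.

```scala
import java.io.{BufferedOutputStream, File, FileOutputStream}
import java.nio.file.Files

// Hypothetical standalone helper for illustration; not part of spark-jobserver.
object TempThenRename {

  // Writes `bytes` to a uniquely named temp file in `dir`, then tries to rename
  // it to `targetName`. Returns true if our rename succeeded, false if it failed
  // and we discarded our temporary copy.
  def cacheBytes(dir: File, targetName: String, bytes: Array[Byte]): Boolean = {
    // createTempFile generates a unique name, so concurrent writers never share
    // a file. The prefix must be at least three characters long.
    val temp = File.createTempFile(targetName + "-", ".tmp", dir)
    val out = new BufferedOutputStream(new FileOutputStream(temp))
    try {
      out.write(bytes)
      out.flush()
    } finally {
      out.close()
    }

    // Readers only ever see the target name once the data is fully written.
    val renamed = temp.renameTo(new File(dir, targetName))
    if (!renamed) {
      // Assume another writer won the race; our copy is redundant.
      temp.delete()
    }
    renamed
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("cache-demo").toFile
    val ok = cacheBytes(dir, "demo-app.jar", Array[Byte](1, 2, 3))
    println(s"renamed=$ok, files=${dir.list().mkString(", ")}")
  }
}
```

The point of the pattern is that a reader opening the target name never observes a partially written file, which is the failure mode the commit message attributes to concurrent access.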
