Skip to content

Commit 2cabd23

Browse files
committed
Use file locking for safe concurrent downloads
1 parent d81fe0c commit 2cabd23

File tree

3 files changed

+203
-34
lines changed

3 files changed

+203
-34
lines changed

Sources/Hub/FileLock.swift

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import Foundation
2+
3+
/// A file-based lock for coordinating access to shared resources.
4+
///
5+
/// `FileLock` provides file locking using `flock(2)` to enable safe
6+
/// concurrent access to cache files from multiple processes.
7+
///
8+
/// ## Usage
9+
///
10+
/// ```swift
11+
/// let lock = FileLock(path: blobPath)
12+
/// try await lock.withLock {
13+
/// // Exclusive access to the resource
14+
/// try data.write(to: blobPath)
15+
/// }
16+
/// ```
17+
///
18+
/// The lock is automatically released when the closure completes or throws.
19+
///
20+
/// ## Lock File Location
21+
///
22+
/// The lock file is created alongside the target path with a `.lock` extension.
23+
/// For example, locking `/cache/blobs/abc123` creates `/cache/blobs/abc123.lock`.
24+
public struct FileLock: Sendable {
25+
/// The path to the lock file.
26+
public let lockPath: URL
27+
28+
/// Maximum number of lock acquisition attempts.
29+
public let maxRetries: Int
30+
31+
/// Delay between retry attempts in seconds.
32+
public let retryDelay: TimeInterval
33+
34+
/// Creates a file lock for the specified path.
35+
///
36+
/// - Parameters:
37+
/// - path: The path to the resource being protected.
38+
/// - maxRetries: Maximum number of lock acquisition attempts. Defaults to 5.
39+
/// - retryDelay: Delay between retry attempts in seconds. Defaults to 1.0.
40+
public init(path: URL, maxRetries: Int = 5, retryDelay: TimeInterval = 1.0) {
41+
self.lockPath = path.appendingPathExtension("lock")
42+
self.maxRetries = maxRetries
43+
self.retryDelay = retryDelay
44+
}
45+
46+
/// Executes the given closure while holding an exclusive lock.
47+
///
48+
/// This method acquires an exclusive lock on the lock file, executes the closure,
49+
/// and then releases the lock. If the lock cannot be acquired after the maximum
50+
/// number of retries, an error is thrown.
51+
///
52+
/// - Parameter body: The closure to execute while holding the lock.
53+
/// - Returns: The value returned by the closure.
54+
/// - Throws: `FileLockError.acquisitionFailed` if the lock cannot be acquired,
55+
/// or any error thrown by the closure.
56+
public func withLockSync<T>(_ body: () throws -> T) throws -> T {
57+
let handle = try acquireLock()
58+
defer { releaseLock(handle) }
59+
return try body()
60+
}
61+
62+
/// Executes the given async closure while holding an exclusive lock.
63+
///
64+
/// - Parameter body: The async closure to execute while holding the lock.
65+
/// - Returns: The value returned by the closure.
66+
/// - Throws: `FileLockError.acquisitionFailed` if the lock cannot be acquired,
67+
/// or any error thrown by the closure.
68+
public func withLock<T>(_ body: () async throws -> T) async throws -> T {
69+
let handle = try await acquireLockAsync()
70+
defer { releaseLock(handle) }
71+
return try await body()
72+
}
73+
74+
// MARK: -
75+
76+
private func prepareLockFile() throws -> FileHandle {
77+
try FileManager.default.createDirectory(
78+
at: lockPath.deletingLastPathComponent(),
79+
withIntermediateDirectories: true
80+
)
81+
82+
if !FileManager.default.fileExists(atPath: lockPath.path) {
83+
FileManager.default.createFile(atPath: lockPath.path, contents: nil)
84+
}
85+
86+
guard let handle = FileHandle(forWritingAtPath: lockPath.path) else {
87+
throw FileLockError.acquisitionFailed(lockPath)
88+
}
89+
90+
return handle
91+
}
92+
93+
private func tryLock(_ handle: FileHandle) -> Bool {
94+
flock(handle.fileDescriptor, LOCK_EX | LOCK_NB) == 0
95+
}
96+
97+
private func acquireLock() throws -> FileHandle {
98+
let handle = try prepareLockFile()
99+
100+
for attempt in 0...maxRetries {
101+
if tryLock(handle) { return handle }
102+
if attempt < maxRetries {
103+
Thread.sleep(forTimeInterval: retryDelay)
104+
}
105+
}
106+
107+
try? handle.close()
108+
throw FileLockError.acquisitionFailed(lockPath)
109+
}
110+
111+
private func acquireLockAsync() async throws -> FileHandle {
112+
let handle = try prepareLockFile()
113+
114+
for attempt in 0...maxRetries {
115+
if tryLock(handle) { return handle }
116+
if attempt < maxRetries {
117+
try await Task.sleep(for: .seconds(retryDelay))
118+
}
119+
}
120+
121+
try? handle.close()
122+
throw FileLockError.acquisitionFailed(lockPath)
123+
}
124+
125+
private func releaseLock(_ handle: FileHandle) {
126+
flock(handle.fileDescriptor, LOCK_UN)
127+
try? handle.close()
128+
// Clean up the lock file after releasing
129+
try? FileManager.default.removeItem(at: lockPath)
130+
}
131+
}
132+
133+
/// Errors that can occur during file locking operations.
134+
public enum FileLockError: Error, LocalizedError {
135+
/// The lock could not be acquired after the maximum number of retries.
136+
case acquisitionFailed(URL)
137+
138+
public var errorDescription: String? {
139+
switch self {
140+
case .acquisitionFailed(let path):
141+
return "Failed to acquire file lock at: \(path.path)"
142+
}
143+
}
144+
}

Sources/Hub/HubApi.swift

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -584,46 +584,58 @@ public extension HubApi {
584584
}
585585
}
586586

587-
// Otherwise, let's download the file!
588-
let incompleteDestination = repoMetadataDestination.appending(path: relativeFilename + ".\(remoteEtag).incomplete")
589-
try prepareCacheDestination(incompleteDestination)
590-
591-
let downloader = Downloader(to: destination, incompleteDestination: incompleteDestination, inBackground: backgroundSession)
592-
593-
try await withTaskCancellationHandler {
594-
let sub = await downloader.download(from: source, using: hfToken, expectedSize: remoteSize)
595-
listen: for await state in sub {
596-
switch state {
597-
case .notStarted:
598-
continue
599-
case let .downloading(progress, speed):
600-
progressHandler(progress, speed)
601-
case let .failed(error):
602-
throw error
603-
case .completed:
604-
break listen
605-
}
587+
// Otherwise, download the file
588+
// Use file lock to prevent concurrent downloads of the same file
589+
let lock = FileLock(path: destination)
590+
return try await lock.withLock {
591+
// Re-check if file exists with valid metadata after acquiring lock
592+
// (another process may have completed the download while we waited)
593+
if downloaded, let updatedMetadata = try hub.readDownloadMetadata(metadataPath: metadataDestination),
594+
updatedMetadata.etag == remoteEtag
595+
{
596+
return destination
606597
}
607-
} onCancel: {
608-
Task {
609-
await downloader.cancel()
598+
599+
let incompleteDestination = repoMetadataDestination.appending(path: relativeFilename + ".\(remoteEtag).incomplete")
600+
try prepareCacheDestination(incompleteDestination)
601+
602+
let downloader = Downloader(to: destination, incompleteDestination: incompleteDestination, inBackground: backgroundSession)
603+
604+
try await withTaskCancellationHandler {
605+
let sub = await downloader.download(from: source, using: hfToken, expectedSize: remoteSize)
606+
listen: for await state in sub {
607+
switch state {
608+
case .notStarted:
609+
continue
610+
case let .downloading(progress, speed):
611+
progressHandler(progress, speed)
612+
case let .failed(error):
613+
throw error
614+
case .completed:
615+
break listen
616+
}
617+
}
618+
} onCancel: {
619+
Task {
620+
await downloader.cancel()
621+
}
610622
}
611-
}
612623

613-
// Verify downloaded file integrity for LFS files (SHA256 etag)
614-
if hub.isValidHash(hash: remoteEtag, pattern: hub.sha256Pattern) {
615-
let fileHash = try hub.computeFileHash(file: destination)
616-
if fileHash != remoteEtag {
617-
try? FileManager.default.removeItem(at: destination)
618-
throw EnvironmentError.fileIntegrityError(
619-
"Downloaded file hash mismatch for \(destination.lastPathComponent): expected \(remoteEtag), got \(fileHash)"
620-
)
624+
// Verify downloaded file integrity for LFS files (SHA256 etag)
625+
if hub.isValidHash(hash: remoteEtag, pattern: hub.sha256Pattern) {
626+
let fileHash = try hub.computeFileHash(file: destination)
627+
if fileHash != remoteEtag {
628+
try? FileManager.default.removeItem(at: destination)
629+
throw EnvironmentError.fileIntegrityError(
630+
"Downloaded file hash mismatch for \(destination.lastPathComponent): expected \(remoteEtag), got \(fileHash)"
631+
)
632+
}
621633
}
622-
}
623634

624-
try hub.writeDownloadMetadata(commitHash: remoteCommitHash, etag: remoteEtag, metadataPath: metadataDestination)
635+
try hub.writeDownloadMetadata(commitHash: remoteCommitHash, etag: remoteEtag, metadataPath: metadataDestination)
625636

626-
return destination
637+
return destination
638+
}
627639
}
628640
}
629641

Tests/HubTests/HubTests.swift

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,4 +224,17 @@ class HubTests: XCTestCase {
224224
XCTFail("Unexpected error type: \(error)")
225225
}
226226
}
227+
228+
// MARK: - FileLock Tests
229+
230+
func testFileLockCleansUp() throws {
231+
let tempFile = FileManager.default.temporaryDirectory.appendingPathComponent("test-\(UUID())")
232+
let lock = FileLock(path: tempFile)
233+
234+
try lock.withLockSync {
235+
XCTAssertTrue(FileManager.default.fileExists(atPath: lock.lockPath.path))
236+
}
237+
238+
XCTAssertFalse(FileManager.default.fileExists(atPath: lock.lockPath.path))
239+
}
227240
}

0 commit comments

Comments
 (0)