Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 23 additions & 28 deletions Sources/Sharing/Internal/PersistentReferences.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ final class PersistentReferences: @unchecked Sendable, DependencyKey {
static var liveValue: PersistentReferences { PersistentReferences() }
static var testValue: PersistentReferences { PersistentReferences() }

struct Pair<Key: SharedReaderKey> {
var cachedValue: Key.Value
var reference: _PersistentReference<Key>?
struct Weak<Key: SharedReaderKey> {
weak var reference: _PersistentReference<Key>?
}

private var storage: [AnyHashable: Any] = [:]
Expand All @@ -18,37 +17,33 @@ final class PersistentReferences: @unchecked Sendable, DependencyKey {
forKey key: Key,
default value: @autoclosure () throws -> Key.Value,
skipInitialLoad: Bool
) rethrows -> _ManagedReference<Key> {
try lock.withLock {
guard var pair = storage[key.id] as? Pair<Key> else {
let value = try value()
let persistentReference = _PersistentReference(
key: key,
value: value,
skipInitialLoad: skipInitialLoad
)
storage[key.id] = Pair(cachedValue: value, reference: persistentReference)
return _ManagedReference(persistentReference)
) rethrows -> _PersistentReference<Key> {
if let reference = lock.withLock({ (storage[key.id] as? Weak<Key>)?.reference }) {
return reference
} else {
let value = try value()
let reference = _PersistentReference(
key: key,
value: value,
skipInitialLoad: skipInitialLoad
)
return lock.withLock {
if let reference = (storage[key.id] as? Weak<Key>)?.reference {
return reference
} else {
storage[key.id] = Weak(reference: reference)
reference.onDeinit = { [self] in
removeReference(forKey: key)
}
return reference
}
}
guard let persistentReference = pair.reference else {
let persistentReference = _PersistentReference(
key: key,
value: skipInitialLoad ? (try? value()) ?? pair.cachedValue : pair.cachedValue,
skipInitialLoad: skipInitialLoad
)
pair.reference = persistentReference
storage[key.id] = pair
return _ManagedReference(persistentReference)
}
return _ManagedReference(persistentReference)
}
}

func removeReference<Key: SharedReaderKey>(forKey key: Key) {
lock.withLock {
guard var pair = storage[key.id] as? Pair<Key> else { return }
pair.reference = nil
storage[key.id] = pair
_ = storage.removeValue(forKey: key.id)
}
}
}
99 changes: 5 additions & 94 deletions Sources/Sharing/Internal/Reference.swift
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ final class _PersistentReference<Key: SharedReaderKey>:
private var _isLoading = false
private var _loadError: (any Error)?
private var _saveError: (any Error)?
private var _referenceCount = 0
private var subscription: SharedSubscription?
internal var onDeinit: (() -> Void)?

init(key: Key, value initialValue: Key.Value, skipInitialLoad: Bool) {
self.key = key
Expand Down Expand Up @@ -231,6 +231,10 @@ final class _PersistentReference<Key: SharedReaderKey>:
)
}

deinit {
onDeinit?()
}

var id: ObjectIdentifier { ObjectIdentifier(self) }

var isLoading: Bool {
Expand Down Expand Up @@ -309,20 +313,6 @@ final class _PersistentReference<Key: SharedReaderKey>:
withMutation(keyPath: \._saveError) {}
}

func retain() {
lock.withLock { _referenceCount += 1 }
}

func release() {
let shouldRelease = lock.withLock {
_referenceCount -= 1
return _referenceCount <= 0
}
guard shouldRelease else { return }
@Dependency(PersistentReferences.self) var persistentReferences
persistentReferences.removeReference(forKey: key)
}

func access<Member>(
keyPath: KeyPath<_PersistentReference, Member>,
fileID: StaticString = #fileID,
Expand Down Expand Up @@ -454,85 +444,6 @@ extension _PersistentReference: MutableReference, Equatable where Key: SharedKey
}
}

final class _ManagedReference<Key: SharedReaderKey>: Reference, Observable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pyrtsa Thanks for the PR and for looking into this! It's been a little while since we worked on this, but I believe our worry at the time was that two shared references could compete and initialize the same persistent reference at the same time, and that they would hold unique references while only a single one raced to live in the global persistent references store.

When looking into this did you conclude that this is not possible due to the existing locking?

I also wonder if it's possible to cook up a test case that deadlocks on main and is fixed in this branch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote it so that while two or more threads can indeed start setting up the shared reference, only one of them will succeed (while holding the lock another time), and all others will simply drop their half-initialised reference and grab the one already there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to think a bit on the failing/fixing unit test case, maybe it's possible with some artificial delays. 🤔

Copy link
Contributor Author

@pyrtsa pyrtsa May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a unit test now.

I tried to make the CI run the commit with just the failing test on top of main, but GHA got stuck somehow. (Edit: Maybe the GHA macOS runner breaks down with this test; see #157?)

Anyway, you can check out d188bfb and run tests manually to see that it indeed deadlocks.

private let base: _PersistentReference<Key>

init(_ base: _PersistentReference<Key>) {
base.retain()
self.base = base
}

deinit {
base.release()
}

var id: ObjectIdentifier {
base.id
}

var isLoading: Bool {
base.isLoading
}

var loadError: (any Error)? {
base.loadError
}

var wrappedValue: Key.Value {
base.wrappedValue
}

func load() async throws {
try await base.load()
}

func touch() {
base.touch()
}

#if canImport(Combine)
var publisher: any Publisher<Key.Value, Never> {
base.publisher
}
#endif

var description: String {
base.description
}
}

extension _ManagedReference: MutableReference, Equatable where Key: SharedKey {
var saveError: (any Error)? {
base.saveError
}

var snapshot: Key.Value? {
base.snapshot
}

func takeSnapshot(
_ value: Key.Value,
fileID: StaticString,
filePath: StaticString,
line: UInt,
column: UInt
) {
base.takeSnapshot(value, fileID: fileID, filePath: filePath, line: line, column: column)
}

func withLock<R>(_ body: (inout Key.Value) throws -> R) rethrows -> R {
try base.withLock(body)
}

func save() async throws {
try await base.save()
}

static func == (lhs: _ManagedReference, rhs: _ManagedReference) -> Bool {
lhs.base == rhs.base
}
}

final class _AppendKeyPathReference<
Base: Reference, Value, Path: KeyPath<Base.Value, Value> & Sendable
>: Reference, Observable {
Expand Down
19 changes: 7 additions & 12 deletions Sources/Sharing/SharedContinuations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ public struct SaveContinuation: Sendable {
private final class ContinuationBox<Value>: Sendable {
private let callback: Mutex<(@Sendable (Result<Value?, any Error>) -> Void)?>
private let description: @Sendable () -> String
private let resumeCount = Mutex(0)

init(
callback: @escaping @Sendable (Result<Value?, any Error>) -> Void,
Expand All @@ -221,34 +220,30 @@ private final class ContinuationBox<Value>: Sendable {
}

deinit {
let isComplete = resumeCount.withLock { $0 } > 0
if !isComplete {
if let callback = callback.withLock({ $0 }) {
reportIssue(
"""
\(description()) leaked its continuation without one of its resume methods being \
invoked. This will cause tasks waiting on it to resume immediately.
"""
)
callback.withLock { $0?(.success(nil)) }
callback(.success(nil))
}
}

func resume(with result: Result<Value?, any Error>) {
let resumeCount = resumeCount.withLock {
$0 += 1
return $0
let callback = callback.withLock { callback in
defer { callback = nil }
return callback
}
guard resumeCount == 1 else {
guard let callback else {
reportIssue(
"""
\(description()) tried to resume its continuation more than once.
"""
)
return
}
callback.withLock { callback in
defer { callback = nil }
callback?(result)
}
callback(result)
}
}
40 changes: 40 additions & 0 deletions Tests/SharingTests/SharedTests.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import Dependencies
import Foundation
import IdentifiedCollections
import PerceptionCore
Expand Down Expand Up @@ -35,6 +36,45 @@ import Testing
let a = A()
#expect(a.b.c == C())
}

@Test func lockingOrderWithDependencies() async {
struct D: TestDependencyKey {
@Shared(.inMemory("count")) var count = 0
init() {
Thread.sleep(forTimeInterval: 0.2)
$count.withLock { $0 += 1 }
}
static var testValue: D { D() }
}
let a = Task {
do {
try await Task.sleep(nanoseconds: 100_000_000)
@Dependency(D.self) var d
#expect(d.count == 1)
} catch {}
}
let b = Task {
@Shared(.inMemory("count")) var count: Int = {
Thread.sleep(forTimeInterval: 0.2)
return 2
}()
#expect(count == 1)
}
let c = Task {
do {
try await Task.sleep(nanoseconds: 500_000_000)
Issue.record("Deadlock detected")
exit(1)
} catch {}
}
await withTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
_ = await (a.value, b.value)
c.cancel()
}
taskGroup.addTask { await c.value }
}
}
}

@Suite struct BoxReference {
Expand Down