From 4d8e6d90387bf80cc08e975bc5144acec8bfc247 Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Sun, 23 Feb 2025 17:47:56 +0100 Subject: [PATCH 01/13] Make sure that everyone that plays events during subscription doesn't do that under their own lock --- RxSwift/Observables/ShareReplayScope.swift | 30 ++++++++++------------ RxSwift/Subjects/BehaviorSubject.swift | 8 +++--- RxSwift/Subjects/PublishSubject.swift | 7 +++-- RxSwift/Subjects/ReplaySubject.swift | 17 +++++++----- 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/RxSwift/Observables/ShareReplayScope.swift b/RxSwift/Observables/ShareReplayScope.swift index 59d440dc3e..80c2f3c1ad 100644 --- a/RxSwift/Observables/ShareReplayScope.swift +++ b/RxSwift/Observables/ShareReplayScope.swift @@ -166,14 +166,13 @@ private final class ShareReplay1WhileConnectedConnection private let parent: Parent private let subscription = SingleAssignmentDisposable() - private let lock: RecursiveLock + private let lock = RecursiveLock() private var disposed: Bool = false fileprivate var observers = Observers() private var element: Element? - init(parent: Parent, lock: RecursiveLock) { + init(parent: Parent) { self.parent = parent - self.lock = lock #if TRACE_RESOURCES _ = Resources.incrementTotal() @@ -206,15 +205,15 @@ private final class ShareReplay1WhileConnectedConnection } final func synchronized_subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { - self.lock.performLocked { - if let element = self.element { - observer.on(.next(element)) - } - - let disposeKey = self.observers.insert(observer.on) - - return SubscriptionDisposable(owner: self, key: disposeKey) + let disposeKey = self.lock.performLocked { + self.observers.insert(observer.on) } + + if let element = self.element { + observer.on(.next(element)) + } + + return SubscriptionDisposable(owner: self, key: disposeKey) } final private func synchronized_dispose() { @@ -274,8 +273,9 @@ final private class ShareReplay1WhileConnected let connection = self.synchronized_subscribe(observer) let count = connection.observers.count - let disposable = connection.synchronized_subscribe(observer) self.lock.unlock() + let disposable = connection.synchronized_subscribe(observer) + if count == 0 { connection.connect() @@ -292,9 +292,7 @@ final private class ShareReplay1WhileConnected connection = existingConnection } else { - connection = ShareReplay1WhileConnectedConnection( - parent: self, - lock: self.lock) + connection = ShareReplay1WhileConnectedConnection(parent: self) self.connection = connection } @@ -414,8 +412,8 @@ final private class ShareWhileConnected let connection = self.synchronized_subscribe(observer) let count = connection.observers.count - let disposable = connection.synchronized_subscribe(observer) self.lock.unlock() + let disposable = connection.synchronized_subscribe(observer) if count == 0 { connection.connect() diff --git a/RxSwift/Subjects/BehaviorSubject.swift b/RxSwift/Subjects/BehaviorSubject.swift index 7251f616b9..1fef412e8e 100644 --- a/RxSwift/Subjects/BehaviorSubject.swift +++ b/RxSwift/Subjects/BehaviorSubject.swift @@ -103,21 +103,21 @@ public final class BehaviorSubject /// - parameter observer: Observer to subscribe to the subject. /// - returns: Disposable object that can be used to unsubscribe the observer from the subject. public override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { - self.lock.performLocked { self.synchronized_subscribe(observer) } - } - - func synchronized_subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { + lock.lock() if self.isDisposed { + lock.unlock() observer.on(.error(RxError.disposed(object: self))) return Disposables.create() } if let stoppedEvent = self.stoppedEvent { + lock.unlock() observer.on(stoppedEvent) return Disposables.create() } let key = self.observers.insert(observer.on) + lock.unlock() observer.on(.next(self.element)) return SubscriptionDisposable(owner: self, key: key) diff --git a/RxSwift/Subjects/PublishSubject.swift b/RxSwift/Subjects/PublishSubject.swift index 0318486ef9..3c6a23acf6 100644 --- a/RxSwift/Subjects/PublishSubject.swift +++ b/RxSwift/Subjects/PublishSubject.swift @@ -90,16 +90,15 @@ public final class PublishSubject - returns: Disposable object that can be used to unsubscribe the observer from the subject. */ public override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { - self.lock.performLocked { self.synchronized_subscribe(observer) } - } - - func synchronized_subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { + lock.lock() if let stoppedEvent = self.stoppedEvent { + lock.unlock() observer.on(stoppedEvent) return Disposables.create() } if self.isDisposed { + lock.unlock() observer.on(.error(RxError.disposed(object: self))) return Disposables.create() } diff --git a/RxSwift/Subjects/ReplaySubject.swift b/RxSwift/Subjects/ReplaySubject.swift index 2336ce78f9..ddac0c4c0b 100644 --- a/RxSwift/Subjects/ReplaySubject.swift +++ b/RxSwift/Subjects/ReplaySubject.swift @@ -140,10 +140,7 @@ private class ReplayBufferBase } override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { - self.lock.performLocked { self.synchronized_subscribe(observer) } - } - - func synchronized_subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { + lock.lock() if self.isDisposed { observer.on(.error(RxError.disposed(object: self))) return Disposables.create() @@ -151,13 +148,19 @@ private class ReplayBufferBase let anyObserver = observer.asObserver() - self.replayBuffer(anyObserver) if let stoppedEvent = self.stoppedEvent { + lock.unlock() + + self.replayBuffer(anyObserver) observer.on(stoppedEvent) + return Disposables.create() - } - else { + } else { let key = self.observers.insert(observer.on) + lock.unlock() + + self.replayBuffer(anyObserver) + return SubscriptionDisposable(owner: self, key: key) } } From 4cffbf779d112deabc4057ff022c229dc207b10a Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Sun, 23 Feb 2025 18:08:13 +0100 Subject: [PATCH 02/13] Fix the ShareReplay connecting more than once --- RxSwift/Observables/ShareReplayScope.swift | 22 ++++++++++++++-------- RxSwift/Subjects/ReplaySubject.swift | 7 ++----- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/RxSwift/Observables/ShareReplayScope.swift b/RxSwift/Observables/ShareReplayScope.swift index 80c2f3c1ad..daf237434b 100644 --- a/RxSwift/Observables/ShareReplayScope.swift +++ b/RxSwift/Observables/ShareReplayScope.swift @@ -166,13 +166,14 @@ private final class ShareReplay1WhileConnectedConnection private let parent: Parent private let subscription = SingleAssignmentDisposable() - private let lock = RecursiveLock() + private let lock: RecursiveLock private var disposed: Bool = false fileprivate var observers = Observers() private var element: Element? - init(parent: Parent) { + init(parent: Parent, lock: RecursiveLock) { self.parent = parent + self.lock = lock #if TRACE_RESOURCES _ = Resources.incrementTotal() @@ -205,15 +206,17 @@ private final class ShareReplay1WhileConnectedConnection } final func synchronized_subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { - let disposeKey = self.lock.performLocked { - self.observers.insert(observer.on) + self.lock.performLocked { + let disposeKey = self.observers.insert(observer.on) + + return SubscriptionDisposable(owner: self, key: disposeKey) } + } + func replayStoredElementIfNeeded(_ observer: Observer) where Observer.Element == Element { if let element = self.element { observer.on(.next(element)) } - - return SubscriptionDisposable(owner: self, key: disposeKey) } final private func synchronized_dispose() { @@ -273,9 +276,10 @@ final private class ShareReplay1WhileConnected let connection = self.synchronized_subscribe(observer) let count = connection.observers.count - self.lock.unlock() let disposable = connection.synchronized_subscribe(observer) + self.lock.unlock() + connection.replayStoredElementIfNeeded(observer) if count == 0 { connection.connect() @@ -292,7 +296,9 @@ final private class ShareReplay1WhileConnected connection = existingConnection } else { - connection = ShareReplay1WhileConnectedConnection(parent: self) + connection = ShareReplay1WhileConnectedConnection( + parent: self, + lock: self.lock) self.connection = connection } diff --git a/RxSwift/Subjects/ReplaySubject.swift b/RxSwift/Subjects/ReplaySubject.swift index ddac0c4c0b..6d1396fe2a 100644 --- a/RxSwift/Subjects/ReplaySubject.swift +++ b/RxSwift/Subjects/ReplaySubject.swift @@ -150,17 +150,14 @@ private class ReplayBufferBase if let stoppedEvent = self.stoppedEvent { lock.unlock() - self.replayBuffer(anyObserver) observer.on(stoppedEvent) - return Disposables.create() - } else { + } + else { let key = self.observers.insert(observer.on) lock.unlock() - self.replayBuffer(anyObserver) - return SubscriptionDisposable(owner: self, key: key) } } From 3bbf7693d6c6be1a56fa1015c139ece6c58ff47c Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Sun, 23 Feb 2025 19:01:32 +0100 Subject: [PATCH 03/13] Don't forget to unlock PublishSubject --- RxSwift/Subjects/PublishSubject.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/RxSwift/Subjects/PublishSubject.swift b/RxSwift/Subjects/PublishSubject.swift index 3c6a23acf6..40d4691177 100644 --- a/RxSwift/Subjects/PublishSubject.swift +++ b/RxSwift/Subjects/PublishSubject.swift @@ -104,6 +104,7 @@ public final class PublishSubject } let key = self.observers.insert(observer.on) + lock.unlock() return SubscriptionDisposable(owner: self, key: key) } From 41856c6382550cb7473b8cc6f5f61c0b143b6846 Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Sun, 23 Feb 2025 19:38:59 +0100 Subject: [PATCH 04/13] Fix ShareReplayConnection state access --- RxSwift/Observables/ShareReplayScope.swift | 26 +++++++--------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/RxSwift/Observables/ShareReplayScope.swift b/RxSwift/Observables/ShareReplayScope.swift index daf237434b..8ed1bdb792 100644 --- a/RxSwift/Observables/ShareReplayScope.swift +++ b/RxSwift/Observables/ShareReplayScope.swift @@ -169,7 +169,7 @@ private final class ShareReplay1WhileConnectedConnection private let lock: RecursiveLock private var disposed: Bool = false fileprivate var observers = Observers() - private var element: Element? + fileprivate var element: Element? init(parent: Parent, lock: RecursiveLock) { self.parent = parent @@ -205,20 +205,6 @@ private final class ShareReplay1WhileConnectedConnection self.subscription.setDisposable(self.parent.source.subscribe(self)) } - final func synchronized_subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { - self.lock.performLocked { - let disposeKey = self.observers.insert(observer.on) - - return SubscriptionDisposable(owner: self, key: disposeKey) - } - } - - func replayStoredElementIfNeeded(_ observer: Observer) where Observer.Element == Element { - if let element = self.element { - observer.on(.next(element)) - } - } - final private func synchronized_dispose() { self.disposed = true if self.parent.connection === self { @@ -276,16 +262,20 @@ final private class ShareReplay1WhileConnected let connection = self.synchronized_subscribe(observer) let count = connection.observers.count - let disposable = connection.synchronized_subscribe(observer) + let disposeKey = connection.observers.insert(observer.on) + + let initialValueToReplay = connection.element self.lock.unlock() - connection.replayStoredElementIfNeeded(observer) + if let initialValueToReplay { + observer.on(.next(initialValueToReplay)) + } if count == 0 { connection.connect() } - return disposable + return SubscriptionDisposable(owner: connection, key: disposeKey) } @inline(__always) From 4e63a6e50c44bfaa4fe10ff3657859faf3286590 Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Sun, 23 Feb 2025 19:43:59 +0100 Subject: [PATCH 05/13] Fix ReplaySubject accessing its state under lock --- RxSwift/Subjects/ReplaySubject.swift | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/RxSwift/Subjects/ReplaySubject.swift b/RxSwift/Subjects/ReplaySubject.swift index 6d1396fe2a..087bc8cc45 100644 --- a/RxSwift/Subjects/ReplaySubject.swift +++ b/RxSwift/Subjects/ReplaySubject.swift @@ -103,7 +103,7 @@ private class ReplayBufferBase rxAbstractMethod() } - func replayBuffer(_ observer: Observer) where Observer.Element == Element { + func getEventsToReplay() -> [Event] { rxAbstractMethod() } @@ -149,15 +149,21 @@ private class ReplayBufferBase let anyObserver = observer.asObserver() if let stoppedEvent = self.stoppedEvent { + let eventsToReplay = self.getEventsToReplay() lock.unlock() - self.replayBuffer(anyObserver) + for event in eventsToReplay { + observer.on(event) + } observer.on(stoppedEvent) return Disposables.create() } else { let key = self.observers.insert(observer.on) + let eventsToReplay = self.getEventsToReplay() lock.unlock() - self.replayBuffer(anyObserver) + for event in eventsToReplay { + observer.on(event) + } return SubscriptionDisposable(owner: self, key: key) } } @@ -205,10 +211,11 @@ private final class ReplayOne : ReplayBufferBase { self.value = value } - override func replayBuffer(_ observer: Observer) where Observer.Element == Element { + override func getEventsToReplay() -> [Event] { if let value = self.value { - observer.on(.next(value)) + return [.next(value)] } + return [] } override func synchronized_dispose() { @@ -228,10 +235,8 @@ private class ReplayManyBase: ReplayBufferBase { self.queue.enqueue(value) } - override func replayBuffer(_ observer: Observer) where Observer.Element == Element { - for item in self.queue { - observer.on(.next(item)) - } + override func getEventsToReplay() -> [Event] { + return queue.map(Event.next) } override func synchronized_dispose() { From 15ddc7411676b7700e7df6443f4ca59c1fb2ed8a Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Sun, 23 Feb 2025 19:45:28 +0100 Subject: [PATCH 06/13] Cleanup --- RxSwift/Subjects/ReplaySubject.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/RxSwift/Subjects/ReplaySubject.swift b/RxSwift/Subjects/ReplaySubject.swift index 087bc8cc45..96b27d3a2c 100644 --- a/RxSwift/Subjects/ReplaySubject.swift +++ b/RxSwift/Subjects/ReplaySubject.swift @@ -236,7 +236,9 @@ private class ReplayManyBase: ReplayBufferBase { } override func getEventsToReplay() -> [Event] { - return queue.map(Event.next) + return queue.map { element in + Event.next(element) + } } override func synchronized_dispose() { From 853faae1f685bd4149233c2f69e6db2353732a71 Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Tue, 25 Feb 2025 19:27:50 +0100 Subject: [PATCH 07/13] Finalize the entire suite of tests --- Tests/RxSwiftTests/Anomalies.swift | 100 +++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/Tests/RxSwiftTests/Anomalies.swift b/Tests/RxSwiftTests/Anomalies.swift index 4e1cd204e9..8762a2e6ee 100644 --- a/Tests/RxSwiftTests/Anomalies.swift +++ b/Tests/RxSwiftTests/Anomalies.swift @@ -176,4 +176,104 @@ extension AnomaliesTest { performSharingOperatorsTest(share: op) } } + + func testShareReplayOneInitialEmissionDeadlock() { + let immediatelyEmittingSource = Observable.create { observer in + observer.on(.next(())) + return Disposables.create() + } + .share(replay: 1) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "`share(replay: 1)`", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 1) + } + + func testIdleBehaviorSubjectInitialEmissionDeadlock() { + let immediatelyEmittingSource = BehaviorSubject(value: ()) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "'Idle BehaviorSubject'", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 1) + } + + func testCompletedBehaviorSubjectInitialEmissionDeadlock() { + let immediatelyEmittingSource = BehaviorSubject(value: ()) + immediatelyEmittingSource.on(.completed) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "'BehaviorSubject with completed event'", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 1) + } + + func testCompletedPublishSubjectInitialEmissionDeadlock() { + let immediatelyEmittingSource = PublishSubject() + immediatelyEmittingSource.on(.completed) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "'PublishSubject with completed event'", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 1) + } + + func testIdleReplaySubjectInitialEmissionDeadlock() { + let immediatelyEmittingSource = ReplaySubject.create(bufferSize: 1) + immediatelyEmittingSource.on(.next(())) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "'Idle ReplaySubject'", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 1) + } + + func testCompletedReplaySubjectInitialEmissionDeadlock() { + let immediatelyEmittingSource = ReplaySubject.create(bufferSize: 1) + immediatelyEmittingSource.on(.completed) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "'ReplaySubject with completed event'", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 1) + } + + private func createInitialEmissionsDeadlockExpectation( + sourceName: String, + immediatelyEmittingSource: Observable + ) -> XCTestExpectation { + let exp = expectation(description: "`\(sourceName)` doesn't cause a deadlock in multithreaded environment because it replays with its own lock acquired") + + let triggerRange = 0..<100 + + let concurrentScheduler = ConcurrentDispatchQueueScheduler(qos: .userInitiated) + + let multipleSubscriptions = Observable.zip(triggerRange.map { _ in + Observable.just(()) + .observe(on: concurrentScheduler) + .flatMap { _ in + immediatelyEmittingSource + } + .take(1) + }) + + _ = multipleSubscriptions.subscribe(onCompleted: { + exp.fulfill() + }) + + return exp + } } From e5dca1c9f5766e7b9f8ec55b0ddbac0c01568a2c Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Tue, 25 Feb 2025 20:41:52 +0100 Subject: [PATCH 08/13] Test share replay 3 --- Tests/RxSwiftTests/Anomalies.swift | 58 ++++-------------------------- 1 file changed, 7 insertions(+), 51 deletions(-) diff --git a/Tests/RxSwiftTests/Anomalies.swift b/Tests/RxSwiftTests/Anomalies.swift index 8762a2e6ee..bc3042dcca 100644 --- a/Tests/RxSwiftTests/Anomalies.swift +++ b/Tests/RxSwiftTests/Anomalies.swift @@ -192,59 +192,15 @@ extension AnomaliesTest { wait(for: [exp], timeout: 1) } - func testIdleBehaviorSubjectInitialEmissionDeadlock() { - let immediatelyEmittingSource = BehaviorSubject(value: ()) - - let exp = createInitialEmissionsDeadlockExpectation( - sourceName: "'Idle BehaviorSubject'", - immediatelyEmittingSource: immediatelyEmittingSource - ) - - wait(for: [exp], timeout: 1) - } - - func testCompletedBehaviorSubjectInitialEmissionDeadlock() { - let immediatelyEmittingSource = BehaviorSubject(value: ()) - immediatelyEmittingSource.on(.completed) - - let exp = createInitialEmissionsDeadlockExpectation( - sourceName: "'BehaviorSubject with completed event'", - immediatelyEmittingSource: immediatelyEmittingSource - ) - - wait(for: [exp], timeout: 1) - } - - func testCompletedPublishSubjectInitialEmissionDeadlock() { - let immediatelyEmittingSource = PublishSubject() - immediatelyEmittingSource.on(.completed) - - let exp = createInitialEmissionsDeadlockExpectation( - sourceName: "'PublishSubject with completed event'", - immediatelyEmittingSource: immediatelyEmittingSource - ) - - wait(for: [exp], timeout: 1) - } - - func testIdleReplaySubjectInitialEmissionDeadlock() { - let immediatelyEmittingSource = ReplaySubject.create(bufferSize: 1) - immediatelyEmittingSource.on(.next(())) - - let exp = createInitialEmissionsDeadlockExpectation( - sourceName: "'Idle ReplaySubject'", - immediatelyEmittingSource: immediatelyEmittingSource - ) - - wait(for: [exp], timeout: 1) - } - - func testCompletedReplaySubjectInitialEmissionDeadlock() { - let immediatelyEmittingSource = ReplaySubject.create(bufferSize: 1) - immediatelyEmittingSource.on(.completed) + func testShareReplayMoreInitialEmissionDeadlock() { + let immediatelyEmittingSource = Observable.create { observer in + observer.on(.next(())) + return Disposables.create() + } + .share(replay: 3) let exp = createInitialEmissionsDeadlockExpectation( - sourceName: "'ReplaySubject with completed event'", + sourceName: "`share(replay: 3)`", immediatelyEmittingSource: immediatelyEmittingSource ) From 28c872d96629cd96ca117176421484f184a43a14 Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Tue, 25 Feb 2025 20:43:27 +0100 Subject: [PATCH 09/13] Restore implementations of Subjects to pre-pr state --- RxSwift/Subjects/BehaviorSubject.swift | 8 +++---- RxSwift/Subjects/PublishSubject.swift | 8 +++---- RxSwift/Subjects/ReplaySubject.swift | 29 ++++++++++---------------- 3 files changed, 19 insertions(+), 26 deletions(-) diff --git a/RxSwift/Subjects/BehaviorSubject.swift b/RxSwift/Subjects/BehaviorSubject.swift index 1fef412e8e..7251f616b9 100644 --- a/RxSwift/Subjects/BehaviorSubject.swift +++ b/RxSwift/Subjects/BehaviorSubject.swift @@ -103,21 +103,21 @@ public final class BehaviorSubject /// - parameter observer: Observer to subscribe to the subject. /// - returns: Disposable object that can be used to unsubscribe the observer from the subject. public override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { - lock.lock() + self.lock.performLocked { self.synchronized_subscribe(observer) } + } + + func synchronized_subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { if self.isDisposed { - lock.unlock() observer.on(.error(RxError.disposed(object: self))) return Disposables.create() } if let stoppedEvent = self.stoppedEvent { - lock.unlock() observer.on(stoppedEvent) return Disposables.create() } let key = self.observers.insert(observer.on) - lock.unlock() observer.on(.next(self.element)) return SubscriptionDisposable(owner: self, key: key) diff --git a/RxSwift/Subjects/PublishSubject.swift b/RxSwift/Subjects/PublishSubject.swift index 40d4691177..0318486ef9 100644 --- a/RxSwift/Subjects/PublishSubject.swift +++ b/RxSwift/Subjects/PublishSubject.swift @@ -90,21 +90,21 @@ public final class PublishSubject - returns: Disposable object that can be used to unsubscribe the observer from the subject. */ public override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { - lock.lock() + self.lock.performLocked { self.synchronized_subscribe(observer) } + } + + func synchronized_subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { if let stoppedEvent = self.stoppedEvent { - lock.unlock() observer.on(stoppedEvent) return Disposables.create() } if self.isDisposed { - lock.unlock() observer.on(.error(RxError.disposed(object: self))) return Disposables.create() } let key = self.observers.insert(observer.on) - lock.unlock() return SubscriptionDisposable(owner: self, key: key) } diff --git a/RxSwift/Subjects/ReplaySubject.swift b/RxSwift/Subjects/ReplaySubject.swift index 96b27d3a2c..2336ce78f9 100644 --- a/RxSwift/Subjects/ReplaySubject.swift +++ b/RxSwift/Subjects/ReplaySubject.swift @@ -103,7 +103,7 @@ private class ReplayBufferBase rxAbstractMethod() } - func getEventsToReplay() -> [Event] { + func replayBuffer(_ observer: Observer) where Observer.Element == Element { rxAbstractMethod() } @@ -140,7 +140,10 @@ private class ReplayBufferBase } override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { - lock.lock() + self.lock.performLocked { self.synchronized_subscribe(observer) } + } + + func synchronized_subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { if self.isDisposed { observer.on(.error(RxError.disposed(object: self))) return Disposables.create() @@ -148,22 +151,13 @@ private class ReplayBufferBase let anyObserver = observer.asObserver() + self.replayBuffer(anyObserver) if let stoppedEvent = self.stoppedEvent { - let eventsToReplay = self.getEventsToReplay() - lock.unlock() - for event in eventsToReplay { - observer.on(event) - } observer.on(stoppedEvent) return Disposables.create() } else { let key = self.observers.insert(observer.on) - let eventsToReplay = self.getEventsToReplay() - lock.unlock() - for event in eventsToReplay { - observer.on(event) - } return SubscriptionDisposable(owner: self, key: key) } } @@ -211,11 +205,10 @@ private final class ReplayOne : ReplayBufferBase { self.value = value } - override func getEventsToReplay() -> [Event] { + override func replayBuffer(_ observer: Observer) where Observer.Element == Element { if let value = self.value { - return [.next(value)] + observer.on(.next(value)) } - return [] } override func synchronized_dispose() { @@ -235,9 +228,9 @@ private class ReplayManyBase: ReplayBufferBase { self.queue.enqueue(value) } - override func getEventsToReplay() -> [Event] { - return queue.map { element in - Event.next(element) + override func replayBuffer(_ observer: Observer) where Observer.Element == Element { + for item in self.queue { + observer.on(.next(item)) } } From 83896ee0aee4d070992d6c60a75987a931e43aa8 Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Tue, 25 Feb 2025 20:45:36 +0100 Subject: [PATCH 10/13] Improve labeling --- Tests/RxSwiftTests/Anomalies.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Tests/RxSwiftTests/Anomalies.swift b/Tests/RxSwiftTests/Anomalies.swift index bc3042dcca..004b24be0c 100644 --- a/Tests/RxSwiftTests/Anomalies.swift +++ b/Tests/RxSwiftTests/Anomalies.swift @@ -177,7 +177,7 @@ extension AnomaliesTest { } } - func testShareReplayOneInitialEmissionDeadlock() { + func test2653ShareReplayOneInitialEmissionDeadlock() { let immediatelyEmittingSource = Observable.create { observer in observer.on(.next(())) return Disposables.create() @@ -192,7 +192,7 @@ extension AnomaliesTest { wait(for: [exp], timeout: 1) } - func testShareReplayMoreInitialEmissionDeadlock() { + func test2653ShareReplayMoreInitialEmissionDeadlock() { let immediatelyEmittingSource = Observable.create { observer in observer.on(.next(())) return Disposables.create() @@ -211,7 +211,7 @@ extension AnomaliesTest { sourceName: String, immediatelyEmittingSource: Observable ) -> XCTestExpectation { - let exp = expectation(description: "`\(sourceName)` doesn't cause a deadlock in multithreaded environment because it replays with its own lock acquired") + let exp = expectation(description: "`\(sourceName)` doesn't cause a deadlock in multithreaded environment because it doesn't keep its lock acquired to replay values upon subscription") let triggerRange = 0..<100 From b96fe94090da2a104d0e4dacf5870063b5e31431 Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Tue, 25 Feb 2025 21:15:49 +0100 Subject: [PATCH 11/13] Add more tests --- Tests/RxSwiftTests/Anomalies.swift | 42 +++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/Tests/RxSwiftTests/Anomalies.swift b/Tests/RxSwiftTests/Anomalies.swift index 004b24be0c..abeb76224f 100644 --- a/Tests/RxSwiftTests/Anomalies.swift +++ b/Tests/RxSwiftTests/Anomalies.swift @@ -182,10 +182,10 @@ extension AnomaliesTest { observer.on(.next(())) return Disposables.create() } - .share(replay: 1) + .share(replay: 1, scope: .whileConnected) let exp = createInitialEmissionsDeadlockExpectation( - sourceName: "`share(replay: 1)`", + sourceName: "`share(replay: 1, scope: .whileConnected)`", immediatelyEmittingSource: immediatelyEmittingSource ) @@ -197,10 +197,40 @@ extension AnomaliesTest { observer.on(.next(())) return Disposables.create() } - .share(replay: 3) + .share(replay: 2, scope: .whileConnected) let exp = createInitialEmissionsDeadlockExpectation( - sourceName: "`share(replay: 3)`", + sourceName: "`share(replay: 2, scope: .whileConnected)`", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 1) + } + + func test2653ShareReplayOneForeverInitialEmissionDeadlock() { + let immediatelyEmittingSource = Observable.create { observer in + observer.on(.next(())) + return Disposables.create() + } + .share(replay: 1, scope: .forever) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "`share(replay: 1, scope: .forever)`", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 1) + } + + func test2653ShareReplayMoreForeverInitialEmissionDeadlock() { + let immediatelyEmittingSource = Observable.create { observer in + observer.on(.next(())) + return Disposables.create() + } + .share(replay: 2, scope: .forever) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "`share(replay: 2, scope: .forever)`", immediatelyEmittingSource: immediatelyEmittingSource ) @@ -215,11 +245,9 @@ extension AnomaliesTest { let triggerRange = 0..<100 - let concurrentScheduler = ConcurrentDispatchQueueScheduler(qos: .userInitiated) - let multipleSubscriptions = Observable.zip(triggerRange.map { _ in Observable.just(()) - .observe(on: concurrentScheduler) + .observe(on: ConcurrentDispatchQueueScheduler(qos: .userInitiated)) .flatMap { _ in immediatelyEmittingSource } From ae926b487b6cdd0ffeda330f2d251a728d674b1d Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Wed, 26 Feb 2025 21:15:01 +0100 Subject: [PATCH 12/13] Improve tolerance for failure to repro in a more detailed fashion --- Tests/RxSwiftTests/Anomalies.swift | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Tests/RxSwiftTests/Anomalies.swift b/Tests/RxSwiftTests/Anomalies.swift index abeb76224f..3f8e81d77d 100644 --- a/Tests/RxSwiftTests/Anomalies.swift +++ b/Tests/RxSwiftTests/Anomalies.swift @@ -189,7 +189,7 @@ extension AnomaliesTest { immediatelyEmittingSource: immediatelyEmittingSource ) - wait(for: [exp], timeout: 1) + wait(for: [exp], timeout: 5) } func test2653ShareReplayMoreInitialEmissionDeadlock() { @@ -204,7 +204,7 @@ extension AnomaliesTest { immediatelyEmittingSource: immediatelyEmittingSource ) - wait(for: [exp], timeout: 1) + wait(for: [exp], timeout: 5) } func test2653ShareReplayOneForeverInitialEmissionDeadlock() { @@ -219,7 +219,7 @@ extension AnomaliesTest { immediatelyEmittingSource: immediatelyEmittingSource ) - wait(for: [exp], timeout: 1) + wait(for: [exp], timeout: 5) } func test2653ShareReplayMoreForeverInitialEmissionDeadlock() { @@ -234,7 +234,7 @@ extension AnomaliesTest { immediatelyEmittingSource: immediatelyEmittingSource ) - wait(for: [exp], timeout: 1) + wait(for: [exp], timeout: 5) } private func createInitialEmissionsDeadlockExpectation( @@ -243,7 +243,7 @@ extension AnomaliesTest { ) -> XCTestExpectation { let exp = expectation(description: "`\(sourceName)` doesn't cause a deadlock in multithreaded environment because it doesn't keep its lock acquired to replay values upon subscription") - let triggerRange = 0..<100 + let triggerRange = 0..<1000 let multipleSubscriptions = Observable.zip(triggerRange.map { _ in Observable.just(()) From 7bd2d49fe4e8aac2f3f023029c2d9e1a761dae15 Mon Sep 17 00:00:00 2001 From: Isaac Weisberg Date: Wed, 26 Feb 2025 21:15:47 +0100 Subject: [PATCH 13/13] Fix the `ConnectableObservableAdapter` --- RxSwift/Observables/Multicast.swift | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/RxSwift/Observables/Multicast.swift b/RxSwift/Observables/Multicast.swift index 1ebcf0212f..2d0df4dc8a 100644 --- a/RxSwift/Observables/Multicast.swift +++ b/RxSwift/Observables/Multicast.swift @@ -229,13 +229,15 @@ final private class ConnectableObservableAdapter } private var lazySubject: Subject { - if let subject = self.subject { + lock.performLocked { + if let subject = self.subject { + return subject + } + + let subject = self.makeSubject() + self.subject = subject return subject } - - let subject = self.makeSubject() - self.subject = subject - return subject } override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {