Skip to content

Commit 2bb2b5a

Browse files
jasdevfreak4pc
authored andcommitted
Adds basic locking to ReplaySubject.
Following @sgl0v’s lead from their [related post](https://www.onswiftwings.com/posts/share-replay-operator/).
1 parent 0540960 commit 2bb2b5a

File tree

1 file changed

+14
-0
lines changed

1 file changed

+14
-0
lines changed

Sources/Subjects/ReplaySubject.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,18 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
2828
private var completion: Subscribers.Completion<Failure>?
2929
private var isActive: Bool { completion == nil }
3030

31+
private let lock = NSRecursiveLock()
32+
3133
/// Create a `ReplaySubject`, buffering up to `bufferSize` values and replaying them to new subscribers
3234
/// - Parameter bufferSize: The maximum number of value events to buffer and replay to all future subscribers.
3335
public init(bufferSize: Int) {
3436
self.bufferSize = bufferSize
3537
}
3638

3739
public func send(_ value: Output) {
40+
lock.lock()
41+
defer { lock.unlock() }
42+
3843
guard isActive else { return }
3944

4045
buffer.append(value)
@@ -47,6 +52,9 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
4752
}
4853

4954
public func send(completion: Subscribers.Completion<Failure>) {
55+
lock.lock()
56+
defer { lock.unlock() }
57+
5058
guard isActive else { return }
5159

5260
self.completion = completion
@@ -55,10 +63,16 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
5563
}
5664

5765
public func send(subscription: Combine.Subscription) {
66+
lock.lock()
67+
defer { lock.unlock() }
68+
5869
subscription.request(.unlimited)
5970
}
6071

6172
public func receive<Subscriber: Combine.Subscriber>(subscriber: Subscriber) where Failure == Subscriber.Failure, Output == Subscriber.Input {
73+
lock.lock()
74+
defer { lock.unlock() }
75+
6276
let subscriberIdentifier = subscriber.combineIdentifier
6377

6478
let subscription = Subscription(downstream: AnySubscriber(subscriber)) { [weak self] in

0 commit comments

Comments
 (0)