Skip to content

Commit 0605a06

Browse files
authored
Fine tunes ReplaySubject’s locking. (#74)
1 parent 2bb2b5a commit 0605a06

File tree

2 files changed

+86
-23
lines changed

2 files changed

+86
-23
lines changed

Sources/Subjects/ReplaySubject.swift

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
2323
private var buffer = [Output]()
2424

2525
// Keeping track of all live subscriptions, so `send` events can be forwarded to them.
26-
private var subscriptions = [Subscription<AnySubscriber<Output, Failure>>]()
26+
private(set) var subscriptions = [Subscription<AnySubscriber<Output, Failure>>]()
2727

2828
private var completion: Subscribers.Completion<Failure>?
2929
private var isActive: Bool { completion == nil }
@@ -37,57 +37,76 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
3737
}
3838

3939
public func send(_ value: Output) {
40-
lock.lock()
41-
defer { lock.unlock() }
40+
let subscriptions: [Subscription<AnySubscriber<Output, Failure>>]
4241

43-
guard isActive else { return }
42+
do {
43+
lock.lock()
44+
defer { lock.unlock() }
4445

45-
buffer.append(value)
46+
guard isActive else { return }
4647

47-
if buffer.count > bufferSize {
48+
buffer.append(value)
49+
if buffer.count > bufferSize {
4850
buffer.removeFirst()
51+
}
52+
53+
subscriptions = self.subscriptions
4954
}
5055

5156
subscriptions.forEach { $0.forwardValueToBuffer(value) }
5257
}
5358

5459
public func send(completion: Subscribers.Completion<Failure>) {
55-
lock.lock()
56-
defer { lock.unlock() }
60+
let subscriptions: [Subscription<AnySubscriber<Output, Failure>>]
5761

58-
guard isActive else { return }
62+
do {
63+
lock.lock()
64+
defer { lock.unlock() }
5965

60-
self.completion = completion
66+
guard isActive else { return }
67+
68+
self.completion = completion
69+
70+
subscriptions = self.subscriptions
71+
}
6172

6273
subscriptions.forEach { $0.forwardCompletionToBuffer(completion) }
6374
}
6475

6576
public func send(subscription: Combine.Subscription) {
66-
lock.lock()
67-
defer { lock.unlock() }
68-
6977
subscription.request(.unlimited)
7078
}
7179

7280
public func receive<Subscriber: Combine.Subscriber>(subscriber: Subscriber) where Failure == Subscriber.Failure, Output == Subscriber.Input {
73-
lock.lock()
74-
defer { lock.unlock() }
75-
7681
let subscriberIdentifier = subscriber.combineIdentifier
7782

7883
let subscription = Subscription(downstream: AnySubscriber(subscriber)) { [weak self] in
79-
guard let self = self,
80-
let subscriptionIndex = self.subscriptions
81-
.firstIndex(where: { $0.innerSubscriberIdentifier == subscriberIdentifier }) else { return }
82-
83-
self.subscriptions.remove(at: subscriptionIndex)
84+
self?.completeSubscriber(withIdentifier: subscriberIdentifier)
8485
}
8586

86-
subscriptions.append(subscription)
87+
let buffer: [Output]
88+
let completion: Subscribers.Completion<Failure>?
89+
90+
do {
91+
lock.lock()
92+
defer { lock.unlock() }
93+
94+
subscriptions.append(subscription)
95+
96+
buffer = self.buffer
97+
completion = self.completion
98+
}
8799

88100
subscriber.receive(subscription: subscription)
89101
subscription.replay(buffer, completion: completion)
90102
}
103+
104+
private func completeSubscriber(withIdentifier subscriberIdentifier: CombineIdentifier) {
105+
lock.lock()
106+
defer { self.lock.unlock() }
107+
108+
self.subscriptions.removeAll { $0.innerSubscriberIdentifier == subscriberIdentifier }
109+
}
91110
}
92111

93112
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)

Tests/ReplaySubjectTests.swift

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
#if !os(watchOS)
99
import Combine
10-
import CombineExt
10+
@testable import CombineExt
1111
import XCTest
1212

1313
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
@@ -347,5 +347,49 @@ final class ReplaySubjectTests: XCTestCase {
347347
XCTAssertEqual(["a2", "b2"], results)
348348
XCTAssertEqual([.finished, .finished], completions)
349349
}
350+
351+
func testRemovesSubscriptionsAfterCancellation() {
352+
let subject = ReplaySubject<Int, Never>(bufferSize: 1)
353+
354+
var subscription1: Subscription?
355+
let subscriber1 = AnySubscriber<Int, Never>(
356+
receiveSubscription: { subscription1 = $0 }
357+
)
358+
359+
var subscription2: Subscription?
360+
let subscriber2 = AnySubscriber<Int, Never>(
361+
receiveSubscription: { subscription2 = $0 }
362+
)
363+
364+
XCTAssertTrue(subject.subscriptions.isEmpty)
365+
366+
subject
367+
.subscribe(subscriber1)
368+
subject
369+
.subscribe(subscriber2)
370+
371+
XCTAssertEqual(
372+
subject
373+
.subscriptions
374+
.map(\.combineIdentifier),
375+
[
376+
subscription1?.combineIdentifier,
377+
subscription2?.combineIdentifier
378+
]
379+
)
380+
381+
subscription1?.cancel()
382+
383+
XCTAssertEqual(
384+
subject
385+
.subscriptions
386+
.map(\.combineIdentifier),
387+
[subscription2?.combineIdentifier]
388+
)
389+
390+
subscription2?.cancel()
391+
392+
XCTAssertTrue(subject.subscriptions.isEmpty)
393+
}
350394
}
351395
#endif

0 commit comments

Comments
 (0)