Skip to content

Commit dd015c3

Browse files
authored
Merge pull request #123 from ReactiveCocoa/interrupt-fix
Mitigate race conditions in `Signal` interrupt handling.
2 parents dba767d + 7a73f82 commit dd015c3

File tree

2 files changed

+183
-29
lines changed

2 files changed

+183
-29
lines changed

Sources/Signal.swift

Lines changed: 84 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -40,58 +40,113 @@ public final class Signal<Value, Error: Swift.Error> {
4040
public init(_ generator: (Observer) -> Disposable?) {
4141
state = Atomic(SignalState())
4242

43+
/// Holds the final signal state captured by an `interrupted` event. If it
44+
/// is set, the Signal should interrupt as soon as possible. Implicitly
45+
/// protected by `state` and `sendLock`.
46+
var interruptedState: SignalState<Value, Error>? = nil
47+
48+
/// Used to track if the signal has terminated. Protected by `sendLock`.
49+
var terminated = false
50+
4351
/// Used to ensure that events are serialized during delivery to observers.
4452
let sendLock = NSLock()
4553
sendLock.name = "org.reactivecocoa.ReactiveSwift.Signal"
4654

47-
/// When set to `true`, the Signal should interrupt as soon as possible.
48-
let interrupted = Atomic(false)
49-
5055
let observer = Observer { [weak self] event in
5156
guard let signal = self else {
5257
return
5358
}
5459

55-
func interrupt() {
56-
if let state = signal.state.swap(nil) {
57-
for observer in state.observers {
58-
observer.sendInterrupted()
59-
}
60+
@inline(__always)
61+
func interrupt(_ observers: Bag<Observer>) {
62+
for observer in observers {
63+
observer.sendInterrupted()
6064
}
65+
terminated = true
66+
interruptedState = nil
6167
}
6268

6369
if case .interrupted = event {
64-
// Normally we disallow recursive events, but `interrupted` is
65-
// kind of a special snowflake, since it can inadvertently be
66-
// sent by downstream consumers.
70+
// Recursive events are generally disallowed. But `interrupted` is kind
71+
// of a special snowflake, since it can inadvertently be sent by
72+
// downstream consumers.
6773
//
68-
// So we'll flag Interrupted events specially, and if it
69-
// happened to occur while we're sending something else, we'll
70-
// wait to deliver it.
71-
interrupted.value = true
72-
73-
if sendLock.try() {
74-
interrupt()
75-
sendLock.unlock()
76-
77-
signal.generatorDisposable?.dispose()
74+
// So we would treat `interrupted` events specially. If it happens
75+
// to occur while the `sendLock` is acquired, the observer call-out and
76+
// the disposal would be delegated to the current sender, or
77+
// occasionally one of the senders waiting on `sendLock`.
78+
if let state = signal.state.swap(nil) {
79+
// Writes to `interruptedState` are implicitly synchronized. So we do
80+
// not need to guard it with locks.
81+
//
82+
// Specifically, senders serialized by `sendLock` can react to and
83+
// clear `interruptedState` only if they see the write made below.
84+
// The write can happen only once, since `state` being swapped with
85+
// `nil` is a point of no return.
86+
//
87+
// Even in the case that both a previous sender and its successor see
88+
// the write (the `interruptedState` check before & after the unlock
89+
// of `sendLock`), the senders are still bound to the `sendLock`.
90+
// So whichever sender loses the battle of acquring `sendLock` is
91+
// guaranteed to be blocked.
92+
interruptedState = state
93+
94+
if sendLock.try() {
95+
if !terminated, let state = interruptedState {
96+
interrupt(state.observers)
97+
}
98+
sendLock.unlock()
99+
signal.generatorDisposable?.dispose()
100+
}
78101
}
79102
} else {
80-
if let state = (event.isTerminating ? signal.state.swap(nil) : signal.state.value) {
103+
let isTerminating = event.isTerminating
104+
105+
if let observers = (isTerminating ? signal.state.swap(nil)?.observers : signal.state.value?.observers) {
106+
var shouldDispose = false
107+
81108
sendLock.lock()
82109

83-
for observer in state.observers {
84-
observer.action(event)
85-
}
110+
if !terminated {
111+
for observer in observers {
112+
observer.action(event)
113+
}
114+
115+
// Check if a downstream consumer or a concurrent sender has
116+
// interrupted the signal.
117+
if !isTerminating, let state = interruptedState {
118+
interrupt(state.observers)
119+
shouldDispose = true
120+
}
86121

87-
let shouldInterrupt = !event.isTerminating && interrupted.value
88-
if shouldInterrupt {
89-
interrupt()
122+
if isTerminating {
123+
terminated = true
124+
shouldDispose = true
125+
}
90126
}
91127

92128
sendLock.unlock()
93129

94-
if event.isTerminating || shouldInterrupt {
130+
// Based on the implicit memory order, any updates to the
131+
// `interruptedState` should always be visible after `sendLock` is
132+
// released. So we check it again and handle the interruption if
133+
// it has not been taken over.
134+
if !shouldDispose && !terminated && !isTerminating, let state = interruptedState {
135+
sendLock.lock()
136+
137+
// `terminated` before acquring the lock could be a false negative,
138+
// since it might race against other concurrent senders until the
139+
// lock acquisition above succeeds. So we have to check again if the
140+
// signal is really still alive.
141+
if !terminated {
142+
interrupt(state.observers)
143+
shouldDispose = true
144+
}
145+
146+
sendLock.unlock()
147+
}
148+
149+
if shouldDispose {
95150
// Dispose only after notifying observers, so disposal
96151
// logic is consistently the last thing to run.
97152
signal.generatorDisposable?.dispose()

Tests/ReactiveSwiftTests/SignalSpec.swift

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
//
88

99
import Foundation
10+
import Dispatch
1011

1112
import Result
1213
import Nimble
@@ -213,6 +214,104 @@ class SignalSpec: QuickSpec {
213214
}
214215
}
215216

217+
describe("interruption") {
218+
it("should not send events after sending an interrupted event") {
219+
let queue: DispatchQueue
220+
let counter = Atomic<Int>(0)
221+
222+
if #available(macOS 10.10, *) {
223+
queue = DispatchQueue.global(qos: .userInitiated)
224+
} else {
225+
queue = DispatchQueue.global(priority: .high)
226+
}
227+
228+
let (signal, observer) = Signal<Int, NoError>.pipe()
229+
230+
var hasSlept = false
231+
var events: [Event<Int, NoError>] = []
232+
233+
// Used to synchronize the `interrupt` sender to only act after the
234+
// chosen observer has started sending its event, but before it is done.
235+
let semaphore = DispatchSemaphore(value: 0)
236+
237+
signal.observe { event in
238+
if !hasSlept {
239+
semaphore.signal()
240+
// 100000 us = 0.1 s
241+
usleep(100000)
242+
hasSlept = true
243+
}
244+
events.append(event)
245+
}
246+
247+
let group = DispatchGroup()
248+
249+
DispatchQueue.concurrentPerform(iterations: 10) { index in
250+
queue.async(group: group) {
251+
observer.send(value: index)
252+
}
253+
254+
if index == 0 {
255+
semaphore.wait()
256+
observer.sendInterrupted()
257+
}
258+
}
259+
260+
group.wait()
261+
262+
expect(events.count) == 2
263+
264+
if events.count >= 2 {
265+
expect(events[1].isTerminating) == true
266+
}
267+
}
268+
269+
it("should interrupt concurrently") {
270+
let queue: DispatchQueue
271+
let counter = Atomic<Int>(0)
272+
let executionCounter = Atomic<Int>(0)
273+
274+
if #available(macOS 10.10, *) {
275+
queue = DispatchQueue.global(qos: .userInitiated)
276+
} else {
277+
queue = DispatchQueue.global(priority: .high)
278+
}
279+
280+
let iterations = 1000
281+
let group = DispatchGroup()
282+
283+
queue.async(group: group) {
284+
DispatchQueue.concurrentPerform(iterations: iterations) { _ in
285+
let (signal, observer) = Signal<(), NoError>.pipe()
286+
287+
var isInterrupted = false
288+
signal.observeInterrupted { counter.modify { $0 += 1 } }
289+
290+
// Used to synchronize the `value` sender and the `interrupt`
291+
// sender, giving a slight priority to the former.
292+
let semaphore = DispatchSemaphore(value: 0)
293+
294+
queue.async(group: group) {
295+
semaphore.signal()
296+
observer.send(value: ())
297+
executionCounter.modify { $0 += 1 }
298+
}
299+
300+
queue.async(group: group) {
301+
semaphore.wait()
302+
observer.sendInterrupted()
303+
executionCounter.modify { $0 += 1 }
304+
}
305+
}
306+
}
307+
308+
group.wait()
309+
310+
expect(executionCounter.value) == iterations * 2
311+
expect(counter.value).toEventually(equal(iterations), timeout: 5)
312+
}
313+
}
314+
216315
describe("observe") {
217316
var testScheduler: TestScheduler!
218317

0 commit comments

Comments
 (0)