Skip to content

Commit b8f2de0

Browse files
committed
Failing test cases for race conditions in signal interruption.
1 parent 0fcbdc6 commit b8f2de0

File tree

1 file changed

+90
-0
lines changed

1 file changed

+90
-0
lines changed

Tests/ReactiveSwiftTests/SignalSpec.swift

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,96 @@ class SignalSpec: QuickSpec {
213213
}
214214
}
215215

216+
describe("interruption") {
217+
it("should not send events after sending an interrupted event") {
218+
let queue: DispatchQueue
219+
let counter = Atomic<Int>(0)
220+
221+
if #available(macOS 10.10, *) {
222+
queue = DispatchQueue.global(qos: .userInitiated)
223+
} else {
224+
queue = DispatchQueue.global(priority: .high)
225+
}
226+
227+
let (signal, observer) = Signal<Int, NoError>.pipe()
228+
229+
var hasSlept = false
230+
var events = [Event<Int, NoError>]()
231+
232+
let sema = DispatchSemaphore(value: 0)
233+
234+
signal.observe { event in
235+
if !hasSlept {
236+
sema.signal()
237+
sleep(5)
238+
hasSlept = true
239+
}
240+
events.append(event)
241+
}
242+
243+
let group = DispatchGroup()
244+
245+
DispatchQueue.concurrentPerform(iterations: 10) { index in
246+
queue.async(group: group) {
247+
observer.send(value: index)
248+
}
249+
250+
if index == 0 {
251+
sema.wait()
252+
observer.sendInterrupted()
253+
}
254+
}
255+
256+
group.wait()
257+
258+
expect(events.count) == 2
259+
expect(events.count >= 2 ? events[1].isTerminating : false) == true
260+
}
261+
262+
it("should interrupt concurrently") {
263+
let queue: DispatchQueue
264+
let counter = Atomic<Int>(0)
265+
let executionCounter = Atomic<Int>(0)
266+
267+
if #available(macOS 10.10, *) {
268+
queue = DispatchQueue.global(qos: .userInitiated)
269+
} else {
270+
queue = DispatchQueue.global(priority: .high)
271+
}
272+
273+
let iterations = 100000
274+
let group = DispatchGroup()
275+
276+
queue.async(group: group) {
277+
DispatchQueue.concurrentPerform(iterations: iterations) { _ in
278+
let (signal, observer) = Signal<(), NoError>.pipe()
279+
280+
var isInterrupted = false
281+
signal.observeInterrupted { counter.modify { $0 += 1 } }
282+
283+
let sema = DispatchSemaphore(value: 0)
284+
285+
queue.async(group: group) {
286+
sema.signal()
287+
observer.send(value: ())
288+
executionCounter.modify { $0 += 1 }
289+
}
290+
291+
queue.async(group: group) {
292+
sema.wait()
293+
observer.sendInterrupted()
294+
executionCounter.modify { $0 += 1 }
295+
}
296+
}
297+
}
298+
299+
group.wait()
300+
301+
expect(executionCounter.value) == iterations * 2
302+
expect(counter.value).toEventually(equal(iterations), timeout: 10)
303+
}
304+
}
305+
216306
describe("observe") {
217307
var testScheduler: TestScheduler!
218308

0 commit comments

Comments
 (0)