@@ -213,6 +213,96 @@ class SignalSpec: QuickSpec {
213213 }
214214 }
215215
216+ describe ( " interruption " ) {
217+ it ( " should not send events after sending an interrupted event " ) {
218+ let queue : DispatchQueue
219+ let counter = Atomic < Int > ( 0 )
220+
221+ if #available( macOS 10 . 10 , * ) {
222+ queue = DispatchQueue . global ( qos: . userInitiated)
223+ } else {
224+ queue = DispatchQueue . global ( priority: . high)
225+ }
226+
227+ let ( signal, observer) = Signal < Int , NoError > . pipe ( )
228+
229+ var hasSlept = false
230+ var events = [ Event < Int , NoError > ] ( )
231+
232+ let sema = DispatchSemaphore ( value: 0 )
233+
234+ signal. observe { event in
235+ if !hasSlept {
236+ sema. signal ( )
237+ sleep ( 5 )
238+ hasSlept = true
239+ }
240+ events. append ( event)
241+ }
242+
243+ let group = DispatchGroup ( )
244+
245+ DispatchQueue . concurrentPerform ( iterations: 10 ) { index in
246+ queue. async ( group: group) {
247+ observer. send ( value: index)
248+ }
249+
250+ if index == 0 {
251+ sema. wait ( )
252+ observer. sendInterrupted ( )
253+ }
254+ }
255+
256+ group. wait ( )
257+
258+ expect ( events. count) == 2
259+ expect ( events. count >= 2 ? events [ 1 ] . isTerminating : false ) == true
260+ }
261+
262+ it ( " should interrupt concurrently " ) {
263+ let queue : DispatchQueue
264+ let counter = Atomic < Int > ( 0 )
265+ let executionCounter = Atomic < Int > ( 0 )
266+
267+ if #available( macOS 10 . 10 , * ) {
268+ queue = DispatchQueue . global ( qos: . userInitiated)
269+ } else {
270+ queue = DispatchQueue . global ( priority: . high)
271+ }
272+
273+ let iterations = 100000
274+ let group = DispatchGroup ( )
275+
276+ queue. async ( group: group) {
277+ DispatchQueue . concurrentPerform ( iterations: iterations) { _ in
278+ let ( signal, observer) = Signal < ( ) , NoError > . pipe ( )
279+
280+ var isInterrupted = false
281+ signal. observeInterrupted { counter. modify { $0 += 1 } }
282+
283+ let sema = DispatchSemaphore ( value: 0 )
284+
285+ queue. async ( group: group) {
286+ sema. signal ( )
287+ observer. send ( value: ( ) )
288+ executionCounter. modify { $0 += 1 }
289+ }
290+
291+ queue. async ( group: group) {
292+ sema. wait ( )
293+ observer. sendInterrupted ( )
294+ executionCounter. modify { $0 += 1 }
295+ }
296+ }
297+ }
298+
299+ group. wait ( )
300+
301+ expect ( executionCounter. value) == iterations * 2
302+ expect ( counter. value) . toEventually ( equal ( iterations) , timeout: 10 )
303+ }
304+ }
305+
216306 describe ( " observe " ) {
217307 var testScheduler : TestScheduler !
218308
0 commit comments