Skip to content

Commit 8070bcb

Browse files
matthewvalentinefreak4pc
authored andcommitted
Make pausableBuffered robust to reentrancy (#159)
1 parent 841edc2 commit 8070bcb

File tree

2 files changed

+103
-6
lines changed

2 files changed

+103
-6
lines changed

Source/RxSwift/pausableBuffered.swift

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,24 +35,29 @@ extension ObservableType {
3535
}
3636

3737
var paused = true
38+
var flushIndex = 0
3839
let lock = NSRecursiveLock()
3940

4041
let flush = {
41-
for value in buffer {
42-
observer.onNext(value)
42+
while flushIndex < buffer.count {
43+
flushIndex += 1
44+
observer.onNext(buffer[flushIndex - 1])
45+
}
46+
if buffer.count > 0 {
47+
flushIndex = 0
48+
buffer.removeAll(keepingCapacity: limit != nil)
4349
}
44-
buffer.removeAll(keepingCapacity: limit != nil)
4550
}
4651

47-
let boundaryDisposable = pauser.subscribe { event in
52+
let boundaryDisposable = pauser.distinctUntilChanged(==).subscribe { event in
4853
lock.lock(); defer { lock.unlock() }
4954
switch event {
5055
case .next(let resume):
51-
paused = !resume
52-
5356
if resume && buffer.count > 0 {
5457
flush()
5558
}
59+
paused = !resume
60+
5661
case .completed:
5762
observer.onCompleted()
5863
case .error(let error):

Tests/RxSwift/pausableBufferedTests.swift

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,4 +184,96 @@ class PausableBufferedTests: XCTestCase {
184184
Subscription(200, 230)
185185
])
186186
}
187+
188+
func testPausedReentrantPauser() {
189+
let underlying = scheduler.createHotObservable([
190+
next(110, 1),
191+
next(210, 2),
192+
next(310, 3),
193+
next(410, 4),
194+
completed(500),
195+
])
196+
197+
let pauser = scheduler.createHotObservable([
198+
next(301, true),
199+
])
200+
201+
let res = scheduler.start(disposed: 1000) { () -> Observable<Int> in
202+
return Observable.create { observer in
203+
let pauserSubject = PublishSubject<Bool>()
204+
let allPausers = Observable.merge(
205+
pauser.asObservable(),
206+
pauserSubject.asObservable())
207+
208+
let buffered = underlying
209+
.pausableBuffered(allPausers, limit: nil)
210+
.share()
211+
212+
let pauserEcho = buffered
213+
.take(2)
214+
.map { _ in true }
215+
.bind(to: pauserSubject)
216+
217+
return Disposables.create(pauserEcho, buffered.subscribe(observer))
218+
}
219+
}
220+
221+
XCTAssertEqual(res.events, [
222+
next(301, 2),
223+
next(310, 3),
224+
next(410, 4),
225+
completed(500),
226+
])
227+
228+
XCTAssertEqual(underlying.subscriptions, [
229+
Subscription(200, 500),
230+
])
231+
}
232+
233+
func testPausedReentrantUnderlying() {
234+
let underlying = scheduler.createHotObservable([
235+
next(210, 1),
236+
next(210, 2),
237+
next(210, 3),
238+
next(310, 4),
239+
completed(500),
240+
])
241+
242+
let pauser = scheduler.createHotObservable([
243+
next(301, true),
244+
])
245+
246+
let res = scheduler.start(disposed: 1000) { () -> Observable<Int> in
247+
return Observable.create { observer in
248+
let underlyingSubject = PublishSubject<Int>()
249+
let allUnderlying = Observable.merge(
250+
underlying.asObservable(),
251+
underlyingSubject.asObservable())
252+
253+
let buffered = allUnderlying
254+
.pausableBuffered(pauser, limit: nil)
255+
.share()
256+
257+
let underlyingEcho = buffered
258+
.take(2)
259+
.bind(to: underlyingSubject)
260+
261+
return Disposables.create(underlyingEcho, buffered.subscribe(observer))
262+
}
263+
}
264+
265+
XCTAssertEqual(res.events, [
266+
next(301, 1),
267+
next(301, 2),
268+
next(301, 3),
269+
next(301, 1),
270+
next(301, 2),
271+
next(310, 4),
272+
completed(500),
273+
])
274+
275+
XCTAssertEqual(underlying.subscriptions, [
276+
Subscription(200, 500),
277+
])
278+
}
187279
}

0 commit comments

Comments
 (0)