diff --git a/RxSwift/Observables/Multicast.swift b/RxSwift/Observables/Multicast.swift index 1ebcf0212..ae1eb6033 100644 --- a/RxSwift/Observables/Multicast.swift +++ b/RxSwift/Observables/Multicast.swift @@ -108,6 +108,20 @@ extension ConnectableObservableType { public func refCount() -> Observable { RefCount(source: self) } + + /** + Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence. + Waits the given amount of time on the given scheduler after the last subscription has been disposed before disconnecting from the source. + + - seealso: [refCount operator on reactivex.io](http://reactivex.io/documentation/operators/refcount.html) + + - parameter timeout: Time interval to wait after the last subscription to the result is disposed before disconnecting from the source. + - parameter scheduler: Scheduler to run the timeout timer on. + - returns: An observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence. + */ + public func refCount(timeout: RxTimeInterval, scheduler: SchedulerType) -> Observable { + RefCount(source: self, timeout: RefCount.TimeoutConfig(interval: timeout, scheduler: scheduler)) + } } extension ObservableType { @@ -270,6 +284,8 @@ final private class RefCountSink 1 { self.parent.count -= 1 @@ -308,10 +338,12 @@ final private class RefCountSink: Producer { + struct TimeoutConfig { + let interval: RxTimeInterval + let scheduler: SchedulerType + } + fileprivate let lock = RecursiveLock() // state fileprivate var count = 0 fileprivate var connectionId: Int64 = 0 fileprivate var connectableSubscription = nil as Disposable? + fileprivate var timeoutSubscription = nil as Disposable? fileprivate let source: ConnectableSource + fileprivate let timeout: TimeoutConfig? - init(source: ConnectableSource) { + init(source: ConnectableSource, timeout: TimeoutConfig? = nil) { self.source = source + self.timeout = timeout } override func run(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) diff --git a/Tests/RxSwiftTests/Observable+MulticastTests.swift b/Tests/RxSwiftTests/Observable+MulticastTests.swift index ccf2f437b..dd03d8eab 100644 --- a/Tests/RxSwiftTests/Observable+MulticastTests.swift +++ b/Tests/RxSwiftTests/Observable+MulticastTests.swift @@ -1269,6 +1269,105 @@ extension ObservableMulticastTest { ]) } + func testRefCount_timeoutKeepsConnectionAlive() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + .next(210, 1), + .next(220, 2), + .next(230, 3), + .next(240, 4), + .next(250, 5), + .next(260, 6), + .next(270, 7), + .next(280, 8), + .next(290, 9), + .completed(300) + ]) + + let res = scheduler.start(disposed: 245) { + xs.publish().refCount(timeout: .seconds(50), scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + .next(210, 1), + .next(220, 2), + .next(230, 3), + .next(240, 4) + ]) + + XCTAssertEqual(xs.subscriptions, [Subscription(200, 295)]) + } + + func testRefCount_timeoutIsCancelledOnceAnotherSubscriberIsAdded() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + .next(210, 1), + .next(220, 2), + .next(230, 3), + .next(240, 4), + .next(250, 5), + .next(260, 6), + .next(270, 7), + .next(280, 8), + .next(290, 9), + .completed(300) + ]) + + let refCounted = xs.publish().refCount(timeout: .seconds(50), scheduler: scheduler) + + let dis1 = refCounted.subscribe() + scheduler.scheduleAt(200, action: dis1.dispose) + + let res = scheduler.start { + refCounted + } + + XCTAssertEqual(res.events, [ + .next(210, 1), + .next(220, 2), + .next(230, 3), + .next(240, 4), + .next(250, 5), + .next(260, 6), + .next(270, 7), + .next(280, 8), + .next(290, 9), + .completed(300) + ]) + + XCTAssertEqual(xs.subscriptions, [Subscription(0, 300)]) + } + + func testRefCount_timeoutNotUsedWhenOneOfTwoSubscribersDisconnects() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + .next(210, 1), + .next(220, 2), + .next(230, 3), + .next(240, 4), + .next(250, 5), + .next(260, 6), + .next(270, 7), + .next(280, 8), + .next(290, 9), + .completed(300) + ]) + + let refCounted = xs.publish().refCount(timeout: .seconds(50), scheduler: scheduler) + + let dis1 = refCounted.subscribe() + defer { dis1.dispose() } + + _ = scheduler.start(disposed: 240) { + refCounted + } + + XCTAssertEqual(xs.subscriptions, [Subscription(0, 300)]) + } + #if TRACE_RESOURCES func testRefCountReleasesResourcesOnComplete() { _ = Observable.just(1).publish().refCount().subscribe()