Skip to content

Commit 21a1e99

Browse files
inamiyandersio
authored andcommitted
Add FlatMapStrategy.throttle (#713)
* Add `FlatMapStrategy.first` * Add doc-comments on `FlattenStrategy.first` * [Test] Remove `fdescribe` * Update CHANGELOG.md * Update Tests/ReactiveSwiftTests/SignalProducerSpec.swift Co-Authored-By: Anders Ha <[email protected]> * Rename to `FlattenStrategy.throttle` * Update CHANGELOG.md
1 parent 430dca6 commit 21a1e99

File tree

3 files changed

+266
-3
lines changed

3 files changed

+266
-3
lines changed

CHANGELOG.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
# master
22
*Please add new entries at the top.*
33

4+
1. `FlattenStrategy.throttle` is introduced. (#713, kudos to @inamiy)
45
1. Updated `README.md` to reflect Swift 5.1 compatibility and point snippets to 6.1.0 (#763, kudos to @Marcocanc)
56
1. Update travis to Xcode 11.1 and Swift 5.1 (#764, kudos @petrpavlik)
6-
2. [SwiftPM] Add platforms (#761, kudos to @ikesyo)
7-
3. Renamed `filterMap` to `compactMap` and deprecated `filterMap` (#746, kudos to @Marcocanc)
7+
1. [SwiftPM] Add platforms (#761, kudos to @ikesyo)
8+
1. Renamed `filterMap` to `compactMap` and deprecated `filterMap` (#746, kudos to @Marcocanc)
89

910
# 6.1.0
1011

@@ -25,7 +26,6 @@
2526
* Replace all cases where `NoError` was used in a `Signal` or `SignalProducer` with `Never`
2627
* Replace all cases where `AnyError` was used in a `Signal` or `SignalProducer` with `Swift.Error`
2728

28-
2929
# 5.0.1
3030
1. Fix warnings in Xcode 10.2
3131

Sources/Flatten.swift

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public struct FlattenStrategy {
1212
case concurrent(limit: UInt)
1313
case latest
1414
case race
15+
case throttle
1516
}
1617

1718
fileprivate let kind: Kind
@@ -96,6 +97,21 @@ public struct FlattenStrategy {
9697
/// Any failure from the inner streams is propagated immediately to the flattened
9798
/// stream of values.
9899
public static let race = FlattenStrategy(kind: .race)
100+
101+
/// Forward only events from the "first inner stream" that sends an event if not exists.
102+
/// Other inner streams is disposed of until the first inner stream is completed.
103+
/// Note that next inner stream after previous completion can become
104+
/// first inner stream again.
105+
///
106+
/// The flattened stream of values completes only when the stream of streams has completed,
107+
/// and first inner stream has completed if exists.
108+
///
109+
/// Any interruption of inner streams is propagated immediately to the flattened
110+
/// stream of values.
111+
///
112+
/// Any failure from the inner streams is propagated immediately to the flattened
113+
/// stream of values.
114+
public static let throttle = FlattenStrategy(kind: .throttle)
99115
}
100116

101117
extension Signal where Value: SignalProducerConvertible, Error == Value.Error {
@@ -120,6 +136,9 @@ extension Signal where Value: SignalProducerConvertible, Error == Value.Error {
120136

121137
case .race:
122138
return self.race()
139+
140+
case .throttle:
141+
return self.throttle()
123142
}
124143
}
125144
}
@@ -162,6 +181,9 @@ extension Signal where Value: SignalProducerConvertible, Error == Never, Value.E
162181

163182
case .race:
164183
return self.race()
184+
185+
case .throttle:
186+
return self.throttle()
165187
}
166188
}
167189
}
@@ -205,6 +227,9 @@ extension SignalProducer where Value: SignalProducerConvertible, Error == Value.
205227

206228
case .race:
207229
return self.race()
230+
231+
case .throttle:
232+
return self.throttle()
208233
}
209234
}
210235
}
@@ -247,6 +272,9 @@ extension SignalProducer where Value: SignalProducerConvertible, Error == Never,
247272

248273
case .race:
249274
return self.race()
275+
276+
case .throttle:
277+
return self.throttle()
250278
}
251279
}
252280
}
@@ -819,6 +847,112 @@ private struct RaceState {
819847
var isActivated = false
820848
}
821849

850+
extension Signal where Value: SignalProducerConvertible, Error == Value.Error {
851+
/// Returns a signal that forwards values from the "first inner producer" if not exists,
852+
/// ignoring values sent from other inner producers until first inner producer is completed.
853+
/// Note that next inner producer after previous completion can become
854+
/// first inner producer again.
855+
///
856+
/// An error sent on `self` or the first inner producer will be sent on the
857+
/// returned signal.
858+
///
859+
/// The returned signal completes when `self` is completed, and also first inner producer
860+
/// is completed if it exists.
861+
fileprivate func throttle() -> Signal<Value.Value, Error> {
862+
return Signal<Value.Value, Error> { observer, lifetime in
863+
let relayDisposable = CompositeDisposable()
864+
lifetime += relayDisposable
865+
lifetime += self.observeThrottle(observer, relayDisposable)
866+
}
867+
}
868+
869+
fileprivate func observeThrottle(_ observer: Signal<Value.Value, Error>.Observer, _ relayDisposable: CompositeDisposable) -> Disposable? {
870+
let state = Atomic(ThrottleState())
871+
872+
return self.observe { event in
873+
switch event {
874+
case let .value(innerProducer):
875+
let isFirstInnerProducer: Bool = state.modify { state in
876+
guard !state.hasFirstInnerProducer else {
877+
return false
878+
}
879+
880+
state.hasFirstInnerProducer = true
881+
return true
882+
}
883+
884+
// Ignore consecutive `innerProducer`s while `isFirstInnerProducer` is true.
885+
guard isFirstInnerProducer else { return }
886+
887+
innerProducer.producer.startWithSignal { innerSignal, innerDisposable in
888+
relayDisposable.add(innerDisposable)
889+
890+
innerSignal.observe { event in
891+
switch event {
892+
case .completed:
893+
let shouldComplete: Bool = state.modify { state in
894+
state.hasFirstInnerProducer = false
895+
return state.outerSignalComplete
896+
}
897+
898+
if shouldComplete {
899+
observer.sendCompleted()
900+
}
901+
902+
case .value, .failed, .interrupted:
903+
observer.send(event)
904+
}
905+
}
906+
}
907+
908+
case let .failed(error):
909+
observer.send(error: error)
910+
911+
case .completed:
912+
let shouldComplete: Bool = state.modify { state in
913+
state.outerSignalComplete = true
914+
return !state.hasFirstInnerProducer
915+
}
916+
917+
if shouldComplete {
918+
observer.sendCompleted()
919+
}
920+
921+
case .interrupted:
922+
observer.sendInterrupted()
923+
}
924+
}
925+
}
926+
}
927+
928+
extension SignalProducer where Value: SignalProducerConvertible, Error == Value.Error {
929+
/// Returns a producer that forwards values from the "first inner producer" if not exists,
930+
/// ignoring values sent from other inner producers until first inner producer is completed.
931+
/// Note that next inner producer after previous completion can become first inner producer again.
932+
///
933+
/// An error sent on `self` or the first inner producer will be sent on the
934+
/// returned producer.
935+
///
936+
/// The returned signal completes when `self` is completed, and also first inner producer
937+
/// is completed if it exists.
938+
fileprivate func throttle() -> SignalProducer<Value.Value, Error> {
939+
return SignalProducer<Value.Value, Error> { observer, lifetime in
940+
let relayDisposable = CompositeDisposable()
941+
lifetime += relayDisposable
942+
943+
self.startWithSignal { signal, signalDisposable in
944+
lifetime += signalDisposable
945+
lifetime += signal.observeThrottle(observer, relayDisposable)
946+
}
947+
}
948+
}
949+
}
950+
951+
private struct ThrottleState {
952+
var outerSignalComplete = false
953+
var hasFirstInnerProducer = false
954+
}
955+
822956
extension Signal {
823957
/// Maps each event from `signal` to a new signal, then flattens the
824958
/// resulting producers (into a signal of values), according to the

Tests/ReactiveSwiftTests/SignalProducerSpec.swift

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1781,6 +1781,135 @@ class SignalProducerSpec: QuickSpec {
17811781
}
17821782
}
17831783

1784+
describe("FlattenStrategy.throttle") {
1785+
it("should forward values from the first and third inner producer to send an event") {
1786+
let (outer, outerObserver) = SignalProducer<SignalProducer<Int, TestError>, TestError>.pipe()
1787+
let (firstInner, firstInnerObserver) = SignalProducer<Int, TestError>.pipe()
1788+
let (secondInner, secondInnerObserver) = SignalProducer<Int, TestError>.pipe()
1789+
let (thirdInner, thirdInnerObserver) = SignalProducer<Int, TestError>.pipe()
1790+
1791+
var receivedValues: [Int] = []
1792+
var errored = false
1793+
var completed = false
1794+
1795+
outer.flatten(.throttle).start { event in
1796+
switch event {
1797+
case let .value(value):
1798+
receivedValues.append(value)
1799+
case .completed:
1800+
completed = true
1801+
case .failed:
1802+
errored = true
1803+
case .interrupted:
1804+
break
1805+
}
1806+
}
1807+
1808+
outerObserver.send(value: firstInner)
1809+
outerObserver.send(value: secondInner)
1810+
1811+
firstInnerObserver.send(value: 1)
1812+
secondInnerObserver.send(value: 2)
1813+
1814+
expect(receivedValues) == [ 1 ]
1815+
expect(errored) == false
1816+
expect(completed) == false
1817+
1818+
secondInnerObserver.send(value: 3)
1819+
secondInnerObserver.sendCompleted()
1820+
1821+
expect(receivedValues) == [ 1 ]
1822+
expect(errored) == false
1823+
expect(completed) == false
1824+
1825+
firstInnerObserver.sendCompleted()
1826+
1827+
expect(receivedValues) == [ 1 ]
1828+
expect(errored) == false
1829+
expect(completed) == false
1830+
1831+
outerObserver.send(value: thirdInner)
1832+
thirdInnerObserver.send(value: 4)
1833+
1834+
// NOTE:
1835+
// `4` will be observed because `firstInner` is completed then `thirdInner` is emitted,
1836+
// which is also considered as "first" producer.
1837+
expect(receivedValues) == [ 1, 4 ]
1838+
expect(errored) == false
1839+
expect(completed) == false
1840+
1841+
outerObserver.sendCompleted()
1842+
1843+
expect(receivedValues) == [ 1, 4 ]
1844+
expect(errored) == false
1845+
expect(completed) == false
1846+
1847+
thirdInnerObserver.sendCompleted()
1848+
1849+
expect(receivedValues) == [ 1, 4 ]
1850+
expect(errored) == false
1851+
expect(completed) == true
1852+
}
1853+
1854+
it("should forward an error from the first inner producer to send an error") {
1855+
let inner = SignalProducer<Int, TestError>(error: .default)
1856+
let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>(value: inner)
1857+
1858+
let result = outer.flatten(.throttle).first()
1859+
expect(result?.error) == TestError.default
1860+
}
1861+
1862+
it("should forward an error from the outer producer") {
1863+
let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>(error: .default)
1864+
1865+
let result = outer.flatten(.throttle).first()
1866+
expect(result?.error) == TestError.default
1867+
}
1868+
1869+
it("should complete when the 'outer producer' and 'first inner producer to send an event' have completed") {
1870+
let inner = SignalProducer<Int, TestError>.empty
1871+
let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>(value: inner)
1872+
1873+
var completed = false
1874+
outer.flatten(.throttle).startWithCompleted {
1875+
completed = true
1876+
}
1877+
1878+
expect(completed) == true
1879+
}
1880+
1881+
it("should complete when the outer producer completes before sending any inner producers") {
1882+
let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>.empty
1883+
1884+
var completed = false
1885+
outer.flatten(.throttle).startWithCompleted {
1886+
completed = true
1887+
}
1888+
1889+
expect(completed) == true
1890+
}
1891+
1892+
it("should not complete when the outer producer completes after sending an inner producer but it doesn't send an event") {
1893+
let inner = SignalProducer<Int, TestError>.never
1894+
let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>(value: inner)
1895+
1896+
var completed = false
1897+
outer.flatten(.throttle).startWithCompleted {
1898+
completed = true
1899+
}
1900+
1901+
expect(completed) == false
1902+
}
1903+
1904+
it("should not deadlock") {
1905+
let producer = SignalProducer<Int, Never>(value: 1)
1906+
.flatMap(.throttle) { _ in SignalProducer(value: 10) }
1907+
1908+
let result = producer.take(first: 1).last()
1909+
expect(result?.value) == 10
1910+
}
1911+
}
1912+
17841913
describe("interruption") {
17851914
var innerObserver: Signal<(), Never>.Observer!
17861915
var outerObserver: Signal<SignalProducer<(), Never>, Never>.Observer!

0 commit comments

Comments
 (0)