@@ -12,111 +12,227 @@ import Combine
1212// MARK: - Operator methods
1313@available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
1414public extension Publisher {
15- /// Merges two publishers into a single publisher by combining each value
16- /// from self with the latest value from the second publisher, if any.
17- ///
18- /// - parameter other: A second publisher source.
19- /// - parameter resultSelector: Function to invoke for each value from the self combined
20- /// with the latest value from the second source, if any.
21- ///
22- /// - returns: A publisher containing the result of combining each value of the self
23- /// with the latest value from the second publisher, if any, using the
24- /// specified result selector function.
25- func withLatestFrom< Other: Publisher , Result> ( _ other: Other ,
26- resultSelector: @escaping ( Output , Other . Output ) -> Result )
27- -> AnyPublisher < Result , Failure >
28- where Other. Failure == Failure {
29- let upstream = share ( )
30-
31- return other
32- . map { second in upstream. map { resultSelector ( $0, second) } }
33- . switchToLatest ( )
34- . zip ( upstream) // `zip`ping and discarding `\.1` allows for
35- // upstream completions to be projected down immediately.
36- . map ( \. 0 )
37- . eraseToAnyPublisher ( )
38- }
15+ /// Merges two publishers into a single publisher by combining each value
16+ /// from self with the latest value from the second publisher, if any.
17+ ///
18+ /// - parameter other: A second publisher source.
19+ /// - parameter resultSelector: Function to invoke for each value from the self combined
20+ /// with the latest value from the second source, if any.
21+ ///
22+ /// - returns: A publisher containing the result of combining each value of the self
23+ /// with the latest value from the second publisher, if any, using the
24+ /// specified result selector function.
25+ func withLatestFrom< Other: Publisher , Result> ( _ other: Other ,
26+ resultSelector: @escaping ( Output , Other . Output ) -> Result )
27+ -> Publishers . WithLatestFrom < Self , Other , Result > {
28+ return . init( upstream: self , second: other, resultSelector: resultSelector)
29+ }
3930
40- /// Merges three publishers into a single publisher by combining each value
41- /// from self with the latest value from the second and third publisher, if any.
42- ///
43- /// - parameter other: A second publisher source.
44- /// - parameter other1: A third publisher source.
45- /// - parameter resultSelector: Function to invoke for each value from the self combined
46- /// with the latest value from the second and third source, if any.
47- ///
48- /// - returns: A publisher containing the result of combining each value of the self
49- /// with the latest value from the second and third publisher, if any, using the
50- /// specified result selector function.
51- func withLatestFrom< Other: Publisher , Other1: Publisher , Result> ( _ other: Other ,
52- _ other1: Other1 ,
53- resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output ) ) -> Result )
54- -> AnyPublisher < Result , Failure >
31+ /// Merges three publishers into a single publisher by combining each value
32+ /// from self with the latest value from the second and third publisher, if any.
33+ ///
34+ /// - parameter other: A second publisher source.
35+ /// - parameter other1: A third publisher source.
36+ /// - parameter resultSelector: Function to invoke for each value from the self combined
37+ /// with the latest value from the second and third source, if any.
38+ ///
39+ /// - returns: A publisher containing the result of combining each value of the self
40+ /// with the latest value from the second and third publisher, if any, using the
41+ /// specified result selector function.
42+ func withLatestFrom< Other: Publisher , Other1: Publisher , Result> ( _ other: Other ,
43+ _ other1: Other1 ,
44+ resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output ) ) -> Result )
45+ -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output ) , Self . Failure > , Result >
5546 where Other. Failure == Failure , Other1. Failure == Failure {
56- withLatestFrom ( other. combineLatest ( other1) , resultSelector: resultSelector)
57- }
47+ let combined = other. combineLatest ( other1)
48+ . eraseToAnyPublisher ( )
49+ return . init( upstream: self , second: combined, resultSelector: resultSelector)
50+ }
5851
59- /// Merges four publishers into a single publisher by combining each value
60- /// from self with the latest value from the second, third and fourth publisher, if any.
61- ///
62- /// - parameter other: A second publisher source.
63- /// - parameter other1: A third publisher source.
64- /// - parameter other2: A fourth publisher source.
65- /// - parameter resultSelector: Function to invoke for each value from the self combined
66- /// with the latest value from the second, third and fourth source, if any.
67- ///
68- /// - returns: A publisher containing the result of combining each value of the self
69- /// with the latest value from the second, third and fourth publisher, if any, using the
70- /// specified result selector function.
71- func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher , Result> ( _ other: Other ,
72- _ other1: Other1 ,
73- _ other2: Other2 ,
74- resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output , Other2 . Output ) ) -> Result )
75- -> AnyPublisher < Result , Failure >
52+ /// Merges four publishers into a single publisher by combining each value
53+ /// from self with the latest value from the second, third and fourth publisher, if any.
54+ ///
55+ /// - parameter other: A second publisher source.
56+ /// - parameter other1: A third publisher source.
57+ /// - parameter other2: A fourth publisher source.
58+ /// - parameter resultSelector: Function to invoke for each value from the self combined
59+ /// with the latest value from the second, third and fourth source, if any.
60+ ///
61+ /// - returns: A publisher containing the result of combining each value of the self
62+ /// with the latest value from the second, third and fourth publisher, if any, using the
63+ /// specified result selector function.
64+ func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher , Result> ( _ other: Other ,
65+ _ other1: Other1 ,
66+ _ other2: Other2 ,
67+ resultSelector: @escaping ( Output , ( Other . Output , Other1 . Output , Other2 . Output ) ) -> Result )
68+ -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output , Other2 . Output ) , Self . Failure > , Result >
7669 where Other. Failure == Failure , Other1. Failure == Failure , Other2. Failure == Failure {
77- withLatestFrom ( other. combineLatest ( other1, other2) , resultSelector: resultSelector)
70+ let combined = other. combineLatest ( other1, other2)
71+ . eraseToAnyPublisher ( )
72+ return . init( upstream: self , second: combined, resultSelector: resultSelector)
73+ }
74+
75+ /// Upon an emission from self, emit the latest value from the
76+ /// second publisher, if any exists.
77+ ///
78+ /// - parameter other: A second publisher source.
79+ ///
80+ /// - returns: A publisher containing the latest value from the second publisher, if any.
81+ func withLatestFrom< Other: Publisher > ( _ other: Other )
82+ -> Publishers . WithLatestFrom < Self , Other , Other . Output > {
83+ return . init( upstream: self , second: other) { $1 }
84+ }
85+
86+ /// Upon an emission from self, emit the latest value from the
87+ /// second and third publisher, if any exists.
88+ ///
89+ /// - parameter other: A second publisher source.
90+ /// - parameter other1: A third publisher source.
91+ ///
92+ /// - returns: A publisher containing the latest value from the second and third publisher, if any.
93+ func withLatestFrom< Other: Publisher , Other1: Publisher > ( _ other: Other ,
94+ _ other1: Other1 )
95+ -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output ) , Self . Failure > , ( Other . Output , Other1 . Output ) >
96+ where Other. Failure == Failure , Other1. Failure == Failure {
97+ withLatestFrom ( other, other1) { $1 }
98+ }
99+
100+ /// Upon an emission from self, emit the latest value from the
101+ /// second, third and forth publisher, if any exists.
102+ ///
103+ /// - parameter other: A second publisher source.
104+ /// - parameter other1: A third publisher source.
105+ /// - parameter other2: A forth publisher source.
106+ ///
107+ /// - returns: A publisher containing the latest value from the second, third and forth publisher, if any.
108+ func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher > ( _ other: Other ,
109+ _ other1: Other1 ,
110+ _ other2: Other2 )
111+ -> Publishers . WithLatestFrom < Self , AnyPublisher < ( Other . Output , Other1 . Output , Other2 . Output ) , Self . Failure > , ( Other . Output , Other1 . Output , Other2 . Output ) >
112+ where Other. Failure == Failure , Other1. Failure == Failure , Other2. Failure == Failure {
113+ withLatestFrom ( other, other1, other2) { $1 }
114+ }
115+ }
116+
117+ // MARK: - Publisher
118+ @available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
119+ public extension Publishers {
120+ struct WithLatestFrom < Upstream: Publisher ,
121+ Other: Publisher ,
122+ Output> : Publisher where Upstream. Failure == Other . Failure {
123+ public typealias Failure = Upstream . Failure
124+ public typealias ResultSelector = ( Upstream . Output , Other . Output ) -> Output
125+
126+ private let upstream : Upstream
127+ private let second : Other
128+ private let resultSelector : ResultSelector
129+ private var latestValue : Other . Output ?
130+
131+ init ( upstream: Upstream ,
132+ second: Other ,
133+ resultSelector: @escaping ResultSelector ) {
134+ self . upstream = upstream
135+ self . second = second
136+ self . resultSelector = resultSelector
78137 }
79138
80- /// Upon an emission from self, emit the latest value from the
81- /// second publisher, if any exists.
82- ///
83- /// - parameter other: A second publisher source.
84- ///
85- /// - returns: A publisher containing the latest value from the second publisher, if any.
86- func withLatestFrom< Other: Publisher > ( _ other: Other )
87- -> AnyPublisher < Other . Output , Failure >
88- where Other. Failure == Failure {
89- withLatestFrom ( other) { $1 }
139+ public func receive< S: Subscriber > ( subscriber: S ) where Failure == S . Failure , Output == S . Input {
140+ subscriber. receive ( subscription: Subscription ( upstream: upstream,
141+ downstream: subscriber,
142+ second: second,
143+ resultSelector: resultSelector) )
90144 }
145+ }
146+ }
91147
92- /// Upon an emission from self, emit the latest value from the
93- /// second and third publisher, if any exists.
94- ///
95- /// - parameter other: A second publisher source.
96- /// - parameter other1: A third publisher source.
97- ///
98- /// - returns: A publisher containing the latest value from the second and third publisher, if any.
99- func withLatestFrom< Other: Publisher , Other1: Publisher > ( _ other: Other ,
100- _ other1: Other1 )
101- -> AnyPublisher < ( Other . Output , Other1 . Output ) , Failure >
102- where Other. Failure == Failure , Other1. Failure == Failure {
103- withLatestFrom ( other, other1) { $1 }
148+ // MARK: - Subscription
149+ @available ( OSX 10 . 15 , iOS 13 . 0 , tvOS 13 . 0 , watchOS 6 . 0 , * )
150+ private extension Publishers . WithLatestFrom {
151+ class Subscription < Downstream: Subscriber > : Combine . Subscription , CustomStringConvertible where Downstream. Input == Output , Downstream. Failure == Failure {
152+ private let resultSelector : ResultSelector
153+ private var sink : Sink < Upstream , Downstream > ?
154+
155+ private let upstream : Upstream
156+ private let downstream : Downstream
157+ private let second : Other
158+
159+ // Secondary (other) publisher
160+ private var latestValue : Other . Output ?
161+ private var otherSubscription : Cancellable ?
162+ private var preInitialDemand = Subscribers . Demand. none
163+
164+ init ( upstream: Upstream ,
165+ downstream: Downstream ,
166+ second: Other ,
167+ resultSelector: @escaping ResultSelector ) {
168+ self . upstream = upstream
169+ self . second = second
170+ self . downstream = downstream
171+ self . resultSelector = resultSelector
172+
173+ trackLatestFromSecond { [ weak self] in
174+ guard let self = self else { return }
175+ self . request ( self . preInitialDemand)
176+ self . preInitialDemand = . none
177+ }
104178 }
105179
106- /// Upon an emission from self, emit the latest value from the
107- /// second, third and forth publisher, if any exists.
108- ///
109- /// - parameter other: A second publisher source.
110- /// - parameter other1: A third publisher source.
111- /// - parameter other2: A forth publisher source.
112- ///
113- /// - returns: A publisher containing the latest value from the second, third and forth publisher, if any.
114- func withLatestFrom< Other: Publisher , Other1: Publisher , Other2: Publisher > ( _ other: Other ,
115- _ other1: Other1 ,
116- _ other2: Other2 )
117- -> AnyPublisher < ( Other . Output , Other1 . Output , Other2 . Output ) , Failure >
118- where Other. Failure == Failure , Other1. Failure == Failure , Other2. Failure == Failure {
119- withLatestFrom ( other, other1, other2) { $1 }
180+ func request( _ demand: Subscribers . Demand ) {
181+ guard latestValue != nil else {
182+ preInitialDemand += demand
183+ return
184+ }
185+
186+ self . sink? . demand ( demand)
187+ }
188+
189+ // Create an internal subscription to the `Other` publisher,
190+ // constantly tracking its latest value
191+ private func trackLatestFromSecond( onInitialValue: @escaping ( ) -> Void ) {
192+ var gotInitialValue = false
193+
194+ let subscriber = AnySubscriber < Other . Output , Other . Failure > (
195+ receiveSubscription: { [ weak self] subscription in
196+ self ? . otherSubscription = subscription
197+ subscription. request ( . unlimited)
198+ } ,
199+ receiveValue: { [ weak self] value in
200+ guard let self = self else { return . none }
201+ self . latestValue = value
202+
203+ if !gotInitialValue {
204+ // When getting initial value, start pulling values
205+ // from upstream in the main sink
206+ self . sink = Sink ( upstream: self . upstream,
207+ downstream: self . downstream,
208+ transformOutput: { [ weak self] value in
209+ guard let self = self ,
210+ let other = self . latestValue else { return nil }
211+
212+ return self . resultSelector ( value, other)
213+ } ,
214+ transformFailure: { $0 } )
215+
216+ // Signal initial value to start fulfilling downstream demand
217+ gotInitialValue = true
218+ onInitialValue ( )
219+ }
220+
221+ return . unlimited
222+ } ,
223+ receiveCompletion: nil )
224+
225+ self . second. subscribe ( subscriber)
226+ }
227+
228+ var description : String {
229+ return " WithLatestFrom.Subscription< \( Output . self) , \( Failure . self) > "
230+ }
231+
232+ func cancel( ) {
233+ sink = nil
234+ otherSubscription? . cancel ( )
120235 }
236+ }
121237}
122238#endif
0 commit comments