33// (c) CombineOpenSource, Created by Marin Todorov.
44//
55
6+ /*
7+ Data flow in BatchesDataSource:
8+ Dashed boxes represent the inputs provided to `BatchesDataSource.init(...)`.
9+ Single line boxes are the intermediate publishers.
10+ Double line boxes are the published outputs.
11+
12+ ┌──────────────────────┐ ╔════════════════════╗
13+ ┌──────────────────────▶│ itemsSubject │──────────────────▶║ Output.$items ║◀───┐
14+ │ └──────────────────────┘ ╚════════════════════╝ │
15+ │ ╔════════════════════╗ │
16+ │ ┌──────────────────────┬──────────────────▶║ Output.$isLoading ║ │
17+ │ │ │ ╚════════════════════╝ │
18+ │ │ │ │
19+ │ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
20+ ┌──────────────┐ │ │ │ │ │ │ │ │
21+ ┌─┬──▶│ reload │──┬──▶│ batchRequest │─▶│ batchResponse │─▶│ successResponse │─▶│ result │
22+ │ │ └──────────────┘ │ │ │ │ │ │ │ │ │
23+ │ │ │ └───────────────────┘ └───────────────────┘ └───────────────────┘ └───────────────────┘
24+ │ │ ┌──────────────┐ ▲ │ │ │
25+ │ │ │ loadNext │ └───────┐ │ │ │
26+ │ │ └──────────────┘ │ │ ┌─────┘ │
27+ │ │ ▲ │ │ │ │
28+ │ │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ ╔════════════════════╗ │
29+ │ │ ┌ ─ ─ ─ ─ ─ ─ ─ │ loadNextBatch() │ │ └─▶║Output.$isCompleted ║ │
30+ │ └── initialToken │ │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ ╚════════════════════╝ │
31+ │ └ ─ ─ ─ ─ ─ ─ ─ │ │ ╔════════════════════╗ │
32+ │ │ └──────────────────▶║ Output.$error ║ │
33+ │ ┌ ─ ─ ─ ─ ─ ─ ─ │ ╚════════════════════╝ │
34+ └── items │ │ ┌──────────────────┐ │
35+ └ ─ ─ ─ ─ ─ ─ ─ └─────────────────────────────│ token │◀────────────────────────────────┘
36+ └──────────────────┘
37+ */
38+
639import Foundation
740import Combine
841
@@ -46,83 +79,67 @@ public struct BatchesDataSource<Element> {
4679
4780 /// The result of loading of a batch of items.
4881 public enum LoadResult {
49- /// A batch of `Element` items.
82+ /// A batch of `Element` items to use with pages .
5083 case items( [ Element ] )
5184
5285 /// A batch of `Element` items and a token to provide
5386 /// to the loader in order to fetch the next batch.
54- case itemsToken( [ Element ] , nextToken: Data )
87+ case itemsToken( [ Element ] , nextToken: Data ? )
5588
5689 /// No more items available to fetch.
5790 case completed
5891 }
5992
6093 enum ResponseResult {
61- case result( ( token: Data ? , result: BatchesDataSource < Element > . LoadResult ) )
94+ case result( ( token: Token , result: BatchesDataSource < Element > . LoadResult ) )
6295 case error( Error )
6396 }
6497
65- /// Initializes a list data source using a token to fetch batches of items.
66- /// - Parameter items: initial list of items.
67- /// - Parameter input: the input to control the data source.
68- /// - Parameter initialToken: the token to use to fetch the first batch.
69- /// - Parameter loadItemsWithToken: a `(Data?) -> (Publisher<LoadResult, Error>)` closure that fetches a batch of items and returns the items fetched
70- /// plus a token to use for the next batch. The token can be an alphanumerical id, a URL, or another type of token.
71- /// - Todo: if `withLatestFrom` is introduced, use it instead of grabbing the latest value unsafely.
72- public init ( items: [ Element ] = [ ] , input: BatchesInput , initialToken: Data ? , loadItemsWithToken: @escaping ( Data ? ) -> AnyPublisher < LoadResult , Error > ) {
98+ enum Token {
99+ case int( Int )
100+ case data( Data ? )
101+ }
102+
103+ private init ( items: [ Element ] = [ ] , input: BatchesInput , initial: Token , loadNextCallback: @escaping ( Token ) -> AnyPublisher < LoadResult , Error > ) {
73104 let itemsSubject = CurrentValueSubject < [ Element ] , Never > ( items)
74- let token = CurrentValueSubject < Data ? , Never > ( initialToken )
105+ let token = CurrentValueSubject < Token , Never > ( initial )
75106
76107 self . input = input
77108 let output = self . output
78109
79- let reload = input. reload
80- . share ( )
81-
82- reload
83- . map { _ in
84- return items
85- }
110+ input. reload
111+ . map { _ in items }
112+ . append ( Empty ( completeImmediately: false ) )
86113 . subscribe ( itemsSubject)
87114 . store ( in: & subscriptions)
88-
115+
89116 let loadNext = input. loadNext
90117 . map { token. value }
91118
92- let batchRequest = loadNext. merge ( with: reload. map { initialToken } )
93- . share ( )
94- . prepend ( initialToken)
95-
96- let batchResponse = batchRequest
97- . flatMap { token in
98- return loadItemsWithToken ( token)
99- . map { result -> ResponseResult in
100- return . result( ( token: token, result: result) )
101- }
102- . catch { error in
103- Just ( ResponseResult . error ( error) )
104- }
105- }
119+ let batchRequest = loadNext
120+ . merge ( with: input. reload. prepend ( ( ) ) . map { initial } )
106121 . eraseToAnyPublisher ( )
107- . share ( )
108-
122+
123+ // TODO: avoid having extra subject when `shareReplay()` is introduced.
124+ let batchResponse = PassthroughSubject < ResponseResult , Never > ( )
125+
109126 batchResponse
110- . compactMap { result -> Error ? in
127+ . map { result -> Error ? in
111128 switch result {
112129 case . error( let error) : return error
113130 default : return nil
114131 }
115132 }
116- . assign ( to: \Output . error, on: output)
117- . store ( in: & subscriptions)
118-
133+ . assign ( to: \Output . error, on: output)
134+ . store ( in: & subscriptions)
135+
119136 // Bind `Output.isLoading`
120137 Publishers . Merge ( batchRequest. map { _ in true } , batchResponse. map { _ in false } )
121138 . assign ( to: \Output . isLoading, on: output)
122139 . store ( in: & subscriptions)
123140
124141 let successResponse = batchResponse
125- . compactMap { result -> ( token: Data ? , result: BatchesDataSource < Element > . LoadResult ) ? in
142+ . compactMap { result -> ( token: Token , result: BatchesDataSource < Element > . LoadResult ) ? in
126143 switch result {
127144 case . result( let result) : return result
128145 default : return nil
@@ -142,11 +159,16 @@ public struct BatchesDataSource<Element> {
142159 . store ( in: & subscriptions)
143160
144161 let result = successResponse
145- . compactMap { tuple -> ( token: Data ? , items: [ Element ] , nextToken: Data ? ) ? in
162+ . compactMap { tuple -> ( token: Token , items: [ Element ] , nextToken: Token ) ? in
146163 switch tuple. result {
147- case . completed: return nil
148- case . itemsToken( let elements, let nextToken) : return ( token: tuple. token, items: elements, nextToken: nextToken)
149- default : fatalError ( )
164+ case . completed:
165+ return nil
166+ case . items( let elements) :
167+ // Fix incremeneting page number
168+ guard case Token . int( let currentPage) = tuple. token else { fatalError ( ) }
169+ return ( token: tuple. token, items: elements, nextToken: . int( currentPage+ 1 ) )
170+ case . itemsToken( let elements, let nextToken) :
171+ return ( token: tuple. token, items: elements, nextToken: . data( nextToken) )
150172 }
151173 }
152174 . share ( )
@@ -160,6 +182,7 @@ public struct BatchesDataSource<Element> {
160182 // Bind `items`
161183 result
162184 . map {
185+ // TODO: Solve for `withLatestFrom(_)`
163186 let currentItems = itemsSubject. value
164187 return currentItems + $0. items
165188 }
@@ -170,6 +193,39 @@ public struct BatchesDataSource<Element> {
170193 itemsSubject
171194 . assign ( to: \Output . items, on: output)
172195 . store ( in: & subscriptions)
196+
197+ batchRequest
198+ . assertMaxSubscriptions ( 1 )
199+ . flatMap { token in
200+ return loadNextCallback ( token)
201+ . map { result -> ResponseResult in
202+ return . result( ( token: token, result: result) )
203+ }
204+ . catch { error in
205+ Just ( ResponseResult . error ( error) )
206+ }
207+ . append ( Empty ( completeImmediately: true ) )
208+ }
209+ . sink ( receiveValue: batchResponse. send)
210+ . store ( in: & subscriptions)
211+
212+ }
213+
214+ /// Initializes a list data source using a token to fetch batches of items.
215+ /// - Parameter items: initial list of items.
216+ /// - Parameter input: the input to control the data source.
217+ /// - Parameter initialToken: the token to use to fetch the first batch.
218+ /// - Parameter loadItemsWithToken: a `(Data?) -> (Publisher<LoadResult, Error>)` closure that fetches a batch of items and returns the items fetched
219+ /// plus a token to use for the next batch. The token can be an alphanumerical id, a URL, or another type of token.
220+ /// - Todo: if `withLatestFrom` is introduced, use it instead of grabbing the latest value unsafely.
221+ public init ( items: [ Element ] = [ ] , input: BatchesInput , initialToken: Data ? , loadItemsWithToken: @escaping ( Data ? ) -> AnyPublisher < LoadResult , Error > ) {
222+ self . init ( items: items, input: input, initial: Token . data ( initialToken) , loadNextCallback: { token -> AnyPublisher < LoadResult , Error > in
223+ switch token {
224+ case . data( let data) :
225+ return loadItemsWithToken ( data)
226+ default : fatalError ( )
227+ }
228+ } )
173229 }
174230
175231 /// Initialiazes a list data source of items batched in numbered pages.
@@ -179,100 +235,29 @@ public struct BatchesDataSource<Element> {
179235 /// - Parameter loadPage: a `(Int) -> (Publisher<LoadResult, Error>)` closure that fetches a batch of items.
180236 /// - Todo: if `withLatestFrom` is introduced, use it instead of grabbing the latest value unsafely.
181237 public init ( items: [ Element ] = [ ] , input: BatchesInput , initialPage: Int = 0 , loadPage: @escaping ( Int ) -> AnyPublisher < LoadResult , Error > ) {
182- let itemsSubject = CurrentValueSubject < [ Element ] , Never > ( items)
183- let currentPage = CurrentValueSubject < Int , Never > ( initialPage)
184-
185- self . input = input
186- let output = self . output
187-
188- let reload = input. reload
189- . share ( )
190-
191- reload
192- . map { _ in
193- return items
194- }
195- . subscribe ( itemsSubject)
196- . store ( in: & subscriptions)
197-
198- let loadNext = input. loadNext
199- . map { currentPage. value + 1 }
200-
201- let pageRequest = loadNext. merge ( with: reload. map { - 1 } )
202- . share ( )
203- . prepend ( 1 )
204-
205- // TODO: Add the response error handling like for batches
206-
207- // Bind `Output.isLoading = true`
208- pageRequest
209- . map { _ in true }
210- . assign ( to: \Output . isLoading, on: output)
211- . store ( in: & subscriptions)
212-
213- let pageResponse = pageRequest
214- . flatMap { page in
215- return loadPage ( page == - 1 ? 1 : page)
216- . handleEvents ( receiveOutput: { _ in
217- output. error = nil
218- } ,
219- receiveCompletion: { completion in
220- if case Subscribers . Completion . failure( let error) = completion {
221- output. error = error
222- } else {
223- output. error = nil
224- }
225- } )
226- . catch { _ in
227- return Empty ( )
228- }
229- . map { ( pageNumber: page, result: $0) }
238+ self . init ( items: items, input: input, initial: Token . int ( initialPage) , loadNextCallback: { page -> AnyPublisher < LoadResult , Error > in
239+ switch page {
240+ case . int( let page) :
241+ return loadPage ( page)
242+ default : fatalError ( )
230243 }
231- . eraseToAnyPublisher ( )
232- . share ( )
244+ } )
245+ }
246+ }
233247
234- // Bind `Output.isLoading = false`
235- Publishers . Merge ( pageRequest. map { _ in true } , pageResponse. map { _ in false } )
236- . assign ( to: \Output . isLoading, on: output)
237- . store ( in: & subscriptions)
248+ fileprivate var uuids = [ String: Int] ( )
238249
239- // Bind `Output.isCompleted`
240- pageResponse
241- . map { tuple -> Bool in
242- switch tuple. result {
243- case . completed: return true
244- default : return false
245- }
246- }
247- . assign ( to: \Output . isCompleted, on: output)
248- . store ( in: & subscriptions)
250+ extension Publisher {
251+ public func assertMaxSubscriptions( _ max: Int , file: StaticString = #file, line: UInt = #line) -> AnyPublisher < Output , Failure > {
252+ let uuid = " \( file) : \( line) "
249253
250- // Bind `items`
251- pageResponse
252- . compactMap { tuple -> ( pageNumber: Int , items: [ Element ] ) ? in
253- switch tuple. result {
254- case . completed: return nil
255- case . items( let elements) : return ( pageNumber: tuple. pageNumber, items: elements)
256- default : fatalError ( )
257- }
258- }
259- . map {
260- let currentItems = itemsSubject. value
261- return currentItems + $0. items
254+ return handleEvents ( receiveSubscription: { _ in
255+ let count = uuids [ uuid] ?? 0
256+ guard count < max else {
257+ assert ( false , " Publisher subscribed more than \( max) times. " )
258+ return
262259 }
263- . subscribe ( itemsSubject)
264- . store ( in: & subscriptions)
265-
266- // Bind `currentPage`
267- pageResponse
268- . map { $0. pageNumber }
269- . subscribe ( currentPage)
270- . store ( in: & subscriptions)
271-
272- // Bind `Output.items`
273- itemsSubject
274- . assign ( to: \Output . items, on: output)
275- . store ( in: & subscriptions)
260+ uuids [ uuid] = count + 1
261+ } ) . eraseToAnyPublisher ( )
276262 }
277263}
278-
0 commit comments