@@ -23,157 +23,114 @@ public final class Action<Input, Element> {
2323 /// All inputs are always appear in this subject even if the action is not enabled.
2424 /// Thus, inputs count equals elements count + errors count.
2525 public let inputs = PublishSubject < Input > ( )
26- fileprivate let _completed = PublishSubject < Void > ( )
2726
28- /// Errors aggrevated from invocations of execute().
27+ /// Errors aggrevated from invocations of execute().
2928 /// Delivered on whatever scheduler they were sent from.
30- public var errors : Observable < ActionError > {
31- return self . _errors. asObservable ( )
32- }
33- fileprivate let _errors = PublishSubject < ActionError > ( )
29+ public let errors : Observable < ActionError >
3430
35- /// Whether or not we're currently executing.
31+ /// Whether or not we're currently executing.
3632 /// Delivered on whatever scheduler they were sent from.
37- public var elements : Observable < Element > {
38- return self . _elements. asObservable ( )
39- }
40- fileprivate let _elements = PublishSubject < Element > ( )
33+ public let elements : Observable < Element >
4134
4235 /// Whether or not we're currently executing.
4336 /// Always observed on MainScheduler.
44- public var executing : Observable < Bool > {
45- return self . _executing. asObservable ( ) . observeOn ( MainScheduler . instance)
46- }
47- fileprivate let _executing = Variable ( false )
48-
37+ public let executing : Observable < Bool >
38+
4939 /// Observables returned by the workFactory.
5040 /// Useful for sending results back from work being completed
5141 /// e.g. response from a network call.
52- public var executionObservables : Observable < Observable < Element > > {
53- return self . _executionObservables. asObservable ( ) . observeOn ( MainScheduler . instance)
54- }
55- fileprivate let _executionObservables = PublishSubject < Observable < Element > > ( )
56-
42+ public let executionObservables : Observable < Observable < Element > >
43+
5744 /// Whether or not we're enabled. Note that this is a *computed* sequence
5845 /// property based on enabledIf initializer and if we're currently executing.
5946 /// Always observed on MainScheduler.
60- public var enabled : Observable < Bool > {
61- return _enabled. asObservable ( ) . observeOn ( MainScheduler . instance)
62- }
63- public fileprivate( set) var _enabled = BehaviorSubject ( value: true )
47+ public let enabled : Observable < Bool >
6448
65- fileprivate let executingQueue = DispatchQueue ( label: " com.ashfurrow.Action.executingQueue " , attributes: [ ] )
66- fileprivate let disposeBag = DisposeBag ( )
49+ private let disposeBag = DisposeBag ( )
6750
68- public init ( enabledIf: Observable < Bool > = Observable . just ( true ) , workFactory: @escaping WorkFactory ) {
69- self . _enabledIf = enabledIf
51+ public init (
52+ enabledIf: Observable < Bool > = Observable . just ( true ) ,
53+ workFactory: @escaping WorkFactory ) {
7054
55+ self . _enabledIf = enabledIf
7156 self . workFactory = workFactory
7257
73- Observable . combineLatest ( self . _enabledIf, self . executing) { ( enabled, executing) -> Bool in
74- return enabled && !executing
75- } . bindTo ( _enabled) . addDisposableTo ( disposeBag)
76-
77- self . inputs. subscribe ( onNext: { [ weak self] ( input) in
78- self ? . _execute ( input)
79- } ) . addDisposableTo ( disposeBag)
80- }
81- }
82-
83-
84- // MARK: Execution!
85- public extension Action {
86-
87- @discardableResult
88- public func execute( _ input: Input ) -> Observable < Element > {
89- let buffer = ReplaySubject< Element> . createUnbounded( )
90- let error = errors
91- . flatMap { error -> Observable < Element > in
92- if case . underlyingError( let error) = error {
93- throw error
58+ let enabledSubject = BehaviorSubject < Bool > ( value: false )
59+ enabled = enabledSubject. asObservable ( )
60+
61+ executionObservables = inputs
62+ . withLatestFrom ( enabled) { $0 }
63+ . flatMap { input, enabled -> Observable < Observable < Element > > in
64+ if enabled {
65+ return Observable . of ( workFactory ( input) . shareReplay ( 1 ) )
9466 } else {
9567 return Observable . empty ( )
9668 }
9769 }
70+ . shareReplay ( 1 )
71+
72+ elements = executionObservables
73+ . flatMap { $0. catchError { _ in Observable . empty ( ) } }
74+
75+ let notEnabledError = inputs
76+ . withLatestFrom ( enabled)
77+ . flatMap { $0 ? Observable . empty ( ) : Observable . of ( ActionError . notEnabled) }
78+
79+ let underlyingError = executionObservables
80+ . flatMap { elements in
81+ return elements
82+ . flatMap { _ in Observable< ActionError> . never( ) }
83+ . catchError { error in
84+ if let actionError = error as? ActionError {
85+ return Observable . of ( actionError)
86+ } else {
87+ return Observable . of ( . underlyingError( error) )
88+ }
89+ }
90+ }
9891
99- Observable
100- . of ( elements , error )
92+ errors = Observable
93+ . of ( notEnabledError , underlyingError )
10194 . merge ( )
102- . takeUntil ( _completed)
103- . bindTo ( buffer)
104- . addDisposableTo ( disposeBag)
105-
106- inputs. onNext ( input)
107-
108- return buffer. asObservable ( )
109- }
110-
111- @discardableResult
112- fileprivate func _execute( _ input: Input ) -> Observable < Element > {
113-
114- // Buffer from the work to a replay subject.
115- let buffer = ReplaySubject< Element> . createUnbounded( )
11695
117- // See if we're already executing.
118- var startedExecuting = false
119- self . doLocked {
120- if self . _enabled. valueOrFalse {
121- self . _executing. value = true
122- startedExecuting = true
96+ let executionStart = executionObservables. catchError { _ in Observable . empty ( ) }
97+ let executionEnd = executionObservables
98+ . catchError { _ in Observable . empty ( ) }
99+ . flatMap { observable -> Observable < Void > in
100+ return observable
101+ . flatMap { _ in Observable< Void> . empty( ) }
102+ . concat ( Observable . just ( ) )
103+ . catchErrorJustReturn ( )
123104 }
124- }
125-
126- // Make sure we started executing and we're accidentally disabled.
127- guard startedExecuting else {
128- let error = ActionError . notEnabled
129- self . _errors. onNext ( error)
130- buffer. onError ( error)
131-
132- return buffer
133- }
134-
135- let work = self . workFactory ( input)
136- defer {
137- // Subscribe to the work.
138- work. multicast ( buffer) . connect ( ) . addDisposableTo ( disposeBag)
139- }
140-
141- self . _executionObservables. onNext ( buffer)
142-
143- buffer. subscribe ( onNext: { [ weak self] element in
144- self ? . _elements. onNext ( element)
145- } ,
146- onError: { [ weak self] error in
147- self ? . _errors. onNext ( ActionError . underlyingError ( error) )
148- } ,
149- onCompleted: { [ weak self] in
150- self ? . _completed. onNext ( )
151- } ,
152- onDisposed: { [ weak self] in
153- self ? . doLocked { self ? . _executing. value = false }
154- } )
155- . addDisposableTo ( disposeBag)
156105
106+ executing = Observable
107+ . of ( executionStart. map { _ in true } , executionEnd. map { _ in false } )
108+ . merge ( )
109+ . startWith ( false )
157110
158- return buffer. asObservable ( )
111+ Observable
112+ . combineLatest ( executing, enabledIf) { !$0 && $1 }
113+ . bindTo ( enabledSubject)
114+ . addDisposableTo ( disposeBag)
159115 }
160- }
161116
162- private extension Action {
163- func doLocked( _ closure: ( ) -> Void ) {
164- executingQueue. sync ( execute: closure)
165- }
166- }
117+ @discardableResult
118+ public func execute( _ value: Input ) -> Observable < Element > {
119+ let subject = ReplaySubject< Element> . createUnbounded( )
167120
168- fileprivate let executingQueue = DispatchQueue ( label: " com.ashfurrow.Action.executingQueue " , attributes: [ ] )
169- internal func doLocked( _ closure: ( ) -> Void ) {
170- executingQueue. sync ( execute: closure)
171- }
121+ executionObservables
122+ . take ( 1 )
123+ . flatMap { $0. catchError { _ in Observable . never ( ) } }
124+ . bindTo ( subject)
125+ . addDisposableTo ( disposeBag)
126+
127+ errors
128+ . map { throw $0 }
129+ . bindTo ( subject)
130+ . addDisposableTo ( disposeBag)
172131
173- internal extension BehaviorSubject where Element: ExpressibleByBooleanLiteral {
174- var valueOrFalse : Element {
175- guard let value = try ? value ( ) else { return false }
132+ inputs. onNext ( value)
176133
177- return value
134+ return subject . asObservable ( )
178135 }
179136}
0 commit comments