@@ -23,157 +23,113 @@ public final class Action<Input, Element> {
2323 /// All inputs are always appear in this subject even if the action is not enabled.
2424 /// Thus, inputs count equals elements count + errors count.
2525 public let inputs = PublishSubject < Input > ( )
26- fileprivate let _completed = PublishSubject < Void > ( )
2726
28- /// Errors aggrevated from invocations of execute().
27+ /// Errors aggrevated from invocations of execute().
2928 /// Delivered on whatever scheduler they were sent from.
30- public var errors : Observable < ActionError > {
31- return self . _errors. asObservable ( )
32- }
33- fileprivate let _errors = PublishSubject < ActionError > ( )
29+ public let errors : Observable < ActionError >
3430
35- /// Whether or not we're currently executing.
31+ /// Whether or not we're currently executing.
3632 /// Delivered on whatever scheduler they were sent from.
37- public var elements : Observable < Element > {
38- return self . _elements. asObservable ( )
39- }
40- fileprivate let _elements = PublishSubject < Element > ( )
33+ public let elements : Observable < Element >
4134
4235 /// Whether or not we're currently executing.
4336 /// Always observed on MainScheduler.
44- public var executing : Observable < Bool > {
45- return self . _executing. asObservable ( ) . observeOn ( MainScheduler . instance)
46- }
47- fileprivate let _executing = Variable ( false )
48-
37+ public let executing : Observable < Bool >
38+
4939 /// Observables returned by the workFactory.
5040 /// Useful for sending results back from work being completed
5141 /// e.g. response from a network call.
52- public var executionObservables : Observable < Observable < Element > > {
53- return self . _executionObservables. asObservable ( ) . observeOn ( MainScheduler . instance)
54- }
55- fileprivate let _executionObservables = PublishSubject < Observable < Element > > ( )
56-
42+ public let executionObservables : Observable < Observable < Element > >
43+
5744 /// Whether or not we're enabled. Note that this is a *computed* sequence
5845 /// property based on enabledIf initializer and if we're currently executing.
5946 /// Always observed on MainScheduler.
60- public var enabled : Observable < Bool > {
61- return _enabled. asObservable ( ) . observeOn ( MainScheduler . instance)
62- }
63- public fileprivate( set) var _enabled = BehaviorSubject ( value: true )
47+ public let enabled : Observable < Bool >
6448
65- fileprivate let executingQueue = DispatchQueue ( label: " com.ashfurrow.Action.executingQueue " , attributes: [ ] )
66- fileprivate let disposeBag = DisposeBag ( )
49+ private let disposeBag = DisposeBag ( )
6750
68- public init ( enabledIf: Observable < Bool > = Observable . just ( true ) , workFactory: @escaping WorkFactory ) {
69- self . _enabledIf = enabledIf
51+ public init (
52+ enabledIf: Observable < Bool > = Observable . just ( true ) ,
53+ workFactory: @escaping WorkFactory ) {
7054
55+ self . _enabledIf = enabledIf
7156 self . workFactory = workFactory
7257
73- Observable . combineLatest ( self . _enabledIf, self . executing) { ( enabled, executing) -> Bool in
74- return enabled && !executing
75- } . bindTo ( _enabled) . addDisposableTo ( disposeBag)
76-
77- self . inputs. subscribe ( onNext: { [ weak self] ( input) in
78- self ? . _execute ( input)
79- } ) . addDisposableTo ( disposeBag)
80- }
81- }
82-
83-
84- // MARK: Execution!
85- public extension Action {
86-
87- @discardableResult
88- public func execute( _ input: Input ) -> Observable < Element > {
89- let buffer = ReplaySubject< Element> . createUnbounded( )
90- let error = errors
91- . flatMap { error -> Observable < Element > in
92- if case . underlyingError( let error) = error {
93- throw error
58+ let enabledSubject = BehaviorSubject < Bool > ( value: false )
59+ enabled = enabledSubject. asObservable ( )
60+
61+ executionObservables = inputs
62+ . withLatestFrom ( enabled) { $0 }
63+ . flatMap { input, enabled -> Observable < Observable < Element > > in
64+ if enabled {
65+ return Observable . of ( workFactory ( input) . shareReplay ( 1 ) )
9466 } else {
9567 return Observable . empty ( )
9668 }
9769 }
70+ . shareReplay ( 1 )
71+
72+ elements = executionObservables
73+ . flatMap { $0. catchError { _ in Observable . empty ( ) } }
74+
75+ let notEnabledError = inputs
76+ . withLatestFrom ( enabled)
77+ . flatMap { $0 ? Observable . empty ( ) : Observable . of ( ActionError . notEnabled) }
78+
79+ let underlyingError = executionObservables
80+ . flatMap { elements in
81+ return elements
82+ . flatMap { _ in Observable< ActionError> . never( ) }
83+ . catchError { error in
84+ if let actionError = error as? ActionError {
85+ return Observable . of ( actionError)
86+ } else {
87+ return Observable . of ( . underlyingError( error) )
88+ }
89+ }
90+ }
9891
99- Observable
100- . of ( elements , error )
92+ errors = Observable
93+ . of ( notEnabledError , underlyingError )
10194 . merge ( )
102- . takeUntil ( _completed)
103- . bindTo ( buffer)
104- . addDisposableTo ( disposeBag)
105-
106- inputs. onNext ( input)
107-
108- return buffer. asObservable ( )
109- }
110-
111- @discardableResult
112- fileprivate func _execute( _ input: Input ) -> Observable < Element > {
113-
114- // Buffer from the work to a replay subject.
115- let buffer = ReplaySubject< Element> . createUnbounded( )
11695
117- // See if we're already executing.
118- var startedExecuting = false
119- self . doLocked {
120- if self . _enabled. valueOrFalse {
121- self . _executing. value = true
122- startedExecuting = true
96+ let executionStart = executionObservables
97+ let executionEnd = executionObservables
98+ . flatMap { observable -> Observable < Void > in
99+ return observable
100+ . flatMap { _ in Observable< Void> . empty( ) }
101+ . concat ( Observable . just ( ) )
102+ . catchErrorJustReturn ( )
123103 }
124- }
125-
126- // Make sure we started executing and we're accidentally disabled.
127- guard startedExecuting else {
128- let error = ActionError . notEnabled
129- self . _errors. onNext ( error)
130- buffer. onError ( error)
131-
132- return buffer
133- }
134-
135- let work = self . workFactory ( input)
136- defer {
137- // Subscribe to the work.
138- work. multicast ( buffer) . connect ( ) . addDisposableTo ( disposeBag)
139- }
140-
141- self . _executionObservables. onNext ( buffer)
142-
143- buffer. subscribe ( onNext: { [ weak self] element in
144- self ? . _elements. onNext ( element)
145- } ,
146- onError: { [ weak self] error in
147- self ? . _errors. onNext ( ActionError . underlyingError ( error) )
148- } ,
149- onCompleted: { [ weak self] in
150- self ? . _completed. onNext ( )
151- } ,
152- onDisposed: { [ weak self] in
153- self ? . doLocked { self ? . _executing. value = false }
154- } )
155- . addDisposableTo ( disposeBag)
156104
105+ executing = Observable
106+ . of ( executionStart. map { _ in true } , executionEnd. map { _ in false } )
107+ . merge ( )
108+ . startWith ( false )
157109
158- return buffer. asObservable ( )
110+ Observable
111+ . combineLatest ( executing, enabledIf) { !$0 && $1 }
112+ . bindTo ( enabledSubject)
113+ . addDisposableTo ( disposeBag)
159114 }
160- }
161115
162- private extension Action {
163- func doLocked( _ closure: ( ) -> Void ) {
164- executingQueue. sync ( execute: closure)
165- }
166- }
116+ @discardableResult
117+ public func execute( _ value: Input ) -> Observable < Element > {
118+ let subject = ReplaySubject< Element> . createUnbounded( )
167119
168- fileprivate let executingQueue = DispatchQueue ( label: " com.ashfurrow.Action.executingQueue " , attributes: [ ] )
169- internal func doLocked( _ closure: ( ) -> Void ) {
170- executingQueue. sync ( execute: closure)
171- }
120+ executionObservables
121+ . take ( 1 )
122+ . flatMap { $0. catchError { _ in Observable . never ( ) } }
123+ . bindTo ( subject)
124+ . addDisposableTo ( disposeBag)
125+
126+ errors
127+ . map { throw $0 }
128+ . bindTo ( subject)
129+ . addDisposableTo ( disposeBag)
172130
173- internal extension BehaviorSubject where Element: ExpressibleByBooleanLiteral {
174- var valueOrFalse : Element {
175- guard let value = try ? value ( ) else { return false }
131+ inputs. onNext ( value)
176132
177- return value
133+ return subject . asObservable ( )
178134 }
179135}
0 commit comments