|
15 | 15 | */ |
16 | 16 | package rx.subjects; |
17 | 17 |
|
18 | | -import java.util.concurrent.ConcurrentHashMap; |
19 | | -import java.util.concurrent.atomic.AtomicBoolean; |
20 | | -import java.util.concurrent.atomic.AtomicReference; |
21 | | -import java.util.concurrent.locks.ReentrantLock; |
22 | | - |
23 | 18 | import rx.Notification; |
24 | 19 | import rx.Observer; |
25 | | -import rx.Subscription; |
26 | | -import rx.operators.SafeObservableSubscription; |
27 | | -import rx.subscriptions.Subscriptions; |
| 20 | +import rx.util.functions.Action2; |
28 | 21 |
|
29 | 22 | /** |
30 | 23 | * Subject that publishes only the last event to each {@link Observer} that has subscribed when the |
|
55 | 48 | * |
56 | 49 | * @param <T> |
57 | 50 | */ |
58 | | -public class AsyncSubject<T> extends Subject<T, T> { |
| 51 | +public class AsyncSubject<T> extends AbstractSubject<T> { |
59 | 52 |
|
60 | 53 | /** |
61 | 54 | * Create a new AsyncSubject |
62 | 55 | * |
63 | 56 | * @return a new AsyncSubject |
64 | 57 | */ |
65 | 58 | public static <T> AsyncSubject<T> create() { |
66 | | - final AsyncSubjectState<T> state = new AsyncSubjectState<T>(); |
| 59 | + final SubjectState<T> state = new SubjectState<T>(); |
| 60 | + OnSubscribeFunc<T> onSubscribe = getOnSubscribeFunc(state, new Action2<SubjectState<T>, Observer<? super T>>() { |
67 | 61 |
|
68 | | - OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() { |
69 | 62 | @Override |
70 | | - public Subscription onSubscribe(Observer<? super T> observer) { |
71 | | - /* |
72 | | - * Subscription needs to be synchronized with terminal states to ensure |
73 | | - * race conditions are handled. When subscribing we must make sure |
74 | | - * onComplete/onError is correctly emitted to all observers, even if it |
75 | | - * comes in while the onComplete/onError is being propagated. |
76 | | - */ |
77 | | - state.SUBSCRIPTION_LOCK.lock(); |
78 | | - try { |
79 | | - if (state.completed.get()) { |
80 | | - emitNotificationToObserver(state, observer); |
81 | | - return Subscriptions.empty(); |
82 | | - } else { |
83 | | - // the subject is not completed so we subscribe |
84 | | - final SafeObservableSubscription subscription = new SafeObservableSubscription(); |
85 | | - |
86 | | - subscription.wrap(new Subscription() { |
87 | | - @Override |
88 | | - public void unsubscribe() { |
89 | | - // on unsubscribe remove it from the map of outbound observers to notify |
90 | | - state.observers.remove(subscription); |
91 | | - } |
92 | | - }); |
93 | | - |
94 | | - // on subscribe add it to the map of outbound observers to notify |
95 | | - state.observers.put(subscription, observer); |
96 | | - |
97 | | - return subscription; |
| 63 | + public void call(SubjectState<T> state, Observer<? super T> o) { |
| 64 | + // we want the last value + completed so add this extra logic |
| 65 | + // to send onCompleted if the last value is an onNext |
| 66 | + if (state.completed.get()) { |
| 67 | + Notification<T> value = state.currentValue.get(); |
| 68 | + if (value != null && value.isOnNext()) { |
| 69 | + o.onCompleted(); |
98 | 70 | } |
99 | | - } finally { |
100 | | - state.SUBSCRIPTION_LOCK.unlock(); |
101 | 71 | } |
102 | | - |
103 | 72 | } |
104 | | - |
105 | | - }; |
106 | | - |
| 73 | + }); |
107 | 74 | return new AsyncSubject<T>(onSubscribe, state); |
108 | 75 | } |
109 | 76 |
|
110 | | - private static <T> void emitNotificationToObserver(final AsyncSubjectState<T> state, Observer<? super T> observer) { |
111 | | - Notification<T> finalValue = state.currentValue.get(); |
112 | | - |
113 | | - // if null that means onNext was never invoked (no Notification set) |
114 | | - if (finalValue != null) { |
115 | | - if (finalValue.isOnNext()) { |
116 | | - observer.onNext(finalValue.getValue()); |
117 | | - } else if (finalValue.isOnError()) { |
118 | | - observer.onError(finalValue.getThrowable()); |
119 | | - } |
120 | | - } |
121 | | - observer.onCompleted(); |
122 | | - } |
123 | | - |
124 | | - /** |
125 | | - * State externally constructed and passed in so the onSubscribe function has access to it. |
126 | | - * |
127 | | - * @param <T> |
128 | | - */ |
129 | | - private static class AsyncSubjectState<T> { |
130 | | - private final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>(); |
131 | | - private final AtomicReference<Notification<T>> currentValue = new AtomicReference<Notification<T>>(); |
132 | | - private final AtomicBoolean completed = new AtomicBoolean(); |
133 | | - private final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock(); |
134 | | - } |
135 | | - |
136 | | - private final AsyncSubjectState<T> state; |
| 77 | + private final SubjectState<T> state; |
137 | 78 |
|
138 | | - protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, AsyncSubjectState<T> state) { |
| 79 | + protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, SubjectState<T> state) { |
139 | 80 | super(onSubscribe); |
140 | 81 | this.state = state; |
141 | 82 | } |
142 | 83 |
|
143 | 84 | @Override |
144 | 85 | public void onCompleted() { |
145 | | - terminalState(); |
| 86 | + /** |
| 87 | + * Mark this subject as completed and emit latest value + 'onCompleted' to all Observers |
| 88 | + */ |
| 89 | + emitNotificationAndTerminate(state, new Action2<SubjectState<T>, Observer<? super T>>() { |
| 90 | + |
| 91 | + @Override |
| 92 | + public void call(SubjectState<T> state, Observer<? super T> o) { |
| 93 | + o.onCompleted(); |
| 94 | + } |
| 95 | + }); |
146 | 96 | } |
147 | 97 |
|
148 | 98 | @Override |
149 | 99 | public void onError(Throwable e) { |
| 100 | + /** |
| 101 | + * Mark this subject as completed with an error as the last value and emit 'onError' to all Observers |
| 102 | + */ |
150 | 103 | state.currentValue.set(new Notification<T>(e)); |
151 | | - terminalState(); |
| 104 | + emitNotificationAndTerminate(state, null); |
152 | 105 | } |
153 | 106 |
|
154 | 107 | @Override |
155 | 108 | public void onNext(T v) { |
| 109 | + /** |
| 110 | + * Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs. |
| 111 | + */ |
156 | 112 | state.currentValue.set(new Notification<T>(v)); |
157 | 113 | } |
158 | 114 |
|
159 | | - private void terminalState() { |
160 | | - /* |
161 | | - * We can not allow new subscribers to be added while we execute the terminal state. |
162 | | - */ |
163 | | - state.SUBSCRIPTION_LOCK.lock(); |
164 | | - try { |
165 | | - if (state.completed.compareAndSet(false, true)) { |
166 | | - for (Subscription s : state.observers.keySet()) { |
167 | | - // emit notifications to this observer |
168 | | - emitNotificationToObserver(state, state.observers.get(s)); |
169 | | - // remove the subscription as it is completed |
170 | | - state.observers.remove(s); |
171 | | - } |
172 | | - } |
173 | | - } finally { |
174 | | - state.SUBSCRIPTION_LOCK.unlock(); |
175 | | - } |
176 | | - } |
177 | 115 | } |
0 commit comments