Skip to content

Commit cc17cbd

Browse files
Merge pull request #1040 from benjchristensen/pull-972
Merge and Cleanup of #972
2 parents 7a85774 + 56f88df commit cc17cbd

File tree

10 files changed

+324
-239
lines changed

10 files changed

+324
-239
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import rx.operators.OperationAsObservable;
5757
import rx.operators.OperationAverage;
5858
import rx.operators.OperationBuffer;
59-
import rx.operators.OperationCache;
6059
import rx.operators.OperationCombineLatest;
6160
import rx.operators.OperationConcat;
6261
import rx.operators.OperationDebounce;
@@ -105,6 +104,7 @@
105104
import rx.operators.OperationUsing;
106105
import rx.operators.OperationWindow;
107106
import rx.operators.OperatorAmb;
107+
import rx.operators.OperatorCache;
108108
import rx.operators.OperatorCast;
109109
import rx.operators.OperatorDoOnEach;
110110
import rx.operators.OperatorFilter;
@@ -3864,7 +3864,7 @@ public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialC
38643864
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-cache">RxJava Wiki: cache()</a>
38653865
*/
38663866
public final Observable<T> cache() {
3867-
return create(OperationCache.cache(this));
3867+
return create(new OperatorCache<T>(this));
38683868
}
38693869

38703870
/**

rxjava-core/src/main/java/rx/operators/OperationCache.java renamed to rxjava-core/src/main/java/rx/operators/OperatorCache.java

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
import java.util.concurrent.atomic.AtomicBoolean;
1919

2020
import rx.Observable;
21-
import rx.Observable.OnSubscribeFunc;
22-
import rx.Observer;
23-
import rx.Subscription;
21+
import rx.Observable.OnSubscribe;
22+
import rx.Subscriber;
2423
import rx.observers.Subscribers;
2524
import rx.subjects.ReplaySubject;
25+
import rx.subjects.Subject;
2626

2727
/**
2828
* This method has similar behavior to {@link Observable#replay()} except that this auto-subscribes
@@ -36,30 +36,35 @@
3636
* NOTE: You sacrifice the ability to unsubscribe from the origin when you use this operator, so be
3737
* careful not to use this operator on Observables that emit infinite or very large numbers of
3838
* items, as this will use up memory.
39+
*
40+
* @param <T>
41+
* the cached value type
3942
*/
40-
public class OperationCache {
41-
42-
public static <T> OnSubscribeFunc<T> cache(final Observable<? extends T> source) {
43-
return new OnSubscribeFunc<T>() {
44-
45-
final AtomicBoolean subscribed = new AtomicBoolean(false);
46-
private final ReplaySubject<T> cache = ReplaySubject.create();
43+
public final class OperatorCache<T> implements OnSubscribe<T> {
44+
protected final Observable<? extends T> source;
45+
protected final Subject<? super T, ? extends T> cache;
46+
protected final AtomicBoolean sourceSubscribed;
4747

48-
@Override
49-
public Subscription onSubscribe(Observer<? super T> observer) {
50-
if (subscribed.compareAndSet(false, true)) {
51-
// subscribe to the source once
52-
source.unsafeSubscribe(Subscribers.from(cache));
53-
/*
54-
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
55-
*
56-
* This means this should never be used on an infinite or very large sequence, similar to toList().
57-
*/
58-
}
48+
public OperatorCache(Observable<? extends T> source) {
49+
this(source, ReplaySubject.<T> create());
50+
}
5951

60-
return cache.unsafeSubscribe(Subscribers.from(observer));
61-
}
52+
/* accessible to tests */OperatorCache(Observable<? extends T> source, Subject<? super T, ? extends T> cache) {
53+
this.source = source;
54+
this.cache = cache;
55+
this.sourceSubscribed = new AtomicBoolean();
56+
}
6257

63-
};
58+
@Override
59+
public void call(Subscriber<? super T> s) {
60+
if (sourceSubscribed.compareAndSet(false, true)) {
61+
source.unsafeSubscribe(Subscribers.from(cache));
62+
/*
63+
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
64+
*
65+
* This means this should never be used on an infinite or very large sequence, similar to toList().
66+
*/
67+
}
68+
cache.unsafeSubscribe(s);
6469
}
65-
}
70+
}

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import rx.Notification;
2222
import rx.Observer;
23+
import rx.functions.Action0;
2324
import rx.functions.Action1;
2425
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2526

@@ -105,35 +106,38 @@ protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T>
105106

106107
@Override
107108
public void onCompleted() {
108-
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
109+
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
109110

110111
@Override
111-
public void call(Collection<SubjectObserver<? super T>> observers) {
112-
for (Observer<? super T> o : observers) {
113-
emitValueToObserver(lastNotification.get(), o);
114-
}
112+
public void call() {
115113
}
116114
});
115+
if (observers != null) {
116+
for (Observer<? super T> o : observers) {
117+
emitValueToObserver(lastNotification.get(), o);
118+
}
119+
}
117120
}
118121

119122
@Override
120123
public void onError(final Throwable e) {
121-
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
122-
124+
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
123125
@Override
124-
public void call(Collection<SubjectObserver<? super T>> observers) {
125-
lastNotification.set(new Notification<T>(e));
126-
for (Observer<? super T> o : observers) {
127-
emitValueToObserver(lastNotification.get(), o);
128-
}
126+
public void call() {
127+
lastNotification.set(Notification.<T> createOnError(e));
129128
}
130129
});
130+
if (observers != null) {
131+
for (Observer<? super T> o : observers) {
132+
emitValueToObserver(lastNotification.get(), o);
133+
}
134+
}
131135

132136
}
133137

134138
@Override
135139
public void onNext(T v) {
136-
lastNotification.set(new Notification<T>(v));
140+
lastNotification.set(Notification.createOnNext(v));
137141
}
138142

139143
}

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import rx.Notification;
2222
import rx.Observer;
23+
import rx.functions.Action0;
2324
import rx.functions.Action1;
2425
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2526

@@ -67,12 +68,10 @@
6768
public final class BehaviorSubject<T> extends Subject<T, T> {
6869

6970
/**
70-
* Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each
71-
* {@link Observer} that subscribes to it.
71+
* Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it.
7272
*
7373
* @param defaultValue
74-
* the value which will be published to any {@link Observer} as long as the
75-
* {@link BehaviorSubject} has not yet received any events
74+
* the value which will be published to any {@link Observer} as long as the {@link BehaviorSubject} has not yet received any events
7675
* @return the constructed {@link BehaviorSubject}
7776
* @deprecated use {@link #create(T)} instead
7877
*/
@@ -81,12 +80,10 @@ public static <T> BehaviorSubject<T> createWithDefaultValue(T defaultValue) {
8180
}
8281

8382
/**
84-
* Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each
85-
* {@link Observer} that subscribes to it.
83+
* Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it.
8684
*
8785
* @param defaultValue
88-
* the value which will be published to any {@link Observer} as long as the
89-
* {@link BehaviorSubject} has not yet received any events
86+
* the value which will be published to any {@link Observer} as long as the {@link BehaviorSubject} has not yet received any events
9087
* @return the constructed {@link BehaviorSubject}
9188
*/
9289
public static <T> BehaviorSubject<T> create(T defaultValue) {
@@ -144,39 +141,42 @@ protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager
144141

145142
@Override
146143
public void onCompleted() {
147-
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
144+
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
148145

149146
@Override
150-
public void call(Collection<SubjectObserver<? super T>> observers) {
151-
lastNotification.set(new Notification<T>());
152-
for (Observer<? super T> o : observers) {
153-
o.onCompleted();
154-
}
147+
public void call() {
148+
lastNotification.set(Notification.<T> createOnCompleted());
155149
}
156150
});
151+
if (observers != null) {
152+
for (Observer<? super T> o : observers) {
153+
o.onCompleted();
154+
}
155+
}
157156
}
158157

159158
@Override
160159
public void onError(final Throwable e) {
161-
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
160+
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
162161

163162
@Override
164-
public void call(Collection<SubjectObserver<? super T>> observers) {
165-
lastNotification.set(new Notification<T>(e));
166-
for (Observer<? super T> o : observers) {
167-
o.onError(e);
168-
}
163+
public void call() {
164+
lastNotification.set(Notification.<T> createOnError(e));
169165
}
170166
});
171-
167+
if (observers != null) {
168+
for (Observer<? super T> o : observers) {
169+
o.onError(e);
170+
}
171+
}
172172
}
173173

174174
@Override
175175
public void onNext(T v) {
176176
// do not overwrite a terminal notification
177177
// so new subscribers can get them
178178
if (lastNotification.get().isOnNext()) {
179-
lastNotification.set(new Notification<T>(v));
179+
lastNotification.set(Notification.createOnNext(v));
180180
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
181181
o.onNext(v);
182182
}

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import rx.Notification;
2222
import rx.Observer;
23+
import rx.functions.Action0;
2324
import rx.functions.Action1;
2425
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2526

@@ -95,31 +96,33 @@ protected PublishSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<
9596

9697
@Override
9798
public void onCompleted() {
98-
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
99-
99+
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
100100
@Override
101-
public void call(Collection<SubjectObserver<? super T>> observers) {
101+
public void call() {
102102
lastNotification.set(Notification.<T> createOnCompleted());
103-
for (Observer<? super T> o : observers) {
104-
o.onCompleted();
105-
}
106103
}
107104
});
105+
if (observers != null) {
106+
for (Observer<? super T> o : observers) {
107+
o.onCompleted();
108+
}
109+
}
108110
}
109111

110112
@Override
111113
public void onError(final Throwable e) {
112-
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
114+
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
113115

114116
@Override
115-
public void call(Collection<SubjectObserver<? super T>> observers) {
116-
lastNotification.set(Notification.<T>createOnError(e));
117-
for (Observer<? super T> o : observers) {
118-
o.onError(e);
119-
}
117+
public void call() {
118+
lastNotification.set(Notification.<T> createOnError(e));
120119
}
121120
});
122-
121+
if (observers != null) {
122+
for (Observer<? super T> o : observers) {
123+
o.onError(e);
124+
}
125+
}
123126
}
124127

125128
@Override

rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import rx.Notification;
2525
import rx.Observer;
26+
import rx.functions.Action0;
2627
import rx.functions.Action1;
2728
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2829

@@ -88,7 +89,7 @@ public void call(SubjectObserver<? super T> o) {
8889
// we will finish replaying if there is anything left
8990
replayObserverFromIndex(state.history, idx, o);
9091
}
91-
},
92+
},
9293
new Action1<SubjectObserver<? super T>>() {
9394
@Override
9495
public void call(SubjectObserver<? super T> o) {
@@ -122,34 +123,38 @@ protected ReplaySubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T
122123

123124
@Override
124125
public void onCompleted() {
125-
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
126+
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
126127

127128
@Override
128-
public void call(Collection<SubjectObserver<? super T>> observers) {
129-
state.history.complete(Notification.<T>createOnCompleted());
130-
for (SubjectObserver<? super T> o : observers) {
131-
if (caughtUp(o)) {
132-
o.onCompleted();
133-
}
134-
}
129+
public void call() {
130+
state.history.complete(Notification.<T> createOnCompleted());
135131
}
136132
});
133+
if (observers != null) {
134+
for (SubjectObserver<? super T> o : observers) {
135+
if (caughtUp(o)) {
136+
o.onCompleted();
137+
}
138+
}
139+
}
137140
}
138141

139142
@Override
140143
public void onError(final Throwable e) {
141-
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
144+
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
142145

143146
@Override
144-
public void call(Collection<SubjectObserver<? super T>> observers) {
145-
state.history.complete(Notification.<T>createOnError(e));
146-
for (SubjectObserver<? super T> o : observers) {
147-
if (caughtUp(o)) {
148-
o.onError(e);
149-
}
150-
}
147+
public void call() {
148+
state.history.complete(Notification.<T> createOnError(e));
151149
}
152150
});
151+
if (observers != null) {
152+
for (SubjectObserver<? super T> o : observers) {
153+
if (caughtUp(o)) {
154+
o.onError(e);
155+
}
156+
}
157+
}
153158
}
154159

155160
@Override
@@ -236,10 +241,11 @@ public void complete(Notification<T> n) {
236241
terminalValue.set(n);
237242
}
238243
}
244+
239245
/**
240246
* @return Returns the number of subscribers.
241247
*/
242-
/* Support test.*/ int subscriberCount() {
248+
/* Support test. */int subscriberCount() {
243249
return state.replayState.size();
244250
}
245251
}

0 commit comments

Comments
 (0)