Skip to content

Commit a70afdf

Browse files
587 of 590 operator unit tests passing
1 parent d2cfac1 commit a70afdf

File tree

63 files changed

+890
-871
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+890
-871
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5324,8 +5324,7 @@ public final <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>
53245324
* calls to user code from within an Observer"
53255325
*/
53265326
private Subscription protectivelyWrapAndSubscribe(Observer<? super T> o) {
5327-
SafeObservableSubscription subscription = new SafeObservableSubscription();
5328-
return subscription.wrap(subscribe(new SafeObserver<T>(subscription, o)));
5327+
return subscribe(new SafeObserver<T>(o));
53295328
}
53305329

53315330
/**
@@ -6885,9 +6884,7 @@ public final Subscription subscribe(Observer<? super T> observer) {
68856884
if (isInternalImplementation(observer)) {
68866885
onSubscribeFunction.call(observer);
68876886
} else {
6888-
// TODO this doesn't seem correct any longer with the Observer and injecting of CompositeSubscription
6889-
SafeObservableSubscription subscription = new SafeObservableSubscription(observer);
6890-
onSubscribeFunction.call(new SafeObserver<T>(subscription, observer));
6887+
onSubscribeFunction.call(new SafeObserver<T>(observer));
68916888
}
68926889
return hook.onSubscribeReturn(this, observer);
68936890
} catch (OnErrorNotImplementedException e) {

rxjava-core/src/main/java/rx/joins/JoinObserver1.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ public final class JoinObserver1<T> extends Observer<Notification<T>> implements
3737
private final Action1<Throwable> onError;
3838
private final List<ActivePlan0> activePlans;
3939
private final Queue<Notification<T>> queue;
40-
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
4140
// private volatile boolean done;
4241
private final AtomicBoolean subscribed = new AtomicBoolean(false);
4342
private final SafeObserver<Notification<T>> safeObserver;
@@ -47,7 +46,7 @@ public JoinObserver1(Observable<T> source, Action1<Throwable> onError) {
4746
this.onError = onError;
4847
queue = new LinkedList<Notification<T>>();
4948
activePlans = new ArrayList<ActivePlan0>();
50-
safeObserver = new SafeObserver<Notification<T>>(subscription, new InnerObserver());
49+
safeObserver = new SafeObserver<Notification<T>>(new InnerObserver());
5150
// add this subscription so it gets unsubscribed when the parent does
5251
add(safeObserver);
5352
}
@@ -64,7 +63,7 @@ public void addActivePlan(ActivePlan0 activePlan) {
6463
public void subscribe(Object gate) {
6564
if (subscribed.compareAndSet(false, true)) {
6665
this.gate = gate;
67-
subscription.wrap(source.materialize().subscribe(this));
66+
source.materialize().subscribe(this);
6867
} else {
6968
throw new IllegalStateException("Can only be subscribed to once.");
7069
}

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
7373
* "Guideline 6.4: Protect calls to user code from within an operator"
7474
*/
7575
private Subscription protectivelyWrapAndSubscribe(Observer<? super T> observer) {
76-
SafeObservableSubscription subscription = new SafeObservableSubscription();
77-
return subscription.wrap(o.subscribe(new SafeObserver<T>(subscription, observer)));
76+
return o.subscribe(new SafeObserver<T>(observer));
7877
}
7978

8079
/**

rxjava-core/src/main/java/rx/observers/SafeObserver.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,9 @@ public class SafeObserver<T> extends Observer<T> {
6262

6363
private final Observer<? super T> actual;
6464
private final AtomicBoolean isFinished = new AtomicBoolean(false);
65-
private final Subscription subscription;
6665

6766
public SafeObserver(Observer<? super T> actual) {
68-
this.subscription = Subscriptions.empty();
69-
this.actual = actual;
70-
}
71-
72-
public SafeObserver(SafeObservableSubscription subscription, Observer<? super T> actual) {
73-
this.subscription = subscription;
67+
super(actual);
7468
this.actual = actual;
7569
}
7670

@@ -84,7 +78,7 @@ public void onCompleted() {
8478
_onError(e);
8579
} finally {
8680
// auto-unsubscribe
87-
subscription.unsubscribe();
81+
unsubscribe();
8882
}
8983
}
9084
}
@@ -131,7 +125,7 @@ protected void _onError(Throwable e) {
131125
* The OnCompleted behavior in this case is to do nothing."
132126
*/
133127
try {
134-
subscription.unsubscribe();
128+
unsubscribe();
135129
} catch (Throwable unsubscribeException) {
136130
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
137131
throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
@@ -145,7 +139,7 @@ protected void _onError(Throwable e) {
145139
*/
146140
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
147141
try {
148-
subscription.unsubscribe();
142+
unsubscribe();
149143
} catch (Throwable unsubscribeException) {
150144
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
151145
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
@@ -156,7 +150,7 @@ protected void _onError(Throwable e) {
156150
}
157151
// if we did not throw about we will unsubscribe here, if onError failed then unsubscribe happens in the catch
158152
try {
159-
subscription.unsubscribe();
153+
unsubscribe();
160154
} catch (RuntimeException unsubscribeException) {
161155
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
162156
throw unsubscribeException;

rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import rx.Observable.OnSubscribeFunc;
2020
import rx.Observer;
2121
import rx.Subscription;
22-
import rx.observers.SafeObserver;
2322

2423
/**
2524
* Converts the elements of an observable sequence to the specified type.
@@ -41,26 +40,40 @@ public DoOnEachObservable(Observable<? extends T> sequence, Observer<? super T>
4140

4241
@Override
4342
public Subscription onSubscribe(final Observer<? super T> observer) {
44-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
45-
return subscription.wrap(sequence.subscribe(new SafeObserver<T>(subscription, new Observer<T>() {
43+
return sequence.subscribe(new Observer<T>() {
4644
@Override
4745
public void onCompleted() {
48-
doOnEachObserver.onCompleted();
46+
try {
47+
doOnEachObserver.onCompleted();
48+
} catch (Throwable e) {
49+
onError(e);
50+
return;
51+
}
4952
observer.onCompleted();
5053
}
5154

5255
@Override
5356
public void onError(Throwable e) {
54-
doOnEachObserver.onError(e);
57+
try {
58+
doOnEachObserver.onError(e);
59+
} catch (Throwable e2) {
60+
observer.onError(e2);
61+
return;
62+
}
5563
observer.onError(e);
5664
}
5765

5866
@Override
5967
public void onNext(T value) {
60-
doOnEachObserver.onNext(value);
68+
try {
69+
doOnEachObserver.onNext(value);
70+
} catch (Throwable e) {
71+
onError(e);
72+
return;
73+
}
6174
observer.onNext(value);
6275
}
63-
})));
76+
});
6477
}
6578

6679
}

rxjava-core/src/main/java/rx/operators/OperationMulticast.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,7 @@ public Subscription onSubscribe(Observer<? super TResult> t1) {
137137

138138
CompositeSubscription csub = new CompositeSubscription();
139139

140-
csub.add(observable.subscribe(new SafeObserver<TResult>(
141-
new SafeObservableSubscription(csub), t1)));
140+
csub.add(observable.subscribe(t1));
142141
csub.add(connectable.connect());
143142

144143
return csub;

rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ public Boolean call(T input, Integer index) {
8888
private static class TakeWhile<T> implements OnSubscribeFunc<T> {
8989
private final Observable<? extends T> items;
9090
private final Func2<? super T, ? super Integer, Boolean> predicate;
91-
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
9291

9392
private TakeWhile(Observable<? extends T> items, Func2<? super T, ? super Integer, Boolean> predicate) {
9493
this.items = items;
@@ -97,7 +96,7 @@ private TakeWhile(Observable<? extends T> items, Func2<? super T, ? super Intege
9796

9897
@Override
9998
public Subscription onSubscribe(Observer<? super T> observer) {
100-
return subscription.wrap(items.subscribe(new ItemObserver(observer)));
99+
return items.subscribe(new ItemObserver(observer));
101100
}
102101

103102
private class ItemObserver extends Observer<T> {
@@ -106,10 +105,11 @@ private class ItemObserver extends Observer<T> {
106105
private final AtomicInteger counter = new AtomicInteger();
107106

108107
public ItemObserver(Observer<? super T> observer) {
108+
super(observer);
109109
// Using AtomicObserver because the unsubscribe, onCompleted, onError and error handling behavior
110110
// needs "isFinished" logic to not send duplicated events
111111
// The 'testTakeWhile1' and 'testTakeWhile2' tests fail without this.
112-
this.observer = new SafeObserver<T>(subscription, observer);
112+
this.observer = new SafeObserver<T>(observer);
113113
}
114114

115115
@Override
@@ -135,8 +135,7 @@ public void onNext(T args) {
135135
observer.onNext(args);
136136
} else {
137137
observer.onCompleted();
138-
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
139-
subscription.unsubscribe();
138+
unsubscribe();
140139
}
141140
}
142141

rxjava-core/src/main/java/rx/operators/OperationUsing.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,8 @@ public Subscription onSubscribe(Observer<? super T> observer) {
4343
resourceSubscription = resource;
4444
}
4545
Observable<T> observable = observableFactory.call(resource);
46-
SafeObservableSubscription subscription = new SafeObservableSubscription();
47-
// Use SafeObserver to guarantee resourceSubscription will
48-
// be unsubscribed.
49-
return subscription.wrap(new CompositeSubscription(
50-
observable.subscribe(new SafeObserver<T>(
51-
subscription, observer)),
52-
resourceSubscription));
46+
// Use SafeObserver to guarantee resourceSubscription will be unsubscribed.
47+
return new CompositeSubscription(observable.subscribe(new SafeObserver<T>(observer)), resourceSubscription);
5348
} catch (Throwable e) {
5449
resourceSubscription.unsubscribe();
5550
return Observable.<T> error(e).subscribe(observer);

rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,10 @@
2121

2222
/**
2323
* Thread-safe wrapper around Observable Subscription that ensures unsubscribe can be called only once.
24-
* <p>
25-
* Also used to:
26-
* <p>
27-
* <ul>
28-
* <li>allow the AtomicObserver to have access to the subscription in asynchronous execution for checking if unsubscribed occurred without onComplete/onError.</li>
29-
* <li>handle both synchronous and asynchronous subscribe() execution flows</li>
30-
* </ul>
24+
*
25+
* @deprecated since `Observer` now implements `Subscription` and `CompositeSubscription` etc handle these things
3126
*/
27+
@Deprecated
3228
public final class SafeObservableSubscription implements Subscription {
3329

3430
private static final Subscription UNSUBSCRIBED = new Subscription()

rxjava-core/src/test/java/rx/observers/SafeObserverTest.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.junit.Test;
2323

2424
import rx.Observer;
25-
import rx.operators.SafeObservableSubscription;
2625
import rx.subscriptions.Subscriptions;
2726
import rx.util.CompositeException;
2827
import rx.util.OnErrorNotImplementedException;
@@ -167,22 +166,22 @@ public void onNextOnErrorFailureSafe() {
167166

168167
@Test
169168
public void onCompleteSuccessWithUnsubscribeFailure() {
170-
SafeObservableSubscription s = null;
169+
Observer<String> o = OBSERVER_SUCCESS();
171170
try {
172-
s = new SafeObservableSubscription(Subscriptions.create(new Action0() {
171+
o.add(Subscriptions.create(new Action0() {
173172

174173
@Override
175174
public void call() {
176175
// break contract by throwing exception
177176
throw new SafeObserverTestException("failure from unsubscribe");
178177
}
179178
}));
180-
new SafeObserver<String>(s, OBSERVER_SUCCESS()).onCompleted();
179+
new SafeObserver<String>(o).onCompleted();
181180
fail("expects exception to be thrown");
182181
} catch (Exception e) {
183182
e.printStackTrace();
184183

185-
assertTrue(s.isUnsubscribed());
184+
assertTrue(o.isUnsubscribed());
186185

187186
assertTrue(e instanceof SafeObserverTestException);
188187
assertEquals("failure from unsubscribe", e.getMessage());
@@ -193,22 +192,22 @@ public void call() {
193192
@Test
194193
public void onErrorSuccessWithUnsubscribeFailure() {
195194
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
196-
SafeObservableSubscription s = null;
195+
Observer<String> o = OBSERVER_SUCCESS(onError);
197196
try {
198-
s = new SafeObservableSubscription(Subscriptions.create(new Action0() {
197+
o.add(Subscriptions.create(new Action0() {
199198

200199
@Override
201200
public void call() {
202201
// break contract by throwing exception
203202
throw new SafeObserverTestException("failure from unsubscribe");
204203
}
205204
}));
206-
new SafeObserver<String>(s, OBSERVER_SUCCESS(onError)).onError(new SafeObserverTestException("failed"));
205+
new SafeObserver<String>(o).onError(new SafeObserverTestException("failed"));
207206
fail("we expect the unsubscribe failure to cause an exception to be thrown");
208207
} catch (Exception e) {
209208
e.printStackTrace();
210209

211-
assertTrue(s.isUnsubscribed());
210+
assertTrue(o.isUnsubscribed());
212211

213212
// we still expect onError to have received something before unsubscribe blew up
214213
assertNotNull(onError.get());
@@ -223,22 +222,22 @@ public void call() {
223222

224223
@Test
225224
public void onErrorFailureWithUnsubscribeFailure() {
226-
SafeObservableSubscription s = null;
225+
Observer<String> o = OBSERVER_ONERROR_FAIL();
227226
try {
228-
s = new SafeObservableSubscription(Subscriptions.create(new Action0() {
227+
o.add(Subscriptions.create(new Action0() {
229228

230229
@Override
231230
public void call() {
232231
// break contract by throwing exception
233232
throw new SafeObserverTestException("failure from unsubscribe");
234233
}
235234
}));
236-
new SafeObserver<String>(s, OBSERVER_ONERROR_FAIL()).onError(new SafeObserverTestException("onError failure"));
235+
new SafeObserver<String>(o).onError(new SafeObserverTestException("onError failure"));
237236
fail("expects exception to be thrown");
238237
} catch (Exception e) {
239238
e.printStackTrace();
240239

241-
assertTrue(s.isUnsubscribed());
240+
assertTrue(o.isUnsubscribed());
242241

243242
// assertions for what is expected for the actual failure propagated to onError which then fails
244243
assertTrue(e instanceof RuntimeException);
@@ -264,22 +263,22 @@ public void call() {
264263

265264
@Test
266265
public void onErrorNotImplementedFailureWithUnsubscribeFailure() {
267-
SafeObservableSubscription s = null;
266+
Observer<String> o = OBSERVER_ONERROR_NOTIMPLEMENTED();
268267
try {
269-
s = new SafeObservableSubscription(Subscriptions.create(new Action0() {
268+
o.add(Subscriptions.create(new Action0() {
270269

271270
@Override
272271
public void call() {
273272
// break contract by throwing exception
274273
throw new SafeObserverTestException("failure from unsubscribe");
275274
}
276275
}));
277-
new SafeObserver<String>(s, OBSERVER_ONERROR_NOTIMPLEMENTED()).onError(new SafeObserverTestException("error!"));
276+
new SafeObserver<String>(o).onError(new SafeObserverTestException("error!"));
278277
fail("expects exception to be thrown");
279278
} catch (Exception e) {
280279
e.printStackTrace();
281280

282-
assertTrue(s.isUnsubscribed());
281+
assertTrue(o.isUnsubscribed());
283282

284283
// assertions for what is expected for the actual failure propagated to onError which then fails
285284
assertTrue(e instanceof RuntimeException);

0 commit comments

Comments
 (0)