Skip to content

Commit 0ac1e6c

Browse files
Fixed 1 Unit Test - Linked Observer/Subscription where seemingly obvious to do so.
1 parent d37ce86 commit 0ac1e6c

16 files changed

+36
-67
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public final static <T> Observable<T> create(final OnSubscribeFunc<T> f) {
213213
@Override
214214
public void call(Observer<? super T> observer) {
215215
Subscription s = f.onSubscribe(observer);
216-
if (s != null) {
216+
if (s != null && s != observer) {
217217
observer.add(s);
218218
}
219219
}

rxjava-core/src/main/java/rx/operators/OperationAny.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ private Any(Observable<? extends T> source, Func1<? super T, Boolean> predicate,
8080

8181
@Override
8282
public Subscription onSubscribe(final Observer<? super Boolean> observer) {
83-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
84-
return subscription.wrap(source.subscribe(new Observer<T>() {
83+
return source.subscribe(new Observer<T>(observer) {
8584

8685
private final AtomicBoolean hasEmitted = new AtomicBoolean(false);
8786

@@ -93,16 +92,12 @@ public void onNext(T value) {
9392
&& hasEmitted.getAndSet(true) == false) {
9493
observer.onNext(!returnOnEmpty);
9594
observer.onCompleted();
96-
// this will work if the sequence is asynchronous, it
97-
// will have no effect on a synchronous observable
98-
subscription.unsubscribe();
95+
unsubscribe();
9996
}
10097
}
10198
} catch (Throwable ex) {
10299
observer.onError(ex);
103-
// this will work if the sequence is asynchronous, it
104-
// will have no effect on a synchronous observable
105-
subscription.unsubscribe();
100+
unsubscribe();
106101
}
107102

108103
}
@@ -119,7 +114,7 @@ public void onCompleted() {
119114
observer.onCompleted();
120115
}
121116
}
122-
}));
117+
});
123118
}
124119

125120
}

rxjava-core/src/main/java/rx/operators/OperationDefaultIfEmpty.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ private DefaultIfEmpty(Observable<? extends T> source, T defaultValue) {
5555

5656
@Override
5757
public Subscription onSubscribe(final Observer<? super T> observer) {
58-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
59-
return subscription.wrap(source.subscribe(new Observer<T>() {
58+
return source.subscribe(new Observer<T>(observer) {
6059

6160
private volatile boolean hasEmitted = false;
6261

@@ -67,9 +66,7 @@ public void onNext(T value) {
6766
observer.onNext(value);
6867
} catch (Throwable ex) {
6968
observer.onError(ex);
70-
// this will work if the sequence is asynchronous, it
71-
// will have no effect on a synchronous observable
72-
subscription.unsubscribe();
69+
unsubscribe();
7370
}
7471
}
7572

@@ -87,7 +84,7 @@ public void onCompleted() {
8784
observer.onCompleted();
8885
}
8986
}
90-
}));
87+
});
9188
}
9289
}
9390
}

rxjava-core/src/main/java/rx/operators/OperationDematerialize.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public DematerializeObservable(Observable<? extends Notification<? extends T>> s
5454

5555
@Override
5656
public Subscription onSubscribe(final Observer<? super T> observer) {
57-
return sequence.subscribe(new Observer<Notification<? extends T>>() {
57+
return sequence.subscribe(new Observer<Notification<? extends T>>(observer) {
5858
@Override
5959
public void onCompleted() {
6060
}

rxjava-core/src/main/java/rx/operators/OperationDistinct.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private Distinct(Observable<? extends T> source, Func1<? super T, ? extends U> k
9797

9898
@Override
9999
public Subscription onSubscribe(final Observer<? super T> observer) {
100-
final Subscription sourceSub = source.subscribe(new Observer<T>() {
100+
return source.subscribe(new Observer<T>(observer) {
101101
private final Set<U> emittedKeys = new HashSet<U>();
102102

103103
@Override
@@ -119,13 +119,6 @@ public void onNext(T next) {
119119
}
120120
}
121121
});
122-
123-
return Subscriptions.create(new Action0() {
124-
@Override
125-
public void call() {
126-
sourceSub.unsubscribe();
127-
}
128-
});
129122
}
130123
}
131124

@@ -142,7 +135,7 @@ private DistinctWithComparator(Observable<? extends T> source, Func1<? super T,
142135

143136
@Override
144137
public Subscription onSubscribe(final Observer<? super T> observer) {
145-
final Subscription sourceSub = source.subscribe(new Observer<T>() {
138+
return source.subscribe(new Observer<T>(observer) {
146139

147140
// due to the totally arbitrary equality comparator, we can't use anything more efficient than lists here
148141
private final List<U> emittedKeys = new ArrayList<U>();
@@ -175,13 +168,6 @@ private boolean alreadyEmitted(U newKey) {
175168
return false;
176169
}
177170
});
178-
179-
return Subscriptions.create(new Action0() {
180-
@Override
181-
public void call() {
182-
sourceSub.unsubscribe();
183-
}
184-
});
185171
}
186172
}
187173
}

rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ private DistinctUntilChanged(Observable<? extends T> source, Func1<? super T, ?
108108

109109
@Override
110110
public Subscription onSubscribe(final Observer<? super T> observer) {
111-
final Subscription sourceSub = source.subscribe(new Observer<T>() {
111+
return source.subscribe(new Observer<T>(observer) {
112112
private U lastEmittedKey;
113113
private boolean hasEmitted;
114114

@@ -135,13 +135,6 @@ public void onNext(T next) {
135135
}
136136
}
137137
});
138-
139-
return Subscriptions.create(new Action0() {
140-
@Override
141-
public void call() {
142-
sourceSub.unsubscribe();
143-
}
144-
});
145138
}
146139
}
147140
}

rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public DoOnEachObservable(Observable<? extends T> sequence, Observer<? super T>
4040

4141
@Override
4242
public Subscription onSubscribe(final Observer<? super T> observer) {
43-
return sequence.subscribe(new Observer<T>() {
43+
return sequence.subscribe(new Observer<T>(observer) {
4444
@Override
4545
public void onCompleted() {
4646
try {

rxjava-core/src/main/java/rx/operators/OperationFilter.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,15 @@ public Filter(Observable<? extends T> that, Func1<? super T, Boolean> predicate)
4343
}
4444

4545
public Subscription onSubscribe(final Observer<? super T> observer) {
46-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
47-
return subscription.wrap(that.subscribe(new Observer<T>() {
46+
return that.subscribe(new Observer<T>(observer) {
4847
public void onNext(T value) {
4948
try {
5049
if (predicate.call(value)) {
5150
observer.onNext(value);
5251
}
5352
} catch (Throwable ex) {
5453
observer.onError(ex);
55-
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
56-
subscription.unsubscribe();
54+
unsubscribe();
5755
}
5856
}
5957

@@ -64,7 +62,7 @@ public void onError(Throwable ex) {
6462
public void onCompleted() {
6563
observer.onCompleted();
6664
}
67-
}));
65+
});
6866
}
6967

7068
}

rxjava-core/src/main/java/rx/operators/OperationMaterialize.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,23 +54,23 @@ public MaterializeObservable(Observable<? extends T> sequence) {
5454

5555
@Override
5656
public Subscription onSubscribe(final Observer<? super Notification<T>> observer) {
57-
return sequence.subscribe(new Observer<T>() {
57+
return sequence.subscribe(new Observer<T>(observer) {
5858

5959
@Override
6060
public void onCompleted() {
61-
observer.onNext(new Notification<T>());
61+
observer.onNext(Notification.<T>createOnCompleted());
6262
observer.onCompleted();
6363
}
6464

6565
@Override
6666
public void onError(Throwable e) {
67-
observer.onNext(new Notification<T>(e));
67+
observer.onNext(Notification.<T>createOnError(e));
6868
observer.onCompleted();
6969
}
7070

7171
@Override
7272
public void onNext(T value) {
73-
observer.onNext(new Notification<T>(value));
73+
observer.onNext(Notification.createOnNext(value));
7474
}
7575

7676
});

rxjava-core/src/main/java/rx/operators/OperationRepeat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
4747
compositeSubscription.add(scheduler.schedule(new Action1<Action0>() {
4848
@Override
4949
public void call(final Action0 self) {
50-
innerSubscription.set(source.subscribe(new Observer<T>() {
50+
innerSubscription.set(source.subscribe(new Observer<T>(observer) {
5151

5252
@Override
5353
public void onCompleted() {

0 commit comments

Comments
 (0)