Skip to content

Commit 39d49fc

Browse files
ObserveOn Fixes
- fix subscription leak (Composite+MultipleAssignment instead of just Composite) - add unit tests
1 parent 670cee3 commit 39d49fc

File tree

3 files changed

+174
-66
lines changed

3 files changed

+174
-66
lines changed

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 41 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import rx.schedulers.CurrentThreadScheduler;
2828
import rx.schedulers.ImmediateScheduler;
2929
import rx.subscriptions.CompositeSubscription;
30-
import rx.subscriptions.Subscriptions;
30+
import rx.subscriptions.MultipleAssignmentSubscription;
3131
import rx.util.functions.Action0;
3232
import rx.util.functions.Action1;
3333
import rx.util.functions.Func2;
@@ -64,62 +64,66 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
6464
return new Observation(observer).init();
6565
}
6666
}
67+
6768
/** Observe through individual queue per observer. */
68-
private class Observation implements Action1<Notification<? extends T>> {
69+
private class Observation {
6970
final Observer<? super T> observer;
70-
final CompositeSubscription s;
71-
final ConcurrentLinkedQueue<Notification<? extends T>> queue;
72-
final AtomicInteger counter;
71+
final CompositeSubscription compositeSubscription = new CompositeSubscription();
72+
final MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription();
73+
final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
74+
final AtomicInteger counter = new AtomicInteger(0);
7375
private volatile Scheduler recursiveScheduler;
76+
7477
public Observation(Observer<? super T> observer) {
7578
this.observer = observer;
76-
this.queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
77-
this.counter = new AtomicInteger(0);
78-
this.s = new CompositeSubscription();
7979
}
80+
8081
public Subscription init() {
81-
s.add(source.materialize().subscribe(this));
82-
return s;
82+
compositeSubscription.add(source.materialize().subscribe(new SourceObserver()));
83+
return compositeSubscription;
8384
}
8485

85-
@Override
86-
public void call(Notification<? extends T> e) {
87-
queue.offer(e);
88-
if (counter.getAndIncrement() == 0) {
89-
if (recursiveScheduler == null) {
90-
s.add(scheduler.schedule(null, new Func2<Scheduler, T, Subscription>() {
86+
private class SourceObserver implements Action1<Notification<? extends T>> {
87+
88+
@Override
89+
public void call(Notification<? extends T> e) {
90+
queue.offer(e);
91+
if (counter.getAndIncrement() == 0) {
92+
if (recursiveScheduler == null) {
93+
// compositeSubscription for the outer scheduler, recursive for inner
94+
compositeSubscription.add(scheduler.schedule(null, new Func2<Scheduler, T, Subscription>() {
9195
@Override
9296
public Subscription call(Scheduler innerScheduler, T state) {
9397
// record innerScheduler so 'processQueue' can use it for all subsequent executions
9498
recursiveScheduler = innerScheduler;
95-
99+
// once we have the innerScheduler we can start doing real work
96100
processQueue();
97-
98-
return Subscriptions.empty();
101+
return recursiveSubscription;
99102
}
100103
}));
101-
} else {
102-
processQueue();
104+
} else {
105+
processQueue();
106+
}
103107
}
104108
}
105-
}
106-
void processQueue() {
107-
s.add(recursiveScheduler.schedule(new Action1<Action0>() {
108-
@Override
109-
public void call(Action0 self) {
110-
Notification<? extends T> not = queue.poll();
111-
if (not != null) {
112-
not.accept(observer);
113-
}
114109

115-
// decrement count and if we still have work to do
116-
// recursively schedule ourselves to process again
117-
if (counter.decrementAndGet() > 0) {
118-
self.call();
119-
}
110+
void processQueue() {
111+
recursiveSubscription.setSubscription(recursiveScheduler.schedule(new Action1<Action0>() {
112+
@Override
113+
public void call(Action0 self) {
114+
Notification<? extends T> not = queue.poll();
115+
if (not != null) {
116+
not.accept(observer);
117+
}
120118

121-
}
122-
}));
119+
// decrement count and if we still have work to do
120+
// recursively schedule ourselves to process again
121+
if (counter.decrementAndGet() > 0) {
122+
self.call();
123+
}
124+
}
125+
}));
126+
}
123127
}
124128
}
125129
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
import org.junit.Test;
25+
26+
import rx.schedulers.Schedulers;
27+
28+
public class ErrorHandlingTests {
29+
30+
/**
31+
* Test that an error from a user provided Observer.onNext is handled and emitted to the onError
32+
*/
33+
@Test
34+
public void testOnNextError() throws InterruptedException {
35+
final CountDownLatch latch = new CountDownLatch(1);
36+
final AtomicReference<Throwable> caughtError = new AtomicReference<Throwable>();
37+
Observable<Long> o = Observable.interval(50, TimeUnit.MILLISECONDS);
38+
Observer<Long> observer = new Observer<Long>() {
39+
40+
@Override
41+
public void onCompleted() {
42+
System.out.println("completed");
43+
latch.countDown();
44+
}
45+
46+
@Override
47+
public void onError(Throwable e) {
48+
System.out.println("error: " + e);
49+
caughtError.set(e);
50+
latch.countDown();
51+
}
52+
53+
@Override
54+
public void onNext(Long args) {
55+
throw new RuntimeException("forced failure");
56+
}
57+
};
58+
o.subscribe(observer);
59+
60+
latch.await(2000, TimeUnit.MILLISECONDS);
61+
assertNotNull(caughtError.get());
62+
}
63+
64+
/**
65+
* Test that an error from a user provided Observer.onNext is handled and emitted to the onError
66+
* even when done across thread boundaries with observeOn
67+
*/
68+
@Test
69+
public void testOnNextErrorAcrossThread() throws InterruptedException {
70+
final CountDownLatch latch = new CountDownLatch(1);
71+
final AtomicReference<Throwable> caughtError = new AtomicReference<Throwable>();
72+
Observable<Long> o = Observable.interval(50, TimeUnit.MILLISECONDS);
73+
Observer<Long> observer = new Observer<Long>() {
74+
75+
@Override
76+
public void onCompleted() {
77+
System.out.println("completed");
78+
latch.countDown();
79+
}
80+
81+
@Override
82+
public void onError(Throwable e) {
83+
System.out.println("error: " + e);
84+
caughtError.set(e);
85+
latch.countDown();
86+
}
87+
88+
@Override
89+
public void onNext(Long args) {
90+
throw new RuntimeException("forced failure");
91+
}
92+
};
93+
o.observeOn(Schedulers.newThread()).subscribe(observer);
94+
95+
latch.await(2000, TimeUnit.MILLISECONDS);
96+
assertNotNull(caughtError.get());
97+
}
98+
}

rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import rx.Observer;
3333
import rx.schedulers.Schedulers;
3434
import rx.schedulers.TestScheduler;
35+
import rx.util.functions.Action0;
3536
import rx.util.functions.Action1;
3637

3738
public class OperationObserveOnTest {
@@ -88,71 +89,75 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
8889
@Test
8990
@SuppressWarnings("unchecked")
9091
public void testThreadName() throws InterruptedException {
92+
System.out.println("Main Thread: " + Thread.currentThread().getName());
9193
Observable<String> obs = Observable.from("one", null, "two", "three", "four");
9294

9395
Observer<String> observer = mock(Observer.class);
94-
95-
InOrder inOrder = inOrder(observer);
96+
final String parentThreadName = Thread.currentThread().getName();
9697

9798
final CountDownLatch completedLatch = new CountDownLatch(1);
98-
doAnswer(new Answer<Void>() {
99-
100-
@Override
101-
public Void answer(InvocationOnMock invocation) throws Throwable {
102-
completedLatch.countDown();
10399

104-
return null;
105-
}
106-
}).when(observer).onCompleted();
107-
108-
doAnswer(new Answer<Void>() {
100+
// assert subscribe is on main thread
101+
obs = obs.doOnEach(new Action1<String>() {
109102

110103
@Override
111-
public Void answer(InvocationOnMock invocation) throws Throwable {
112-
completedLatch.countDown();
113-
114-
return null;
104+
public void call(String s) {
105+
String threadName = Thread.currentThread().getName();
106+
System.out.println("Source ThreadName: " + threadName + " Expected => " + parentThreadName);
107+
assertEquals(parentThreadName, threadName);
115108
}
116-
}).when(observer).onError(any(Exception.class));
117109

110+
});
111+
112+
// assert observe is on new thread
118113
obs.observeOn(Schedulers.newThread()).doOnEach(new Action1<String>() {
119114

120115
@Override
121116
public void call(String t1) {
122117
String threadName = Thread.currentThread().getName();
123118
boolean correctThreadName = threadName.startsWith("RxNewThreadScheduler");
124-
System.out.println("ThreadName: " + threadName + " Correct => " + correctThreadName);
119+
System.out.println("ObserveOn ThreadName: " + threadName + " Correct => " + correctThreadName);
125120
assertTrue(correctThreadName);
126121
}
127122

123+
}).finallyDo(new Action0() {
124+
125+
@Override
126+
public void call() {
127+
completedLatch.countDown();
128+
129+
}
128130
}).subscribe(observer);
129131

130132
if (!completedLatch.await(1000, TimeUnit.MILLISECONDS)) {
131133
fail("timed out waiting");
132134
}
133135

134-
inOrder.verify(observer, times(1)).onCompleted();
136+
verify(observer, never()).onError(any(Throwable.class));
137+
verify(observer, times(5)).onNext(any(String.class));
138+
verify(observer, times(1)).onCompleted();
135139
}
140+
136141
@Test
137142
public void observeOnTheSameSchedulerTwice() {
138143
TestScheduler scheduler = new TestScheduler();
139-
144+
140145
Observable<Integer> o = Observable.from(1, 2, 3);
141146
Observable<Integer> o2 = o.observeOn(scheduler);
142147

143148
@SuppressWarnings("unchecked")
144149
Observer<Object> observer1 = mock(Observer.class);
145150
@SuppressWarnings("unchecked")
146151
Observer<Object> observer2 = mock(Observer.class);
147-
152+
148153
InOrder inOrder1 = inOrder(observer1);
149154
InOrder inOrder2 = inOrder(observer2);
150-
155+
151156
o2.subscribe(observer1);
152157
o2.subscribe(observer2);
153-
158+
154159
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
155-
160+
156161
inOrder1.verify(observer1, times(1)).onNext(1);
157162
inOrder1.verify(observer1, times(1)).onNext(2);
158163
inOrder1.verify(observer1, times(1)).onNext(3);
@@ -168,11 +173,12 @@ public void observeOnTheSameSchedulerTwice() {
168173
inOrder2.verifyNoMoreInteractions();
169174

170175
}
176+
171177
@Test
172178
public void observeSameOnMultipleSchedulers() {
173179
TestScheduler scheduler1 = new TestScheduler();
174180
TestScheduler scheduler2 = new TestScheduler();
175-
181+
176182
Observable<Integer> o = Observable.from(1, 2, 3);
177183
Observable<Integer> o1 = o.observeOn(scheduler1);
178184
Observable<Integer> o2 = o.observeOn(scheduler2);
@@ -181,16 +187,16 @@ public void observeSameOnMultipleSchedulers() {
181187
Observer<Object> observer1 = mock(Observer.class);
182188
@SuppressWarnings("unchecked")
183189
Observer<Object> observer2 = mock(Observer.class);
184-
190+
185191
InOrder inOrder1 = inOrder(observer1);
186192
InOrder inOrder2 = inOrder(observer2);
187-
193+
188194
o1.subscribe(observer1);
189195
o2.subscribe(observer2);
190-
196+
191197
scheduler1.advanceTimeBy(1, TimeUnit.SECONDS);
192198
scheduler2.advanceTimeBy(1, TimeUnit.SECONDS);
193-
199+
194200
inOrder1.verify(observer1, times(1)).onNext(1);
195201
inOrder1.verify(observer1, times(1)).onNext(2);
196202
inOrder1.verify(observer1, times(1)).onNext(3);

0 commit comments

Comments
 (0)