|
15 | 15 | */ |
16 | 16 | package rx.operators; |
17 | 17 |
|
18 | | -import static org.junit.Assert.*; |
19 | | -import static org.mockito.Matchers.*; |
20 | | -import static org.mockito.Mockito.*; |
| 18 | +import static org.junit.Assert.assertTrue; |
| 19 | +import static org.junit.Assert.fail; |
| 20 | +import static org.mockito.Matchers.any; |
| 21 | +import static org.mockito.Matchers.anyInt; |
| 22 | +import static org.mockito.Matchers.anyString; |
| 23 | +import static org.mockito.Mockito.inOrder; |
| 24 | +import static org.mockito.Mockito.mock; |
| 25 | +import static org.mockito.Mockito.never; |
| 26 | +import static org.mockito.Mockito.times; |
| 27 | +import static org.mockito.Mockito.verify; |
| 28 | +import static org.mockito.Mockito.verifyNoMoreInteractions; |
21 | 29 |
|
22 | 30 | import java.util.concurrent.atomic.AtomicBoolean; |
23 | 31 | import java.util.concurrent.atomic.AtomicInteger; |
24 | 32 |
|
25 | 33 | import org.junit.Test; |
| 34 | +import org.mockito.InOrder; |
26 | 35 |
|
27 | 36 | import rx.Observable; |
28 | 37 | import rx.Observable.OnSubscribeFunc; |
29 | 38 | import rx.Observer; |
30 | 39 | import rx.Subscription; |
31 | 40 | import rx.subscriptions.Subscriptions; |
| 41 | +import rx.util.functions.Func1; |
32 | 42 |
|
33 | 43 | /** |
34 | 44 | * Returns an Observable that emits the first <code>num</code> items emitted by the source |
@@ -114,30 +124,47 @@ private class ItemObserver implements Observer<T> { |
114 | 124 | private final Observer<? super T> observer; |
115 | 125 |
|
116 | 126 | private final AtomicInteger counter = new AtomicInteger(); |
| 127 | + private volatile boolean hasEmitedError = false; |
117 | 128 |
|
118 | 129 | public ItemObserver(Observer<? super T> observer) { |
119 | 130 | this.observer = observer; |
120 | 131 | } |
121 | 132 |
|
122 | 133 | @Override |
123 | 134 | public void onCompleted() { |
| 135 | + if (hasEmitedError) { |
| 136 | + return; |
| 137 | + } |
124 | 138 | if (counter.getAndSet(num) < num) { |
125 | 139 | observer.onCompleted(); |
126 | 140 | } |
127 | 141 | } |
128 | 142 |
|
129 | 143 | @Override |
130 | 144 | public void onError(Throwable e) { |
| 145 | + if (hasEmitedError) { |
| 146 | + return; |
| 147 | + } |
131 | 148 | if (counter.getAndSet(num) < num) { |
132 | 149 | observer.onError(e); |
133 | 150 | } |
134 | 151 | } |
135 | 152 |
|
136 | 153 | @Override |
137 | 154 | public void onNext(T args) { |
| 155 | + if (hasEmitedError) { |
| 156 | + return; |
| 157 | + } |
138 | 158 | final int count = counter.incrementAndGet(); |
139 | 159 | if (count <= num) { |
140 | | - observer.onNext(args); |
| 160 | + try { |
| 161 | + observer.onNext(args); |
| 162 | + } catch (Throwable ex) { |
| 163 | + hasEmitedError = true; |
| 164 | + observer.onError(ex); |
| 165 | + subscription.unsubscribe(); |
| 166 | + return; |
| 167 | + } |
141 | 168 | if (count == num) { |
142 | 169 | observer.onCompleted(); |
143 | 170 | } |
@@ -184,6 +211,47 @@ public void testTake2() { |
184 | 211 | verify(aObserver, times(1)).onCompleted(); |
185 | 212 | } |
186 | 213 |
|
| 214 | + @Test(expected = IllegalArgumentException.class) |
| 215 | + public void testTakeWithError() { |
| 216 | + Observable.from(1, 2, 3).take(1).map(new Func1<Integer, Integer>() { |
| 217 | + public Integer call(Integer t1) { |
| 218 | + throw new IllegalArgumentException("some error"); |
| 219 | + } |
| 220 | + }).toBlockingObservable().single(); |
| 221 | + } |
| 222 | + |
| 223 | + @Test |
| 224 | + public void testTakeWithErrorHappeningInOnNext() { |
| 225 | + Observable<Integer> w = Observable.from(1, 2, 3).take(2).map(new Func1<Integer, Integer>() { |
| 226 | + public Integer call(Integer t1) { |
| 227 | + throw new IllegalArgumentException("some error"); |
| 228 | + } |
| 229 | + }); |
| 230 | + |
| 231 | + @SuppressWarnings("unchecked") |
| 232 | + Observer<Integer> observer = mock(Observer.class); |
| 233 | + w.subscribe(observer); |
| 234 | + InOrder inOrder = inOrder(observer); |
| 235 | + inOrder.verify(observer, times(1)).onError(any(IllegalArgumentException.class)); |
| 236 | + inOrder.verifyNoMoreInteractions(); |
| 237 | + } |
| 238 | + |
| 239 | + @Test |
| 240 | + public void testTakeWithErrorHappeningInTheLastOnNext() { |
| 241 | + Observable<Integer> w = Observable.from(1, 2, 3).take(1).map(new Func1<Integer, Integer>() { |
| 242 | + public Integer call(Integer t1) { |
| 243 | + throw new IllegalArgumentException("some error"); |
| 244 | + } |
| 245 | + }); |
| 246 | + |
| 247 | + @SuppressWarnings("unchecked") |
| 248 | + Observer<Integer> observer = mock(Observer.class); |
| 249 | + w.subscribe(observer); |
| 250 | + InOrder inOrder = inOrder(observer); |
| 251 | + inOrder.verify(observer, times(1)).onError(any(IllegalArgumentException.class)); |
| 252 | + inOrder.verifyNoMoreInteractions(); |
| 253 | + } |
| 254 | + |
187 | 255 | @Test |
188 | 256 | public void testTakeDoesntLeakErrors() { |
189 | 257 | Observable<String> source = Observable.create(new OnSubscribeFunc<String>() |
|
0 commit comments