Skip to content

Commit 788873e

Browse files
authored
2.x: coverage, cleanup, fixes 10/15-2 (#4712)
* 2.x: coverage, cleanup, fixes 10/15-2 * Add missing header
1 parent fe4acf2 commit 788873e

28 files changed

+1796
-370
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3034,7 +3034,7 @@ public static Observable<Long> rangeLong(long start, long count) {
30343034
}
30353035

30363036
/**
3037-
* Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the
3037+
* Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the
30383038
* same by comparing the items emitted by each ObservableSource pairwise.
30393039
* <p>
30403040
* <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sequenceEqual.png" alt="">
@@ -3049,16 +3049,16 @@ public static Observable<Long> rangeLong(long start, long count) {
30493049
* the second ObservableSource to compare
30503050
* @param <T>
30513051
* the type of items emitted by each ObservableSource
3052-
* @return an Observable that emits a Boolean value that indicates whether the two sequences are the same
3052+
* @return a Single that emits a Boolean value that indicates whether the two sequences are the same
30533053
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
30543054
*/
30553055
@SchedulerSupport(SchedulerSupport.NONE)
3056-
public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2) {
3056+
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2) {
30573057
return sequenceEqual(source1, source2, ObjectHelper.equalsPredicate(), bufferSize());
30583058
}
30593059

30603060
/**
3061-
* Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the
3061+
* Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the
30623062
* same by comparing the items emitted by each ObservableSource pairwise based on the results of a specified
30633063
* equality function.
30643064
* <p>
@@ -3076,18 +3076,18 @@ public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T
30763076
* a function used to compare items emitted by each ObservableSource
30773077
* @param <T>
30783078
* the type of items emitted by each ObservableSource
3079-
* @return an Observable that emits a Boolean value that indicates whether the two ObservableSource two sequences
3079+
* @return a Single that emits a Boolean value that indicates whether the two ObservableSource two sequences
30803080
* are the same according to the specified function
30813081
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
30823082
*/
30833083
@SchedulerSupport(SchedulerSupport.NONE)
3084-
public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
3084+
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
30853085
BiPredicate<? super T, ? super T> isEqual) {
30863086
return sequenceEqual(source1, source2, isEqual, bufferSize());
30873087
}
30883088

30893089
/**
3090-
* Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the
3090+
* Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the
30913091
* same by comparing the items emitted by each ObservableSource pairwise based on the results of a specified
30923092
* equality function.
30933093
* <p>
@@ -3112,17 +3112,17 @@ public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T
31123112
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
31133113
*/
31143114
@SchedulerSupport(SchedulerSupport.NONE)
3115-
public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
3115+
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
31163116
BiPredicate<? super T, ? super T> isEqual, int bufferSize) {
31173117
ObjectHelper.requireNonNull(source1, "source1 is null");
31183118
ObjectHelper.requireNonNull(source2, "source2 is null");
31193119
ObjectHelper.requireNonNull(isEqual, "isEqual is null");
31203120
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
3121-
return RxJavaPlugins.onAssembly(new ObservableSequenceEqual<T>(source1, source2, isEqual, bufferSize));
3121+
return RxJavaPlugins.onAssembly(new ObservableSequenceEqualSingle<T>(source1, source2, isEqual, bufferSize));
31223122
}
31233123

31243124
/**
3125-
* Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the
3125+
* Returns a Single that emits a Boolean value that indicates whether two ObservableSource sequences are the
31263126
* same by comparing the items emitted by each ObservableSource pairwise.
31273127
* <p>
31283128
* <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sequenceEqual.png" alt="">
@@ -3139,11 +3139,11 @@ public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T
31393139
* the number of items to prefetch from the first and second source ObservableSource
31403140
* @param <T>
31413141
* the type of items emitted by each ObservableSource
3142-
* @return an Observable that emits a Boolean value that indicates whether the two sequences are the same
3142+
* @return a Single that emits a Boolean value that indicates whether the two sequences are the same
31433143
* @see <a href="http://reactivex.io/documentation/operators/sequenceequal.html">ReactiveX operators documentation: SequenceEqual</a>
31443144
*/
31453145
@SchedulerSupport(SchedulerSupport.NONE)
3146-
public static <T> Observable<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
3146+
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
31473147
int bufferSize) {
31483148
return sequenceEqual(source1, source2, ObjectHelper.equalsPredicate(), bufferSize);
31493149
}
@@ -6319,7 +6319,7 @@ public final <K> Observable<T> distinct(Function<? super T, K> keySelector) {
63196319
public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier) {
63206320
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
63216321
ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null");
6322-
return ObservableDistinct.withCollection(this, keySelector, collectionSupplier);
6322+
return new ObservableDistinct<T, K>(this, keySelector, collectionSupplier);
63236323
}
63246324

63256325
/**
@@ -6338,7 +6338,7 @@ public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Call
63386338
*/
63396339
@SchedulerSupport(SchedulerSupport.NONE)
63406340
public final Observable<T> distinctUntilChanged() {
6341-
return ObservableDistinct.<T>untilChanged(this);
6341+
return new ObservableDistinctUntilChanged<T>(this, Functions.equalsPredicate());
63426342
}
63436343

63446344
/**
@@ -6362,7 +6362,7 @@ public final Observable<T> distinctUntilChanged() {
63626362
@SchedulerSupport(SchedulerSupport.NONE)
63636363
public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector) {
63646364
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
6365-
return ObservableDistinct.untilChanged(this, keySelector);
6365+
return new ObservableDistinctUntilChanged<T>(this, Functions.equalsPredicate(keySelector));
63666366
}
63676367

63686368
/**

src/main/java/io/reactivex/internal/disposables/ObserverFullArbiter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public void dispose() {
5454

5555
@Override
5656
public boolean isDisposed() {
57-
return cancelled;
57+
Disposable d = resource;
58+
return d != null ? d.isDisposed() : cancelled;
5859
}
5960

6061
void disposeResource() {

src/main/java/io/reactivex/internal/functions/Functions.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,4 +637,29 @@ public static <T> Function<List<T>, List<T>> listSorter(final Comparator<? super
637637
return new ListSorter<T>(comparator);
638638
}
639639

640+
static final BiPredicate<Object, Object> DEFAULT_EQUALS_PREDICATE = equalsPredicate(Functions.identity());
641+
642+
@SuppressWarnings("unchecked")
643+
public static <T> BiPredicate<T, T> equalsPredicate() {
644+
return (BiPredicate<T, T>)DEFAULT_EQUALS_PREDICATE;
645+
}
646+
647+
static final class KeyedEqualsPredicate<T, K> implements BiPredicate<T, T> {
648+
final Function<? super T, K> keySelector;
649+
650+
KeyedEqualsPredicate(Function<? super T, K> keySelector) {
651+
this.keySelector = keySelector;
652+
}
653+
654+
@Override
655+
public boolean test(T t1, T t2) throws Exception {
656+
K k1 = ObjectHelper.requireNonNull(keySelector.apply(t1), "The keySelector returned a null key");
657+
K k2 = ObjectHelper.requireNonNull(keySelector.apply(t2), "The keySelector returned a null key");
658+
return ObjectHelper.equals(k1, k2);
659+
}
660+
}
661+
662+
public static <T, K> BiPredicate<T, T> equalsPredicate(Function<? super T, K> keySelector) {
663+
return new KeyedEqualsPredicate<T, K>(keySelector);
664+
}
640665
}

src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java

Lines changed: 11 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,17 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16-
import io.reactivex.internal.functions.ObjectHelper;
1716
import java.util.Arrays;
1817
import java.util.concurrent.atomic.*;
1918

2019
import io.reactivex.*;
2120
import io.reactivex.disposables.Disposable;
22-
import io.reactivex.exceptions.*;
21+
import io.reactivex.exceptions.Exceptions;
2322
import io.reactivex.functions.Function;
2423
import io.reactivex.internal.disposables.*;
24+
import io.reactivex.internal.functions.ObjectHelper;
2525
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
26+
import io.reactivex.internal.util.AtomicThrowable;
2627
import io.reactivex.plugins.RxJavaPlugins;
2728

2829
public final class ObservableCombineLatest<T, R> extends Observable<R> {
@@ -86,7 +87,7 @@ static final class LatestCoordinator<T, R> extends AtomicInteger implements Disp
8687

8788
volatile boolean done;
8889

89-
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
90+
final AtomicThrowable errors = new AtomicThrowable();
9091

9192
int active;
9293
int complete;
@@ -181,7 +182,7 @@ void combine(T value, int index) {
181182
if (value != null && f) {
182183
queue.offer(cs, latest.clone());
183184
} else
184-
if (value == null && error.get() != null) {
185+
if (value == null && errors.get() != null) {
185186
done = true; // if this source completed without a value
186187
}
187188
} else {
@@ -227,13 +228,6 @@ void drain() {
227228
@SuppressWarnings("unchecked")
228229
T[] array = (T[])q.poll();
229230

230-
if (array == null) {
231-
cancelled = true;
232-
cancel(q);
233-
a.onError(new IllegalStateException("Broken queue?! Sender received but not the array."));
234-
return;
235-
}
236-
237231
R v;
238232
try {
239233
v = ObjectHelper.requireNonNull(combiner.apply(array), "The combiner returned a null");
@@ -265,7 +259,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer<?> a, SpscLinkedArray
265259
if (delayError) {
266260
if (empty) {
267261
clear(queue);
268-
Throwable e = error.get();
262+
Throwable e = errors.terminate();
269263
if (e != null) {
270264
a.onError(e);
271265
} else {
@@ -274,10 +268,10 @@ boolean checkTerminated(boolean d, boolean empty, Observer<?> a, SpscLinkedArray
274268
return true;
275269
}
276270
} else {
277-
Throwable e = error.get();
271+
Throwable e = errors.get();
278272
if (e != null) {
279273
cancel(q);
280-
a.onError(e);
274+
a.onError(errors.terminate());
281275
return true;
282276
} else
283277
if (empty) {
@@ -291,26 +285,16 @@ boolean checkTerminated(boolean d, boolean empty, Observer<?> a, SpscLinkedArray
291285
}
292286

293287
void onError(Throwable e) {
294-
for (;;) {
295-
Throwable curr = error.get();
296-
if (curr != null) {
297-
CompositeException ce = new CompositeException(curr, e);
298-
e = ce;
299-
}
300-
Throwable next = e;
301-
if (error.compareAndSet(curr, next)) {
302-
return;
303-
}
288+
if (!errors.addThrowable(e)) {
289+
RxJavaPlugins.onError(e);
304290
}
305291
}
306292
}
307293

308-
static final class CombinerObserver<T, R> implements Observer<T>, Disposable {
294+
static final class CombinerObserver<T, R> implements Observer<T> {
309295
final LatestCoordinator<T, R> parent;
310296
final int index;
311297

312-
boolean done;
313-
314298
final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
315299

316300
CombinerObserver(LatestCoordinator<T, R> parent, int index) {
@@ -325,40 +309,22 @@ public void onSubscribe(Disposable s) {
325309

326310
@Override
327311
public void onNext(T t) {
328-
if (done) {
329-
return;
330-
}
331312
parent.combine(t, index);
332313
}
333314

334315
@Override
335316
public void onError(Throwable t) {
336-
if (done) {
337-
RxJavaPlugins.onError(t);
338-
return;
339-
}
340317
parent.onError(t);
341-
done = true;
342318
parent.combine(null, index);
343319
}
344320

345321
@Override
346322
public void onComplete() {
347-
if (done) {
348-
return;
349-
}
350-
done = true;
351323
parent.combine(null, index);
352324
}
353325

354-
@Override
355326
public void dispose() {
356327
DisposableHelper.dispose(s);
357328
}
358-
359-
@Override
360-
public boolean isDisposed() {
361-
return s.get() == DisposableHelper.DISPOSED;
362-
}
363329
}
364330
}

0 commit comments

Comments
 (0)