Skip to content

Commit 307a6a6

Browse files
authored
2.x: fix withLatestFrom null checks, lifecycle (#4970)
1 parent bcdfb13 commit 307a6a6

File tree

4 files changed

+71
-22
lines changed

4 files changed

+71
-22
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16-
import java.util.concurrent.atomic.AtomicReference;
16+
import java.util.concurrent.atomic.*;
1717

1818
import org.reactivestreams.*;
1919

2020
import io.reactivex.exceptions.Exceptions;
2121
import io.reactivex.functions.BiFunction;
22-
import io.reactivex.internal.subscriptions.*;
23-
import io.reactivex.plugins.RxJavaPlugins;
22+
import io.reactivex.internal.functions.ObjectHelper;
23+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2424
import io.reactivex.subscribers.SerializedSubscriber;
2525

2626
public final class FlowableWithLatestFrom<T, U, R> extends AbstractFlowableWithUpstream<T, R> {
@@ -37,6 +37,8 @@ protected void subscribeActual(Subscriber<? super R> s) {
3737
final SerializedSubscriber<R> serial = new SerializedSubscriber<R>(s);
3838
final WithLatestFromSubscriber<T, U, R> wlf = new WithLatestFromSubscriber<T, U, R>(serial, combiner);
3939

40+
serial.onSubscribe(wlf);
41+
4042
other.subscribe(new Subscriber<U>() {
4143
@Override
4244
public void onSubscribe(Subscription s) {
@@ -73,6 +75,8 @@ static final class WithLatestFromSubscriber<T, U, R> extends AtomicReference<U>
7375

7476
final AtomicReference<Subscription> s = new AtomicReference<Subscription>();
7577

78+
final AtomicLong requested = new AtomicLong();
79+
7680
final AtomicReference<Subscription> other = new AtomicReference<Subscription>();
7781

7882
WithLatestFromSubscriber(Subscriber<? super R> actual, BiFunction<? super T, ? super U, ? extends R> combiner) {
@@ -81,9 +85,7 @@ static final class WithLatestFromSubscriber<T, U, R> extends AtomicReference<U>
8185
}
8286
@Override
8387
public void onSubscribe(Subscription s) {
84-
if (SubscriptionHelper.setOnce(this.s, s)) {
85-
actual.onSubscribe(this);
86-
}
88+
SubscriptionHelper.deferredSetOnce(this.s, requested, s);
8789
}
8890

8991
@Override
@@ -92,7 +94,7 @@ public void onNext(T t) {
9294
if (u != null) {
9395
R r;
9496
try {
95-
r = combiner.apply(t, u);
97+
r = ObjectHelper.requireNonNull(combiner.apply(t, u), "The combiner returned a null value");
9698
} catch (Throwable e) {
9799
Exceptions.throwIfFatal(e);
98100
cancel();
@@ -117,12 +119,12 @@ public void onComplete() {
117119

118120
@Override
119121
public void request(long n) {
120-
s.get().request(n);
122+
SubscriptionHelper.deferredRequest(s, requested, n);
121123
}
122124

123125
@Override
124126
public void cancel() {
125-
s.get().cancel();
127+
SubscriptionHelper.cancel(s);
126128
SubscriptionHelper.cancel(other);
127129
}
128130

@@ -131,16 +133,8 @@ public boolean setOther(Subscription o) {
131133
}
132134

133135
public void otherError(Throwable e) {
134-
if (s.compareAndSet(null, SubscriptionHelper.CANCELLED)) {
135-
EmptySubscription.error(e, actual);
136-
} else {
137-
if (s.get() != SubscriptionHelper.CANCELLED) {
138-
cancel();
139-
actual.onError(e);
140-
} else {
141-
RxJavaPlugins.onError(e);
142-
}
143-
}
136+
SubscriptionHelper.cancel(s);
137+
actual.onError(e);
144138
}
145139
}
146140
}

src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFrom.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.exceptions.Exceptions;
2121
import io.reactivex.functions.BiFunction;
2222
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.functions.ObjectHelper;
2324
import io.reactivex.observers.SerializedObserver;
2425

2526
public final class ObservableWithLatestFrom<T, U, R> extends AbstractObservableWithUpstream<T, R> {
@@ -37,7 +38,7 @@ public void subscribeActual(Observer<? super R> t) {
3738
final SerializedObserver<R> serial = new SerializedObserver<R>(t);
3839
final WithLatestFromObserver<T, U, R> wlf = new WithLatestFromObserver<T, U, R>(serial, combiner);
3940

40-
t.onSubscribe(wlf);
41+
serial.onSubscribe(wlf);
4142

4243
other.subscribe(new Observer<U>() {
4344
@Override
@@ -91,7 +92,7 @@ public void onNext(T t) {
9192
if (u != null) {
9293
R r;
9394
try {
94-
r = combiner.apply(t, u);
95+
r = ObjectHelper.requireNonNull(combiner.apply(t, u), "The combiner returned a null value");
9596
} catch (Throwable e) {
9697
Exceptions.throwIfFatal(e);
9798
dispose();

src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,4 +681,31 @@ public Integer apply(Integer a, Integer b) throws Exception {
681681
RxJavaPlugins.reset();
682682
}
683683
}
684+
685+
@Test
686+
public void combineToNull1() {
687+
Flowable.just(1)
688+
.withLatestFrom(Flowable.just(2), new BiFunction<Integer, Integer, Object>() {
689+
@Override
690+
public Object apply(Integer a, Integer b) throws Exception {
691+
return null;
692+
}
693+
})
694+
.test()
695+
.assertFailure(NullPointerException.class);
696+
}
697+
698+
@SuppressWarnings("unchecked")
699+
@Test
700+
public void combineToNull2() {
701+
Flowable.just(1)
702+
.withLatestFrom(Arrays.asList(Flowable.just(2), Flowable.just(3)), new Function<Object[], Object>() {
703+
@Override
704+
public Object apply(Object[] o) throws Exception {
705+
return null;
706+
}
707+
})
708+
.test()
709+
.assertFailure(NullPointerException.class);
710+
}
684711
}

src/test/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import org.junit.*;
2222
import org.mockito.InOrder;
2323

24+
import io.reactivex.*;
2425
import io.reactivex.Observable;
2526
import io.reactivex.Observer;
26-
import io.reactivex.TestHelper;
2727
import io.reactivex.disposables.Disposables;
2828
import io.reactivex.exceptions.TestException;
2929
import io.reactivex.functions.*;
@@ -620,4 +620,31 @@ public Object apply(Integer a, Integer b, Integer c) throws Exception {
620620
RxJavaPlugins.reset();
621621
}
622622
}
623+
624+
@Test
625+
public void combineToNull1() {
626+
Observable.just(1)
627+
.withLatestFrom(Observable.just(2), new BiFunction<Integer, Integer, Object>() {
628+
@Override
629+
public Object apply(Integer a, Integer b) throws Exception {
630+
return null;
631+
}
632+
})
633+
.test()
634+
.assertFailure(NullPointerException.class);
635+
}
636+
637+
@SuppressWarnings("unchecked")
638+
@Test
639+
public void combineToNull2() {
640+
Observable.just(1)
641+
.withLatestFrom(Arrays.asList(Observable.just(2), Observable.just(3)), new Function<Object[], Object>() {
642+
@Override
643+
public Object apply(Object[] o) throws Exception {
644+
return null;
645+
}
646+
})
647+
.test()
648+
.assertFailure(NullPointerException.class);
649+
}
623650
}

0 commit comments

Comments
 (0)