Skip to content

Commit 2fa773c

Browse files
authored
2.x: add missing null checks on values returned by user functions (#5379)
1 parent 815c5d2 commit 2fa773c

35 files changed

+315
-31
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ public R poll() throws Exception {
474474
return null;
475475
}
476476
T[] a = (T[])queue.poll();
477-
R r = combiner.apply(a);
477+
R r = ObjectHelper.requireNonNull(combiner.apply(a), "The combiner returned a null value");
478478
((CombineLatestInnerSubscriber<T>)e).requestOne();
479479
return r;
480480
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.reactivex.*;
2121
import io.reactivex.flowables.ConnectableFlowable;
2222
import io.reactivex.functions.*;
23-
import io.reactivex.internal.functions.Functions;
23+
import io.reactivex.internal.functions.*;
2424

2525
/**
2626
* Helper utility class to support Flowable with inner classes.
@@ -77,7 +77,8 @@ static final class ItemDelayFunction<T, U> implements Function<T, Publisher<T>>
7777

7878
@Override
7979
public Publisher<T> apply(final T v) throws Exception {
80-
return new FlowableTakePublisher<U>(itemDelay.apply(v), 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
80+
Publisher<U> p = ObjectHelper.requireNonNull(itemDelay.apply(v), "The itemDelay returned a null Publisher");
81+
return new FlowableTakePublisher<U>(p, 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
8182
}
8283
}
8384

@@ -164,7 +165,7 @@ static final class FlatMapWithCombinerOuter<T, R, U> implements Function<T, Publ
164165
@Override
165166
public Publisher<R> apply(final T t) throws Exception {
166167
@SuppressWarnings("unchecked")
167-
Publisher<U> u = (Publisher<U>)mapper.apply(t);
168+
Publisher<U> u = (Publisher<U>)ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher");
168169
return new FlowableMapPublisher<U, R>(u, new FlatMapWithCombinerInner<U, R, T>(combiner, t));
169170
}
170171
}
@@ -184,7 +185,7 @@ static final class FlatMapIntoIterable<T, U> implements Function<T, Publisher<U>
184185

185186
@Override
186187
public Publisher<U> apply(T t) throws Exception {
187-
return new FlowableFromIterable<U>(mapper.apply(t));
188+
return new FlowableFromIterable<U>(ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Iterable"));
188189
}
189190
}
190191

@@ -317,7 +318,8 @@ static final class ReplayFunction<T, R> implements Function<Flowable<T>, Publish
317318

318319
@Override
319320
public Publisher<R> apply(Flowable<T> t) throws Exception {
320-
return Flowable.fromPublisher(selector.apply(t)).observeOn(scheduler);
321+
Publisher<R> p = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Publisher");
322+
return Flowable.fromPublisher(p).observeOn(scheduler);
321323
}
322324
}
323325
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.reactivestreams.Subscriber;
1919

2020
import io.reactivex.Flowable;
21-
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.exceptions.*;
2222
import io.reactivex.functions.Function;
2323
import io.reactivex.internal.functions.ObjectHelper;
2424
import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber;
@@ -87,7 +87,7 @@ public void onError(Throwable t) {
8787
p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError publisher returned is null");
8888
} catch (Throwable e) {
8989
Exceptions.throwIfFatal(e);
90-
actual.onError(e);
90+
actual.onError(new CompositeException(t, e));
9191
return;
9292
}
9393

src/main/java/io/reactivex/internal/operators/flowable/FlowableUsing.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.reactivex.*;
2222
import io.reactivex.exceptions.*;
2323
import io.reactivex.functions.*;
24+
import io.reactivex.internal.functions.ObjectHelper;
2425
import io.reactivex.internal.subscriptions.*;
2526
import io.reactivex.plugins.RxJavaPlugins;
2627

@@ -54,7 +55,7 @@ public void subscribeActual(Subscriber<? super T> s) {
5455

5556
Publisher<? extends T> source;
5657
try {
57-
source = sourceSupplier.apply(resource);
58+
source = ObjectHelper.requireNonNull(sourceSupplier.apply(resource), "The sourceSupplier returned a null Publisher");
5859
} catch (Throwable e) {
5960
Exceptions.throwIfFatal(e);
6061
try {

src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public void onNext(T t) {
168168
R v;
169169

170170
try {
171-
v = ObjectHelper.requireNonNull(combiner.apply(objects), "combiner returned a null value");
171+
v = ObjectHelper.requireNonNull(combiner.apply(objects), "The combiner returned a null value");
172172
} catch (Throwable ex) {
173173
Exceptions.throwIfFatal(ex);
174174
cancel();
@@ -297,7 +297,7 @@ public void dispose() {
297297
final class SingletonArrayFunc implements Function<T, R> {
298298
@Override
299299
public R apply(T t) throws Exception {
300-
return combiner.apply(new Object[] { t });
300+
return ObjectHelper.requireNonNull(combiner.apply(new Object[] { t }), "The combiner returned a null value");
301301
}
302302
}
303303
}

src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public void onComplete() {
192192
final class SingletonArrayFunc implements Function<T, R> {
193193
@Override
194194
public R apply(T t) throws Exception {
195-
return zipper.apply(new Object[] { t });
195+
return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value");
196196
}
197197
}
198198
}

src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.exceptions.Exceptions;
2020
import io.reactivex.functions.Function;
2121
import io.reactivex.internal.disposables.EmptyDisposable;
22+
import io.reactivex.internal.functions.ObjectHelper;
2223
import io.reactivex.internal.operators.maybe.MaybeZipArray.ZipCoordinator;
2324

2425
public final class MaybeZipIterable<T, R> extends Maybe<R> {
@@ -81,7 +82,7 @@ protected void subscribeActual(MaybeObserver<? super R> observer) {
8182
final class SingletonArrayFunc implements Function<T, R> {
8283
@Override
8384
public R apply(T t) throws Exception {
84-
return zipper.apply(new Object[] { t });
85+
return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value");
8586
}
8687
}
8788
}

src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ static final class ItemDelayFunction<T, U> implements Function<T, ObservableSour
7777

7878
@Override
7979
public ObservableSource<T> apply(final T v) throws Exception {
80-
return new ObservableTake<U>(itemDelay.apply(v), 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
80+
ObservableSource<U> o = ObjectHelper.requireNonNull(itemDelay.apply(v), "The itemDelay returned a null ObservableSource");
81+
return new ObservableTake<U>(o, 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
8182
}
8283
}
8384

@@ -165,7 +166,7 @@ static final class FlatMapWithCombinerOuter<T, R, U> implements Function<T, Obse
165166
@Override
166167
public ObservableSource<R> apply(final T t) throws Exception {
167168
@SuppressWarnings("unchecked")
168-
ObservableSource<U> u = (ObservableSource<U>)mapper.apply(t);
169+
ObservableSource<U> u = (ObservableSource<U>)ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
169170
return new ObservableMap<U, R>(u, new FlatMapWithCombinerInner<U, R, T>(combiner, t));
170171
}
171172
}
@@ -185,7 +186,7 @@ static final class FlatMapIntoIterable<T, U> implements Function<T, ObservableSo
185186

186187
@Override
187188
public ObservableSource<U> apply(T t) throws Exception {
188-
return new ObservableFromIterable<U>(mapper.apply(t));
189+
return new ObservableFromIterable<U>(ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Iterable"));
189190
}
190191
}
191192

@@ -319,7 +320,7 @@ static final class ObservableMapper<T,R> implements Function<T,Observable<R>> {
319320
@Override
320321
public Observable<R> apply(T t) throws Exception {
321322
return RxJavaPlugins.onAssembly(new SingleToObservable<R>(
322-
ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value")));
323+
ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource")));
323324
}
324325

325326
}
@@ -403,7 +404,8 @@ static final class ReplayFunction<T, R> implements Function<Observable<T>, Obser
403404

404405
@Override
405406
public ObservableSource<R> apply(Observable<T> t) throws Exception {
406-
return Observable.wrap(selector.apply(t)).observeOn(scheduler);
407+
ObservableSource<R> apply = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null ObservableSource");
408+
return Observable.wrap(apply).observeOn(scheduler);
407409
}
408410
}
409411
}

src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16-
import io.reactivex.internal.functions.ObjectHelper;
1716
import java.util.concurrent.Callable;
1817

1918
import io.reactivex.*;
2019
import io.reactivex.disposables.Disposable;
21-
import io.reactivex.exceptions.Exceptions;
20+
import io.reactivex.exceptions.*;
2221
import io.reactivex.functions.Function;
2322
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.functions.ObjectHelper;
2424

2525
public final class ObservableMapNotification<T, R> extends AbstractObservableWithUpstream<T, ObservableSource<? extends R>> {
2626

@@ -106,7 +106,7 @@ public void onError(Throwable t) {
106106
p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError ObservableSource returned is null");
107107
} catch (Throwable e) {
108108
Exceptions.throwIfFatal(e);
109-
actual.onError(e);
109+
actual.onError(new CompositeException(t, e));
110110
return;
111111
}
112112

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.reactivex.exceptions.Exceptions;
2525
import io.reactivex.functions.*;
2626
import io.reactivex.internal.disposables.*;
27+
import io.reactivex.internal.functions.ObjectHelper;
2728
import io.reactivex.internal.fuseable.HasUpstreamObservableSource;
2829
import io.reactivex.internal.util.*;
2930
import io.reactivex.observables.ConnectableObservable;
@@ -1026,8 +1027,8 @@ protected void subscribeActual(Observer<? super R> child) {
10261027
ConnectableObservable<U> co;
10271028
ObservableSource<R> observable;
10281029
try {
1029-
co = connectableFactory.call();
1030-
observable = selector.apply(co);
1030+
co = ObjectHelper.requireNonNull(connectableFactory.call(), "The connectableFactory returned a null ConnectableObservable");
1031+
observable = ObjectHelper.requireNonNull(selector.apply(co), "The selector returned a null ObservableSource");
10311032
} catch (Throwable e) {
10321033
Exceptions.throwIfFatal(e);
10331034
EmptyDisposable.error(e, child);

0 commit comments

Comments
 (0)