Skip to content

Commit ca4bba6

Browse files
vanniktechakarnokd
authored andcommitted
2.x: More usage of ObjectHelper (#4526)
1 parent 9966209 commit ca4bba6

File tree

10 files changed

+23
-55
lines changed

10 files changed

+23
-55
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,7 @@ public static Completable concat(Publisher<? extends CompletableSource> sources)
161161
@SchedulerSupport(SchedulerSupport.NONE)
162162
public static Completable concat(Publisher<? extends CompletableSource> sources, int prefetch) {
163163
ObjectHelper.requireNonNull(sources, "sources is null");
164-
if (prefetch < 1) {
165-
throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
166-
}
164+
ObjectHelper.verifyPositive(prefetch, "prefetch");
167165
return RxJavaPlugins.onAssembly(new CompletableConcat(sources, prefetch));
168166
}
169167

@@ -477,9 +475,7 @@ public static Completable merge(Publisher<? extends CompletableSource> sources,
477475
@SchedulerSupport(SchedulerSupport.NONE)
478476
private static Completable merge0(Publisher<? extends CompletableSource> sources, int maxConcurrency, boolean delayErrors) {
479477
ObjectHelper.requireNonNull(sources, "sources is null");
480-
if (maxConcurrency < 1) {
481-
throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency);
482-
}
478+
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
483479
return RxJavaPlugins.onAssembly(new CompletableMerge(sources, maxConcurrency, delayErrors));
484480
}
485481

src/main/java/io/reactivex/Flowable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5399,6 +5399,9 @@ public final Flowable<List<T>> buffer(int count, int skip) {
53995399
@BackpressureSupport(BackpressureKind.FULL)
54005400
@SchedulerSupport(SchedulerSupport.NONE)
54015401
public final <U extends Collection<? super T>> Flowable<U> buffer(int count, int skip, Callable<U> bufferSupplier) {
5402+
ObjectHelper.verifyPositive(count, "count");
5403+
ObjectHelper.verifyPositive(skip, "skip");
5404+
ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null");
54025405
return RxJavaPlugins.onAssembly(new FlowableBuffer<T, U>(this, count, skip, bufferSupplier));
54035406
}
54045407

@@ -6535,6 +6538,7 @@ public final <U> Flowable<U> concatMapIterable(Function<? super T, ? extends Ite
65356538
@SchedulerSupport(SchedulerSupport.NONE)
65366539
public final <U> Flowable<U> concatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper, int prefetch) {
65376540
ObjectHelper.requireNonNull(mapper, "mapper is null");
6541+
ObjectHelper.verifyPositive(prefetch, "prefetch");
65386542
return RxJavaPlugins.onAssembly(new FlowableFlattenIterable<T, U>(this, mapper, prefetch));
65396543
}
65406544

@@ -8198,6 +8202,8 @@ public final <U> Flowable<U> flatMapIterable(final Function<? super T, ? extends
81988202
@BackpressureSupport(BackpressureKind.FULL)
81998203
@SchedulerSupport(SchedulerSupport.NONE)
82008204
public final <U> Flowable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper, int bufferSize) {
8205+
ObjectHelper.requireNonNull(mapper, "mapper is null");
8206+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
82018207
return RxJavaPlugins.onAssembly(new FlowableFlattenIterable<T, U>(this, mapper, bufferSize));
82028208
}
82038209

src/main/java/io/reactivex/internal/operators/flowable/FlowableAutoConnect.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16-
import java.util.concurrent.atomic.AtomicInteger;
17-
1816
import org.reactivestreams.Subscriber;
1917

18+
import java.util.concurrent.atomic.AtomicInteger;
19+
2020
import io.reactivex.Flowable;
2121
import io.reactivex.disposables.Disposable;
2222
import io.reactivex.flowables.ConnectableFlowable;
@@ -37,9 +37,6 @@ public final class FlowableAutoConnect<T> extends Flowable<T> {
3737
public FlowableAutoConnect(ConnectableFlowable<? extends T> source,
3838
int numberOfSubscribers,
3939
Consumer<? super Disposable> connection) {
40-
if (numberOfSubscribers <= 0) {
41-
throw new IllegalArgumentException("numberOfSubscribers > 0 required");
42-
}
4340
this.source = source;
4441
this.numberOfSubscribers = numberOfSubscribers;
4542
this.connection = connection;

src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,11 @@ public final class FlowableBuffer<T, C extends Collection<? super T>> extends Ab
3333

3434
final Callable<C> bufferSupplier;
3535

36-
public FlowableBuffer(Publisher<T> source, int size, Callable<C> bufferSupplier) {
37-
this(source, size, size, bufferSupplier);
38-
}
39-
4036
public FlowableBuffer(Publisher<T> source, int size, int skip, Callable<C> bufferSupplier) {
4137
super(source);
42-
if (size <= 0) {
43-
throw new IllegalArgumentException("size > 0 required but it was " + size);
44-
}
45-
46-
if (skip <= 0) {
47-
throw new IllegalArgumentException("skip > 0 required but it was " + size);
48-
}
49-
5038
this.size = size;
5139
this.skip = skip;
52-
this.bufferSupplier = ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier");
40+
this.bufferSupplier = bufferSupplier;
5341
}
5442

5543
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.Flowable;
2121
import io.reactivex.disposables.Disposable;
2222
import io.reactivex.exceptions.Exceptions;
23+
import io.reactivex.internal.functions.ObjectHelper;
2324
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2425
import io.reactivex.internal.util.*;
2526
import io.reactivex.plugins.RxJavaPlugins;
@@ -53,9 +54,7 @@ public static <T> Flowable<T> from(Flowable<T> source) {
5354
* @return the CachedObservable instance
5455
*/
5556
public static <T> Flowable<T> from(Flowable<T> source, int capacityHint) {
56-
if (capacityHint < 1) {
57-
throw new IllegalArgumentException("capacityHint > 0 required");
58-
}
57+
ObjectHelper.verifyPositive(capacityHint, "capacityHint");
5958
CacheState<T> state = new CacheState<T>(source, capacityHint);
6059
return RxJavaPlugins.onAssembly(new FlowableCache<T>(source, state));
6160
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,27 +49,19 @@ public final class FlowableCombineLatest<T, R>
4949
public FlowableCombineLatest(Publisher<? extends T>[] array,
5050
Function<? super Object[], ? extends R> combiner,
5151
int bufferSize, boolean delayErrors) {
52-
if (bufferSize <= 0) {
53-
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
54-
}
55-
56-
this.array = ObjectHelper.requireNonNull(array, "array");
52+
this.array = array;
5753
this.iterable = null;
58-
this.combiner = ObjectHelper.requireNonNull(combiner, "combiner");
54+
this.combiner = combiner;
5955
this.bufferSize = bufferSize;
6056
this.delayErrors = delayErrors;
6157
}
6258

6359
public FlowableCombineLatest(Iterable<? extends Publisher<? extends T>> iterable,
6460
Function<? super Object[], ? extends R> combiner,
6561
int bufferSize, boolean delayErrors) {
66-
if (bufferSize <= 0) {
67-
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
68-
}
69-
7062
this.array = null;
71-
this.iterable = ObjectHelper.requireNonNull(iterable, "iterable");
72-
this.combiner = ObjectHelper.requireNonNull(combiner, "combiner");
63+
this.iterable = iterable;
64+
this.combiner = combiner;
7365
this.bufferSize = bufferSize;
7466
this.delayErrors = delayErrors;
7567
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,9 @@ public FlowableConcatMap(Publisher<T> source,
3838
Function<? super T, ? extends Publisher<? extends R>> mapper,
3939
int prefetch, ErrorMode errorMode) {
4040
super(source);
41-
if (prefetch <= 0) {
42-
throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
43-
}
44-
this.mapper = ObjectHelper.requireNonNull(mapper, "mapper");
41+
this.mapper = mapper;
4542
this.prefetch = prefetch;
46-
this.errorMode = ObjectHelper.requireNonNull(errorMode, "errorMode");
43+
this.errorMode = errorMode;
4744
}
4845

4946
public static <T, R> Subscriber<T> subscribe(Subscriber<? super R> s, Function<? super T, ? extends Publisher<? extends R>> mapper,

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,7 @@ public final class FlowableFlattenIterable<T, R> extends AbstractFlowableWithUps
3737
public FlowableFlattenIterable(Publisher<T> source,
3838
Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch) {
3939
super(source);
40-
if (prefetch <= 0) {
41-
throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
42-
}
43-
this.mapper = ObjectHelper.requireNonNull(mapper, "mapper");
40+
this.mapper = mapper;
4441
this.prefetch = prefetch;
4542
}
4643

src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@ public FlowableObserveOn(
3939
boolean delayError,
4040
int prefetch) {
4141
super(source);
42-
if (prefetch <= 0) {
43-
throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
44-
}
45-
this.scheduler = ObjectHelper.requireNonNull(scheduler, "scheduler");
42+
this.scheduler = scheduler;
4643
this.delayError = delayError;
4744
this.prefetch = prefetch;
4845
}

src/main/java/io/reactivex/internal/operators/observable/ObservableCache.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.reactivex.*;
1919
import io.reactivex.disposables.Disposable;
2020
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.internal.functions.ObjectHelper;
2122
import io.reactivex.internal.disposables.SequentialDisposable;
2223
import io.reactivex.internal.util.*;
2324
import io.reactivex.plugins.RxJavaPlugins;
@@ -52,9 +53,7 @@ public static <T> Observable<T> from(Observable<T> source) {
5253
* @return the CachedObservable instance
5354
*/
5455
public static <T> Observable<T> from(Observable<T> source, int capacityHint) {
55-
if (capacityHint < 1) {
56-
throw new IllegalArgumentException("capacityHint > 0 required");
57-
}
56+
ObjectHelper.verifyPositive(capacityHint, "capacityHint");
5857
CacheState<T> state = new CacheState<T>(source, capacityHint);
5958
return RxJavaPlugins.onAssembly(new ObservableCache<T>(source, state));
6059
}

0 commit comments

Comments
 (0)