Skip to content

Commit 45cc53d

Browse files
authored
2.x: Test cleanup (#6119)
* 2.x: Test error printout, local naming, mocking cleanup * Fix additional local variable names * Fix Observers with wrong variable names * Fix nit * More time to MulticastProcessorRefCountedTckTest
1 parent a20f993 commit 45cc53d

File tree

384 files changed

+8395
-7778
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

384 files changed

+8395
-7778
lines changed

src/jmh/java/io/reactivex/InputWithIncrementingInteger.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void subscribe(Subscriber<? super Integer> s) {
9494
}
9595

9696
public Iterable<Integer> iterable;
97-
public Flowable<Integer> observable;
97+
public Flowable<Integer> flowable;
9898
public Flowable<Integer> firehose;
9999
public Blackhole bh;
100100

@@ -104,7 +104,7 @@ public void subscribe(Subscriber<? super Integer> s) {
104104
public void setup(final Blackhole bh) {
105105
this.bh = bh;
106106
final int size = getSize();
107-
observable = Flowable.range(0, size);
107+
flowable = Flowable.range(0, size);
108108

109109
firehose = Flowable.unsafeCreate(new IncrementingPublisher(size));
110110
iterable = new IncrementingIterable(size);

src/jmh/java/io/reactivex/OperatorFlatMapPerf.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public int getSize() {
4242

4343
@Benchmark
4444
public void flatMapIntPassthruSync(Input input) throws InterruptedException {
45-
input.observable.flatMap(new Function<Integer, Publisher<Integer>>() {
45+
input.flowable.flatMap(new Function<Integer, Publisher<Integer>>() {
4646
@Override
4747
public Publisher<Integer> apply(Integer v) {
4848
return Flowable.just(v);
@@ -53,7 +53,7 @@ public Publisher<Integer> apply(Integer v) {
5353
@Benchmark
5454
public void flatMapIntPassthruAsync(Input input) throws InterruptedException {
5555
PerfSubscriber latchedObserver = input.newLatchedObserver();
56-
input.observable.flatMap(new Function<Integer, Publisher<Integer>>() {
56+
input.flowable.flatMap(new Function<Integer, Publisher<Integer>>() {
5757
@Override
5858
public Publisher<Integer> apply(Integer i) {
5959
return Flowable.just(i).subscribeOn(Schedulers.computation());
@@ -71,7 +71,7 @@ public void flatMapTwoNestedSync(final Input input) throws InterruptedException
7171
Flowable.range(1, 2).flatMap(new Function<Integer, Publisher<Integer>>() {
7272
@Override
7373
public Publisher<Integer> apply(Integer i) {
74-
return input.observable;
74+
return input.flowable;
7575
}
7676
}).subscribe(input.newSubscriber());
7777
}

src/jmh/java/io/reactivex/OperatorMergePerf.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public Flowable<Integer> apply(Integer i) {
6868

6969
@Benchmark
7070
public void mergeNSyncStreamsOfN(final InputThousand input) throws InterruptedException {
71-
Flowable<Flowable<Integer>> os = input.observable.map(new Function<Integer, Flowable<Integer>>() {
71+
Flowable<Flowable<Integer>> os = input.flowable.map(new Function<Integer, Flowable<Integer>>() {
7272
@Override
7373
public Flowable<Integer> apply(Integer i) {
7474
return Flowable.range(0, input.size);
@@ -85,7 +85,7 @@ public Flowable<Integer> apply(Integer i) {
8585

8686
@Benchmark
8787
public void mergeNAsyncStreamsOfN(final InputThousand input) throws InterruptedException {
88-
Flowable<Flowable<Integer>> os = input.observable.map(new Function<Integer, Flowable<Integer>>() {
88+
Flowable<Flowable<Integer>> os = input.flowable.map(new Function<Integer, Flowable<Integer>>() {
8989
@Override
9090
public Flowable<Integer> apply(Integer i) {
9191
return Flowable.range(0, input.size).subscribeOn(Schedulers.computation());

src/main/java/io/reactivex/Completable.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2155,20 +2155,20 @@ public final Completable hide() {
21552155
*/
21562156
@SchedulerSupport(SchedulerSupport.NONE)
21572157
public final Disposable subscribe() {
2158-
EmptyCompletableObserver s = new EmptyCompletableObserver();
2159-
subscribe(s);
2160-
return s;
2158+
EmptyCompletableObserver observer = new EmptyCompletableObserver();
2159+
subscribe(observer);
2160+
return observer;
21612161
}
21622162

21632163
@SchedulerSupport(SchedulerSupport.NONE)
21642164
@Override
2165-
public final void subscribe(CompletableObserver s) {
2166-
ObjectHelper.requireNonNull(s, "s is null");
2165+
public final void subscribe(CompletableObserver observer) {
2166+
ObjectHelper.requireNonNull(observer, "s is null");
21672167
try {
21682168

2169-
s = RxJavaPlugins.onSubscribe(this, s);
2169+
observer = RxJavaPlugins.onSubscribe(this, observer);
21702170

2171-
subscribeActual(s);
2171+
subscribeActual(observer);
21722172
} catch (NullPointerException ex) { // NOPMD
21732173
throw ex;
21742174
} catch (Throwable ex) {
@@ -2184,9 +2184,9 @@ public final void subscribe(CompletableObserver s) {
21842184
* <p>There is no need to call any of the plugin hooks on the current {@code Completable} instance or
21852185
* the {@code CompletableObserver}; all hooks and basic safeguards have been
21862186
* applied by {@link #subscribe(CompletableObserver)} before this method gets called.
2187-
* @param s the CompletableObserver instance, never null
2187+
* @param observer the CompletableObserver instance, never null
21882188
*/
2189-
protected abstract void subscribeActual(CompletableObserver s);
2189+
protected abstract void subscribeActual(CompletableObserver observer);
21902190

21912191
/**
21922192
* Subscribes a given CompletableObserver (subclass) to this Completable and returns the given
@@ -2240,9 +2240,9 @@ public final Disposable subscribe(final Action onComplete, final Consumer<? supe
22402240
ObjectHelper.requireNonNull(onError, "onError is null");
22412241
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
22422242

2243-
CallbackCompletableObserver s = new CallbackCompletableObserver(onError, onComplete);
2244-
subscribe(s);
2245-
return s;
2243+
CallbackCompletableObserver observer = new CallbackCompletableObserver(onError, onComplete);
2244+
subscribe(observer);
2245+
return observer;
22462246
}
22472247

22482248
/**
@@ -2266,9 +2266,9 @@ public final Disposable subscribe(final Action onComplete, final Consumer<? supe
22662266
public final Disposable subscribe(final Action onComplete) {
22672267
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
22682268

2269-
CallbackCompletableObserver s = new CallbackCompletableObserver(onComplete);
2270-
subscribe(s);
2271-
return s;
2269+
CallbackCompletableObserver observer = new CallbackCompletableObserver(onComplete);
2270+
subscribe(observer);
2271+
return observer;
22722272
}
22732273

22742274
/**

src/main/java/io/reactivex/FlowableOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
public interface FlowableOperator<Downstream, Upstream> {
2626
/**
2727
* Applies a function to the child Subscriber and returns a new parent Subscriber.
28-
* @param observer the child Subscriber instance
28+
* @param subscriber the child Subscriber instance
2929
* @return the parent Subscriber instance
3030
* @throws Exception on failure
3131
*/
3232
@NonNull
33-
Subscriber<? super Upstream> apply(@NonNull Subscriber<? super Downstream> observer) throws Exception;
33+
Subscriber<? super Upstream> apply(@NonNull Subscriber<? super Downstream> subscriber) throws Exception;
3434
}

src/main/java/io/reactivex/Observable.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4982,9 +4982,9 @@ public final <R> R as(@NonNull ObservableConverter<T, ? extends R> converter) {
49824982
@CheckReturnValue
49834983
@SchedulerSupport(SchedulerSupport.NONE)
49844984
public final T blockingFirst() {
4985-
BlockingFirstObserver<T> s = new BlockingFirstObserver<T>();
4986-
subscribe(s);
4987-
T v = s.blockingGet();
4985+
BlockingFirstObserver<T> observer = new BlockingFirstObserver<T>();
4986+
subscribe(observer);
4987+
T v = observer.blockingGet();
49884988
if (v != null) {
49894989
return v;
49904990
}
@@ -5010,9 +5010,9 @@ public final T blockingFirst() {
50105010
@CheckReturnValue
50115011
@SchedulerSupport(SchedulerSupport.NONE)
50125012
public final T blockingFirst(T defaultItem) {
5013-
BlockingFirstObserver<T> s = new BlockingFirstObserver<T>();
5014-
subscribe(s);
5015-
T v = s.blockingGet();
5013+
BlockingFirstObserver<T> observer = new BlockingFirstObserver<T>();
5014+
subscribe(observer);
5015+
T v = observer.blockingGet();
50165016
return v != null ? v : defaultItem;
50175017
}
50185018

@@ -5119,9 +5119,9 @@ public final Iterable<T> blockingIterable(int bufferSize) {
51195119
@CheckReturnValue
51205120
@SchedulerSupport(SchedulerSupport.NONE)
51215121
public final T blockingLast() {
5122-
BlockingLastObserver<T> s = new BlockingLastObserver<T>();
5123-
subscribe(s);
5124-
T v = s.blockingGet();
5122+
BlockingLastObserver<T> observer = new BlockingLastObserver<T>();
5123+
subscribe(observer);
5124+
T v = observer.blockingGet();
51255125
if (v != null) {
51265126
return v;
51275127
}
@@ -5151,9 +5151,9 @@ public final T blockingLast() {
51515151
@CheckReturnValue
51525152
@SchedulerSupport(SchedulerSupport.NONE)
51535153
public final T blockingLast(T defaultItem) {
5154-
BlockingLastObserver<T> s = new BlockingLastObserver<T>();
5155-
subscribe(s);
5156-
T v = s.blockingGet();
5154+
BlockingLastObserver<T> observer = new BlockingLastObserver<T>();
5155+
subscribe(observer);
5156+
T v = observer.blockingGet();
51575157
return v != null ? v : defaultItem;
51585158
}
51595159

@@ -10998,16 +10998,16 @@ public final Observable<T> retryWhen(
1099810998
* <dt><b>Scheduler:</b></dt>
1099910999
* <dd>{@code safeSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
1100011000
* </dl>
11001-
* @param s the incoming Observer instance
11001+
* @param observer the incoming Observer instance
1100211002
* @throws NullPointerException if s is null
1100311003
*/
1100411004
@SchedulerSupport(SchedulerSupport.NONE)
11005-
public final void safeSubscribe(Observer<? super T> s) {
11006-
ObjectHelper.requireNonNull(s, "s is null");
11007-
if (s instanceof SafeObserver) {
11008-
subscribe(s);
11005+
public final void safeSubscribe(Observer<? super T> observer) {
11006+
ObjectHelper.requireNonNull(observer, "s is null");
11007+
if (observer instanceof SafeObserver) {
11008+
subscribe(observer);
1100911009
} else {
11010-
subscribe(new SafeObserver<T>(s));
11010+
subscribe(new SafeObserver<T>(observer));
1101111011
}
1101211012
}
1101311013

@@ -14072,19 +14072,19 @@ public final <K, V> Single<Map<K, Collection<V>>> toMultimap(
1407214072
@CheckReturnValue
1407314073
@SchedulerSupport(SchedulerSupport.NONE)
1407414074
public final Flowable<T> toFlowable(BackpressureStrategy strategy) {
14075-
Flowable<T> o = new FlowableFromObservable<T>(this);
14075+
Flowable<T> f = new FlowableFromObservable<T>(this);
1407614076

1407714077
switch (strategy) {
1407814078
case DROP:
14079-
return o.onBackpressureDrop();
14079+
return f.onBackpressureDrop();
1408014080
case LATEST:
14081-
return o.onBackpressureLatest();
14081+
return f.onBackpressureLatest();
1408214082
case MISSING:
14083-
return o;
14083+
return f;
1408414084
case ERROR:
14085-
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<T>(o));
14085+
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<T>(f));
1408614086
default:
14087-
return o.onBackpressureBuffer();
14087+
return f.onBackpressureBuffer();
1408814088
}
1408914089
}
1409014090

src/main/java/io/reactivex/Single.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3322,9 +3322,9 @@ public final Disposable subscribe() {
33223322
public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback) {
33233323
ObjectHelper.requireNonNull(onCallback, "onCallback is null");
33243324

3325-
BiConsumerSingleObserver<T> s = new BiConsumerSingleObserver<T>(onCallback);
3326-
subscribe(s);
3327-
return s;
3325+
BiConsumerSingleObserver<T> observer = new BiConsumerSingleObserver<T>(onCallback);
3326+
subscribe(observer);
3327+
return observer;
33283328
}
33293329

33303330
/**
@@ -3376,22 +3376,22 @@ public final Disposable subscribe(final Consumer<? super T> onSuccess, final Con
33763376
ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
33773377
ObjectHelper.requireNonNull(onError, "onError is null");
33783378

3379-
ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
3380-
subscribe(s);
3381-
return s;
3379+
ConsumerSingleObserver<T> observer = new ConsumerSingleObserver<T>(onSuccess, onError);
3380+
subscribe(observer);
3381+
return observer;
33823382
}
33833383

33843384
@SchedulerSupport(SchedulerSupport.NONE)
33853385
@Override
3386-
public final void subscribe(SingleObserver<? super T> subscriber) {
3387-
ObjectHelper.requireNonNull(subscriber, "subscriber is null");
3386+
public final void subscribe(SingleObserver<? super T> observer) {
3387+
ObjectHelper.requireNonNull(observer, "subscriber is null");
33883388

3389-
subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
3389+
observer = RxJavaPlugins.onSubscribe(this, observer);
33903390

3391-
ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");
3391+
ObjectHelper.requireNonNull(observer, "subscriber returned by the RxJavaPlugins hook is null");
33923392

33933393
try {
3394-
subscribeActual(subscriber);
3394+
subscribeActual(observer);
33953395
} catch (NullPointerException ex) {
33963396
throw ex;
33973397
} catch (Throwable ex) {

src/main/java/io/reactivex/internal/disposables/EmptyDisposable.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,39 +48,39 @@ public boolean isDisposed() {
4848
return this == INSTANCE;
4949
}
5050

51-
public static void complete(Observer<?> s) {
52-
s.onSubscribe(INSTANCE);
53-
s.onComplete();
51+
public static void complete(Observer<?> observer) {
52+
observer.onSubscribe(INSTANCE);
53+
observer.onComplete();
5454
}
5555

56-
public static void complete(MaybeObserver<?> s) {
57-
s.onSubscribe(INSTANCE);
58-
s.onComplete();
56+
public static void complete(MaybeObserver<?> observer) {
57+
observer.onSubscribe(INSTANCE);
58+
observer.onComplete();
5959
}
6060

61-
public static void error(Throwable e, Observer<?> s) {
62-
s.onSubscribe(INSTANCE);
63-
s.onError(e);
61+
public static void error(Throwable e, Observer<?> observer) {
62+
observer.onSubscribe(INSTANCE);
63+
observer.onError(e);
6464
}
6565

66-
public static void complete(CompletableObserver s) {
67-
s.onSubscribe(INSTANCE);
68-
s.onComplete();
66+
public static void complete(CompletableObserver observer) {
67+
observer.onSubscribe(INSTANCE);
68+
observer.onComplete();
6969
}
7070

71-
public static void error(Throwable e, CompletableObserver s) {
72-
s.onSubscribe(INSTANCE);
73-
s.onError(e);
71+
public static void error(Throwable e, CompletableObserver observer) {
72+
observer.onSubscribe(INSTANCE);
73+
observer.onError(e);
7474
}
7575

76-
public static void error(Throwable e, SingleObserver<?> s) {
77-
s.onSubscribe(INSTANCE);
78-
s.onError(e);
76+
public static void error(Throwable e, SingleObserver<?> observer) {
77+
observer.onSubscribe(INSTANCE);
78+
observer.onError(e);
7979
}
8080

81-
public static void error(Throwable e, MaybeObserver<?> s) {
82-
s.onSubscribe(INSTANCE);
83-
s.onError(e);
81+
public static void error(Throwable e, MaybeObserver<?> observer) {
82+
observer.onSubscribe(INSTANCE);
83+
observer.onError(e);
8484
}
8585

8686

src/main/java/io/reactivex/internal/observers/QueueDrainObserver.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ public final boolean fastEnter() {
6262
}
6363

6464
protected final void fastPathEmit(U value, boolean delayError, Disposable dispose) {
65-
final Observer<? super V> s = actual;
65+
final Observer<? super V> observer = actual;
6666
final SimplePlainQueue<U> q = queue;
6767

6868
if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
69-
accept(s, value);
69+
accept(observer, value);
7070
if (leave(-1) == 0) {
7171
return;
7272
}
@@ -76,7 +76,7 @@ protected final void fastPathEmit(U value, boolean delayError, Disposable dispos
7676
return;
7777
}
7878
}
79-
QueueDrainHelper.drainLoop(q, s, delayError, dispose, this);
79+
QueueDrainHelper.drainLoop(q, observer, delayError, dispose, this);
8080
}
8181

8282
/**
@@ -86,12 +86,12 @@ protected final void fastPathEmit(U value, boolean delayError, Disposable dispos
8686
* @param disposable the resource to dispose if the drain terminates
8787
*/
8888
protected final void fastPathOrderedEmit(U value, boolean delayError, Disposable disposable) {
89-
final Observer<? super V> s = actual;
89+
final Observer<? super V> observer = actual;
9090
final SimplePlainQueue<U> q = queue;
9191

9292
if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
9393
if (q.isEmpty()) {
94-
accept(s, value);
94+
accept(observer, value);
9595
if (leave(-1) == 0) {
9696
return;
9797
}
@@ -104,7 +104,7 @@ protected final void fastPathOrderedEmit(U value, boolean delayError, Disposable
104104
return;
105105
}
106106
}
107-
QueueDrainHelper.drainLoop(q, s, delayError, disposable, this);
107+
QueueDrainHelper.drainLoop(q, observer, delayError, disposable, this);
108108
}
109109

110110
@Override

0 commit comments

Comments
 (0)