Skip to content

Commit 50598b3

Browse files
authored
2.x: cleanup and coverage 10/03 (#4661)
1 parent 9e77e32 commit 50598b3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2808
-701
lines changed

src/main/java/io/reactivex/Maybe.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.reactivex.exceptions.Exceptions;
2424
import io.reactivex.functions.*;
2525
import io.reactivex.internal.functions.*;
26+
import io.reactivex.internal.fuseable.*;
2627
import io.reactivex.internal.observers.BlockingMultiObserver;
2728
import io.reactivex.internal.operators.flowable.*;
2829
import io.reactivex.internal.operators.maybe.*;
@@ -2866,9 +2867,13 @@ public final Completable toCompletable() {
28662867
* </dl>
28672868
* @return the new Flowable instance
28682869
*/
2870+
@SuppressWarnings("unchecked")
28692871
@BackpressureSupport(BackpressureKind.FULL)
28702872
@SchedulerSupport(SchedulerSupport.NONE)
28712873
public final Flowable<T> toFlowable() {
2874+
if (this instanceof FuseToFlowable) {
2875+
return ((FuseToFlowable<T>)this).fuseToFlowable();
2876+
}
28722877
return RxJavaPlugins.onAssembly(new MaybeToFlowable<T>(this));
28732878
}
28742879

@@ -2881,8 +2886,12 @@ public final Flowable<T> toFlowable() {
28812886
* </dl>
28822887
* @return the new Observable instance
28832888
*/
2889+
@SuppressWarnings("unchecked")
28842890
@SchedulerSupport(SchedulerSupport.NONE)
28852891
public final Observable<T> toObservable() {
2892+
if (this instanceof FuseToObservable) {
2893+
return ((FuseToObservable<T>)this).fuseToObservable();
2894+
}
28862895
return RxJavaPlugins.onAssembly(new MaybeToObservable<T>(this));
28872896
}
28882897

src/main/java/io/reactivex/internal/disposables/EmptyDisposable.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@ public enum EmptyDisposable implements QueueDisposable<Object> {
3030
* don't use it in tests and then signal onNext with it;
3131
* use Disposables.empty() instead.
3232
*/
33-
INSTANCE
33+
INSTANCE,
34+
/**
35+
* An empty disposable that returns false for isDisposed.
36+
*/
37+
NEVER
3438
;
3539

3640
@Override
@@ -40,7 +44,7 @@ public void dispose() {
4044

4145
@Override
4246
public boolean isDisposed() {
43-
return true;
47+
return this == INSTANCE;
4448
}
4549

4650
public static void complete(Observer<?> s) {

src/main/java/io/reactivex/internal/observers/FutureSingleObserver.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -110,33 +110,28 @@ public void onSubscribe(Disposable s) {
110110

111111
@Override
112112
public void onSuccess(T t) {
113-
if (value != null) {
114-
s.get().dispose();
115-
onError(new IndexOutOfBoundsException("More than one element received"));
113+
Disposable a = s.get();
114+
if (a == DisposableHelper.DISPOSED) {
116115
return;
117116
}
118117
value = t;
118+
s.compareAndSet(a, this);
119119
countDown();
120120
}
121121

122122
@Override
123123
public void onError(Throwable t) {
124-
if (error == null) {
124+
for (;;) {
125+
Disposable a = s.get();
126+
if (a == DisposableHelper.DISPOSED) {
127+
RxJavaPlugins.onError(t);
128+
return;
129+
}
125130
error = t;
126-
127-
for (;;) {
128-
Disposable a = s.get();
129-
if (a == this || a == DisposableHelper.DISPOSED) {
130-
RxJavaPlugins.onError(t);
131-
return;
132-
}
133-
if (s.compareAndSet(a, this)) {
134-
countDown();
135-
return;
136-
}
131+
if (s.compareAndSet(a, this)) {
132+
countDown();
133+
return;
137134
}
138-
} else {
139-
RxJavaPlugins.onError(t);
140135
}
141136
}
142137

src/main/java/io/reactivex/internal/operators/completable/CompletableNever.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ private CompletableNever() {
2424

2525
@Override
2626
protected void subscribeActual(CompletableObserver s) {
27-
s.onSubscribe(EmptyDisposable.INSTANCE);
27+
s.onSubscribe(EmptyDisposable.NEVER);
2828
}
2929

3030
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableMap.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,6 @@ public boolean tryOnNext(T t) {
115115
return false;
116116
}
117117

118-
if (sourceMode != NONE) {
119-
return actual.tryOnNext(null);
120-
}
121-
122118
U v;
123119

124120
try {

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import io.reactivex.Scheduler.Worker;
2525
import io.reactivex.disposables.Disposable;
2626
import io.reactivex.exceptions.Exceptions;
27-
import io.reactivex.internal.disposables.*;
27+
import io.reactivex.internal.disposables.DisposableHelper;
2828
import io.reactivex.internal.fuseable.SimpleQueue;
2929
import io.reactivex.internal.queue.MpscLinkedQueue;
3030
import io.reactivex.internal.subscribers.QueueDrainSubscriber;
@@ -733,7 +733,7 @@ public void onNext(T t) {
733733
return;
734734
}
735735
} else {
736-
queue.offer(NotificationLite.next(t));
736+
queue.offer(t);
737737
if (!enter()) {
738738
return;
739739
}
@@ -797,6 +797,7 @@ void complete(UnicastProcessor<T> w) {
797797
}
798798
}
799799

800+
@SuppressWarnings("unchecked")
800801
void drainLoop() {
801802
final SimpleQueue<Object> q = queue;
802803
final Subscriber<? super Flowable<T>> a = actual;
@@ -854,7 +855,6 @@ void drainLoop() {
854855
}
855856

856857
if (sw) {
857-
@SuppressWarnings("unchecked")
858858
SubjectWork<T> work = (SubjectWork<T>)v;
859859

860860
if (work.open) {
@@ -889,10 +889,10 @@ public void run() {
889889
}
890890
continue;
891891
}
892-
}
893-
894-
for (UnicastProcessor<T> w : ws) {
895-
w.onNext(NotificationLite.<T>getValue(v));
892+
} else {
893+
for (UnicastProcessor<T> w : ws) {
894+
w.onNext((T)v);
895+
}
896896
}
897897
}
898898

src/main/java/io/reactivex/internal/operators/maybe/MaybeFromObservable.java

Lines changed: 0 additions & 117 deletions
This file was deleted.

0 commit comments

Comments
 (0)