Skip to content

Commit 1124dc7

Browse files
authored
2.x: coverage, fixes, cleanup, copy to Flowable 10/19-1 (#4730)
1 parent e28dc4b commit 1124dc7

File tree

91 files changed

+3679
-421
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+3679
-421
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7967,7 +7967,7 @@ public final Observable<T> mergeWith(ObservableSource<? extends T> other) {
79677967

79687968
/**
79697969
* Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
7970-
* asynchronously with a bounded buffer of {@link Flowable#bufferSize()} slots.
7970+
* asynchronously with an unbounded buffer with {@link Flowable#bufferSize()} "island size".
79717971
*
79727972
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
79737973
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
@@ -7977,7 +7977,9 @@ public final Observable<T> mergeWith(ObservableSource<? extends T> other) {
79777977
* <dt><b>Scheduler:</b></dt>
79787978
* <dd>You specify which {@link Scheduler} this operator will use</dd>
79797979
* </dl>
7980-
*
7980+
* <p>"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed
7981+
* on the other side of the asynchronous boundary.
7982+
*
79817983
* @param scheduler
79827984
* the {@link Scheduler} to notify {@link Observer}s on
79837985
* @return the source ObservableSource modified so that its {@link Observer}s are notified on the specified
@@ -7995,13 +7997,15 @@ public final Observable<T> observeOn(Scheduler scheduler) {
79957997

79967998
/**
79977999
* Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
7998-
* asynchronously with a bounded buffer and optionally delays onError notifications.
8000+
* asynchronously with an unbounded buffer with {@link Flowable#bufferSize()} "island size" and optionally delays onError notifications.
79998001
* <p>
80008002
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
80018003
* <dl>
80028004
* <dt><b>Scheduler:</b></dt>
80038005
* <dd>You specify which {@link Scheduler} this operator will use</dd>
80048006
* </dl>
8007+
* <p>"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed
8008+
* on the other side of the asynchronous boundary.
80058009
*
80068010
* @param scheduler
80078011
* the {@link Scheduler} to notify {@link Observer}s on
@@ -8024,13 +8028,15 @@ public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
80248028

80258029
/**
80268030
* Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
8027-
* asynchronously with a bounded buffer of configurable size and optionally delays onError notifications.
8031+
* asynchronously with an unbounded buffer of configurable "island size" and optionally delays onError notifications.
80288032
* <p>
80298033
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
80308034
* <dl>
80318035
* <dt><b>Scheduler:</b></dt>
80328036
* <dd>You specify which {@link Scheduler} this operator will use</dd>
80338037
* </dl>
8038+
* <p>"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed
8039+
* on the other side of the asynchronous boundary. Values below 16 are not recommended in performance sensitive scenarios.
80348040
*
80358041
* @param scheduler
80368042
* the {@link Scheduler} to notify {@link Observer}s on

src/main/java/io/reactivex/internal/observers/QueueDrainObserver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@ public void drain(boolean delayError, Disposable dispose) {
121121
QueueDrainHelper.drainLoop(queue, actual, delayError, dispose, this);
122122
}
123123
}
124+
125+
@Override
126+
public void accept(Observer<? super V> a, U v) {
127+
// ignored by default
128+
}
124129
}
125130

126131
// -------------------------------------------------------------------

src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@ public void onComplete() {
125125
if (!DisposableHelper.isDisposed(d)) {
126126
@SuppressWarnings("unchecked")
127127
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
128-
de.emit();
128+
if (de != null) {
129+
de.emit();
130+
}
129131
DisposableHelper.dispose(timer);
130132
worker.dispose();
131133
actual.onComplete();

src/main/java/io/reactivex/internal/operators/flowable/FlowableDematerialize.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ public void onSubscribe(Subscription s) {
5252
@Override
5353
public void onNext(Notification<T> t) {
5454
if (done) {
55+
if (t.isOnError()) {
56+
RxJavaPlugins.onError(t.getError());
57+
}
5558
return;
5659
}
5760
if (t.isOnError()) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,14 +150,12 @@ public void onError(Throwable e) {
150150
@Override
151151
public void onComplete() {
152152
if (decrementAndGet() == 0) {
153-
if (delayErrors) {
154-
Throwable ex = errors.terminate();
155-
if (ex != null) {
156-
actual.onError(ex);
157-
return;
158-
}
153+
Throwable ex = errors.terminate();
154+
if (ex != null) {
155+
actual.onError(ex);
156+
} else {
157+
actual.onComplete();
159158
}
160-
actual.onComplete();
161159
} else {
162160
if (maxConcurrency != Integer.MAX_VALUE) {
163161
s.request(1);

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,12 @@ public void onError(Throwable e) {
159159
@Override
160160
public void onComplete() {
161161
if (decrementAndGet() == 0) {
162-
if (delayErrors) {
163-
Throwable ex = errors.terminate();
164-
if (ex != null) {
165-
actual.onError(ex);
166-
return;
167-
}
162+
Throwable ex = errors.terminate();
163+
if (ex != null) {
164+
actual.onError(ex);
165+
} else {
166+
actual.onComplete();
168167
}
169-
actual.onComplete();
170168
} else {
171169
if (maxConcurrency != Integer.MAX_VALUE) {
172170
s.request(1);

src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.reactivex.internal.queue.SpscArrayQueue;
2525
import io.reactivex.internal.subscriptions.*;
2626
import io.reactivex.internal.util.BackpressureHelper;
27+
import io.reactivex.plugins.RxJavaPlugins;
2728

2829
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
2930
final Scheduler scheduler;
@@ -99,6 +100,9 @@ abstract static class BaseObserveOnSubscriber<T>
99100

100101
@Override
101102
public final void onNext(T t) {
103+
if (done) {
104+
return;
105+
}
102106
if (sourceMode == ASYNC) {
103107
trySchedule();
104108
return;
@@ -114,15 +118,21 @@ public final void onNext(T t) {
114118

115119
@Override
116120
public final void onError(Throwable t) {
121+
if (done) {
122+
RxJavaPlugins.onError(t);
123+
return;
124+
}
117125
error = t;
118126
done = true;
119127
trySchedule();
120128
}
121129

122130
@Override
123131
public final void onComplete() {
124-
done = true;
125-
trySchedule();
132+
if (!done) {
133+
done = true;
134+
trySchedule();
135+
}
126136
}
127137

128138
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void subscribe(Subscriber<? super T> child) {
9797
* Note: although technically correct, concurrent disconnects can cause
9898
* unexpected behavior such as child subscribers never receiving anything
9999
* (unless connected again). An alternative approach, similar to
100-
* PublishSubject would be to immediately terminate such child
100+
* PublishProcessor would be to immediately terminate such child
101101
* subscribers as well:
102102
*
103103
* Object term = r.terminalEvent;
@@ -172,7 +172,7 @@ public void connect(Consumer<? super Disposable> connection) {
172172
* Disposable as subscribe() may never return on its own.
173173
*
174174
* Note however, that asynchronously disconnecting a running source might leave
175-
* child subscribers without any terminal event; PublishSubject does not have this
175+
* child subscribers without any terminal event; PublishProcessor does not have this
176176
* issue because the cancellation was always triggered by the child subscribers
177177
* themselves.
178178
*/

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,17 +227,26 @@ public void subscribe(Subscriber<? super T> child) {
227227

228228
// create the backpressure-managing producer for this child
229229
InnerSubscription<T> inner = new InnerSubscription<T>(r, child);
230+
// the producer has been registered with the current subscriber-to-source so
231+
// at least it will receive the next terminal event
232+
// setting the producer will trigger the first request to be considered by
233+
// the subscriber-to-source.
234+
child.onSubscribe(inner);
230235
// we try to add it to the array of subscribers
231236
// if it fails, no worries because we will still have its buffer
232237
// so it is going to replay it for us
233238
r.add(inner);
239+
240+
if (inner.isDisposed()) {
241+
r.remove(inner);
242+
return;
243+
}
244+
245+
r.manageRequests();
246+
234247
// trigger the capturing of the current node and total requested
235248
r.buffer.replay(inner);
236-
// the producer has been registered with the current subscriber-to-source so
237-
// at least it will receive the next terminal event
238-
// setting the producer will trigger the first request to be considered by
239-
// the subscriber-to-source.
240-
child.onSubscribe(inner);
249+
241250
break; // NOPMD
242251
}
243252
}
@@ -315,6 +324,9 @@ public void connect(Consumer<? super Disposable> connection) {
315324
try {
316325
connection.accept(ps);
317326
} catch (Throwable ex) {
327+
if (doConnect) {
328+
ps.shouldConnect.compareAndSet(true, false);
329+
}
318330
Exceptions.throwIfFatal(ex);
319331
throw ExceptionHelper.wrapOrThrow(ex);
320332
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeUntilPredicate.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.reactivex.exceptions.Exceptions;
1919
import io.reactivex.functions.Predicate;
2020
import io.reactivex.internal.subscriptions.SubscriptionHelper;
21+
import io.reactivex.plugins.RxJavaPlugins;
2122

2223
public final class FlowableTakeUntilPredicate<T> extends AbstractFlowableWithUpstream<T, T> {
2324
final Predicate<? super T> predicate;
@@ -75,6 +76,8 @@ public void onError(Throwable t) {
7576
if (!done) {
7677
done = true;
7778
actual.onError(t);
79+
} else {
80+
RxJavaPlugins.onError(t);
7881
}
7982
}
8083

0 commit comments

Comments
 (0)