Skip to content

Commit 81d281f

Browse files
authored
2.x: coverage, fixes, enhancements, cleanup 10/18-1 (#4723)
1 parent 19d83c1 commit 81d281f

36 files changed

+1783
-295
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7300,32 +7300,6 @@ public final <U, V> Observable<V> flatMapIterable(final Function<? super T, ? ex
73007300
return flatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), resultSelector, false, bufferSize(), bufferSize());
73017301
}
73027302

7303-
/**
7304-
* Returns an Observable that merges each item emitted by the source ObservableSource with the values in an
7305-
* Iterable corresponding to that item that is generated by a selector.
7306-
* <p>
7307-
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMapIterable.png" alt="">
7308-
* <dl>
7309-
* <dt><b>Scheduler:</b></dt>
7310-
* <dd>{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
7311-
* </dl>
7312-
*
7313-
* @param <U>
7314-
* the type of item emitted by the resulting ObservableSource
7315-
* @param mapper
7316-
* a function that returns an Iterable sequence of values for when given an item emitted by the
7317-
* source ObservableSource
7318-
* @param bufferSize
7319-
* the number of elements to prefetch from the current Observable
7320-
* @return an Observable that emits the results of merging the items emitted by the source ObservableSource with
7321-
* the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
7322-
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
7323-
*/
7324-
@SchedulerSupport(SchedulerSupport.NONE)
7325-
public final <U> Observable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper, int bufferSize) {
7326-
return flatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), false, bufferSize);
7327-
}
7328-
73297303
/**
73307304
* Maps each element of the upstream Observable into MaybeSources, subscribes to them and
73317305
* waits until the upstream and all MaybeSources complete.

src/main/java/io/reactivex/flowables/ConnectableFlowable.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.functions.Consumer;
2121
import io.reactivex.internal.functions.Functions;
2222
import io.reactivex.internal.operators.flowable.*;
23+
import io.reactivex.internal.util.ConnectConsumer;
2324
import io.reactivex.plugins.RxJavaPlugins;
2425

2526
/**
@@ -58,14 +59,9 @@ public abstract class ConnectableFlowable<T> extends Flowable<T> {
5859
* @see <a href="http://reactivex.io/documentation/operators/connect.html">ReactiveX documentation: Connect</a>
5960
*/
6061
public final Disposable connect() {
61-
final Disposable[] connection = new Disposable[1];
62-
connect(new Consumer<Disposable>() {
63-
@Override
64-
public void accept(Disposable d) {
65-
connection[0] = d;
66-
}
67-
});
68-
return connection[0];
62+
ConnectConsumer cc = new ConnectConsumer();
63+
connect(cc);
64+
return cc.disposable;
6965
}
7066

7167
/**

src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,12 @@ public void onNext(T t) {
8686
}
8787

8888
DebounceEmitter<T> de = new DebounceEmitter<T>(t, idx, this);
89-
if (!timer.compareAndSet(d, de)) {
90-
return;
91-
}
89+
if (timer.compareAndSet(d, de)) {
90+
d = worker.schedule(de, timeout, unit);
9291

93-
d = worker.schedule(de, timeout, unit);
92+
de.setResource(d);
93+
}
9494

95-
de.setResource(d);
9695
}
9796

9897
@Override
@@ -117,7 +116,9 @@ public void onComplete() {
117116
if (d != DisposableHelper.DISPOSED) {
118117
@SuppressWarnings("unchecked")
119118
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
120-
de.emit();
119+
if (de != null) {
120+
de.run();
121+
}
121122
DisposableHelper.dispose(timer);
122123
worker.dispose();
123124
actual.onComplete();
@@ -162,10 +163,6 @@ static final class DebounceEmitter<T> extends AtomicReference<Disposable> implem
162163

163164
@Override
164165
public void run() {
165-
emit();
166-
}
167-
168-
void emit() {
169166
if (once.compareAndSet(false, true)) {
170167
parent.emit(idx, value, this);
171168
}

src/main/java/io/reactivex/internal/operators/observable/ObservableJoin.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,12 @@ static final class GroupJoinDisposable<TLeft, TRight, TLeftEnd, TRightEnd, R>
129129

130130
@Override
131131
public void dispose() {
132-
if (cancelled) {
133-
return;
134-
}
135-
cancelled = true;
136-
cancelAll();
137-
if (getAndIncrement() == 0) {
138-
queue.clear();
132+
if (!cancelled) {
133+
cancelled = true;
134+
cancelAll();
135+
if (getAndIncrement() == 0) {
136+
queue.clear();
137+
}
139138
}
140139
}
141140

@@ -303,8 +302,7 @@ else if (mode == LEFT_CLOSE) {
303302

304303
lefts.remove(end.index);
305304
disposables.remove(end);
306-
}
307-
else if (mode == RIGHT_CLOSE) {
305+
} else {
308306
LeftRightEndObserver end = (LeftRightEndObserver)val;
309307

310308
rights.remove(end.index);

src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java

Lines changed: 127 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16-
import java.util.concurrent.atomic.AtomicInteger;
17-
1816
import io.reactivex.*;
1917
import io.reactivex.disposables.Disposable;
20-
import io.reactivex.exceptions.MissingBackpressureException;
18+
import io.reactivex.exceptions.Exceptions;
2119
import io.reactivex.internal.disposables.DisposableHelper;
20+
import io.reactivex.internal.fuseable.*;
21+
import io.reactivex.internal.observers.BasicIntQueueDisposable;
2222
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
2323
import io.reactivex.internal.schedulers.TrampolineScheduler;
2424
import io.reactivex.plugins.RxJavaPlugins;
@@ -45,24 +45,16 @@ protected void subscribeActual(Observer<? super T> observer) {
4545
}
4646
}
4747

48-
/**
49-
* Pads the base atomic integer used for wip counting.
50-
*/
51-
static class Padding0 extends AtomicInteger {
52-
53-
private static final long serialVersionUID = 3172843496016154809L;
54-
55-
volatile long p01, p02, p03, p04, p05, p06, p07;
56-
volatile long p08, p09, p0A, p0B, p0C, p0D, p0E, p0F;
57-
}
58-
59-
static final class ObserveOnObserver<T> extends Padding0 implements Observer<T>, Disposable, Runnable {
48+
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
49+
implements Observer<T>, Runnable {
6050

6151
private static final long serialVersionUID = 6576896619930983584L;
6252
final Observer<? super T> actual;
6353
final Scheduler.Worker worker;
6454
final boolean delayError;
65-
final SpscLinkedArrayQueue<T> queue;
55+
final int bufferSize;
56+
57+
SimpleQueue<T> queue;
6658

6759
Disposable s;
6860

@@ -71,17 +63,45 @@ static final class ObserveOnObserver<T> extends Padding0 implements Observer<T>,
7163

7264
volatile boolean cancelled;
7365

66+
int sourceMode;
67+
68+
boolean outputFused;
69+
7470
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
7571
this.actual = actual;
7672
this.worker = worker;
7773
this.delayError = delayError;
78-
this.queue = new SpscLinkedArrayQueue<T>(bufferSize);
74+
this.bufferSize = bufferSize;
7975
}
8076

8177
@Override
8278
public void onSubscribe(Disposable s) {
8379
if (DisposableHelper.validate(this.s, s)) {
8480
this.s = s;
81+
if (s instanceof QueueDisposable) {
82+
@SuppressWarnings("unchecked")
83+
QueueDisposable<T> qd = (QueueDisposable<T>) s;
84+
85+
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
86+
87+
if (m == QueueDisposable.SYNC) {
88+
sourceMode = m;
89+
queue = qd;
90+
done = true;
91+
actual.onSubscribe(this);
92+
schedule();
93+
return;
94+
}
95+
if (m == QueueDisposable.ASYNC) {
96+
sourceMode = m;
97+
queue = qd;
98+
actual.onSubscribe(this);
99+
return;
100+
}
101+
}
102+
103+
queue = new SpscLinkedArrayQueue<T>(bufferSize);
104+
85105
actual.onSubscribe(this);
86106
}
87107
}
@@ -92,10 +112,8 @@ public void onNext(T t) {
92112
return;
93113
}
94114

95-
if (!queue.offer(t)) {
96-
s.dispose();
97-
onError(new MissingBackpressureException("Queue full?!"));
98-
return;
115+
if (sourceMode != QueueDisposable.ASYNC) {
116+
queue.offer(t);
99117
}
100118
schedule();
101119
}
@@ -126,6 +144,9 @@ public void dispose() {
126144
cancelled = true;
127145
s.dispose();
128146
worker.dispose();
147+
if (getAndIncrement() == 0) {
148+
queue.clear();
149+
}
129150
}
130151
}
131152

@@ -140,11 +161,10 @@ void schedule() {
140161
}
141162
}
142163

143-
@Override
144-
public void run() {
164+
void drainNormal() {
145165
int missed = 1;
146166

147-
final SpscLinkedArrayQueue<T> q = queue;
167+
final SimpleQueue<T> q = queue;
148168
final Observer<? super T> a = actual;
149169

150170
for (;;) {
@@ -154,7 +174,17 @@ public void run() {
154174

155175
for (;;) {
156176
boolean d = done;
157-
T v = q.poll();
177+
T v;
178+
179+
try {
180+
v = q.poll();
181+
} catch (Throwable ex) {
182+
Exceptions.throwIfFatal(ex);
183+
s.dispose();
184+
q.clear();
185+
a.onError(ex);
186+
return;
187+
}
158188
boolean empty = v == null;
159189

160190
if (checkTerminated(d, empty, a)) {
@@ -175,10 +205,55 @@ public void run() {
175205
}
176206
}
177207

208+
void drainFused() {
209+
int missed = 1;
210+
211+
for (;;) {
212+
if (cancelled) {
213+
return;
214+
}
215+
216+
boolean d = done;
217+
Throwable ex = error;
218+
219+
if (!delayError && d && ex != null) {
220+
actual.onError(error);
221+
worker.dispose();
222+
return;
223+
}
224+
225+
actual.onNext(null);
226+
227+
if (d) {
228+
ex = error;
229+
if (ex != null) {
230+
actual.onError(ex);
231+
} else {
232+
actual.onComplete();
233+
}
234+
worker.dispose();
235+
return;
236+
}
237+
238+
missed = addAndGet(-missed);
239+
if (missed == 0) {
240+
break;
241+
}
242+
}
243+
}
244+
245+
@Override
246+
public void run() {
247+
if (outputFused) {
248+
drainFused();
249+
} else {
250+
drainNormal();
251+
}
252+
}
253+
178254
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
179255
if (cancelled) {
180-
s.dispose();
181-
worker.dispose();
256+
queue.clear();
182257
return true;
183258
}
184259
if (d) {
@@ -195,6 +270,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
195270
}
196271
} else {
197272
if (e != null) {
273+
queue.clear();
198274
a.onError(e);
199275
worker.dispose();
200276
return true;
@@ -208,5 +284,29 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
208284
}
209285
return false;
210286
}
287+
288+
@Override
289+
public int requestFusion(int mode) {
290+
if ((mode & ASYNC) != 0) {
291+
outputFused = true;
292+
return ASYNC;
293+
}
294+
return NONE;
295+
}
296+
297+
@Override
298+
public T poll() throws Exception {
299+
return queue.poll();
300+
}
301+
302+
@Override
303+
public void clear() {
304+
queue.clear();
305+
}
306+
307+
@Override
308+
public boolean isEmpty() {
309+
return queue.isEmpty();
310+
}
211311
}
212312
}

0 commit comments

Comments
 (0)