Skip to content

Commit e71d371

Browse files
authored
2.x: coverage and cleanup 10/12-1 (#4696)
1 parent 35c2951 commit e71d371

17 files changed

+1372
-269
lines changed

src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,6 @@ public void onSubscribe(Disposable d) {
104104
}
105105
return;
106106
}
107-
if (once.get() || set.isDisposed()) {
108-
return;
109-
}
110107

111108
// no need to have separate subscribers because inner is stateless
112109
c.subscribe(inner);

src/main/java/io/reactivex/internal/operators/completable/CompletableConcat.java

Lines changed: 147 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919

2020
import io.reactivex.*;
2121
import io.reactivex.disposables.Disposable;
22-
import io.reactivex.exceptions.MissingBackpressureException;
23-
import io.reactivex.internal.disposables.SequentialDisposable;
24-
import io.reactivex.internal.queue.SpscArrayQueue;
22+
import io.reactivex.exceptions.*;
23+
import io.reactivex.internal.disposables.DisposableHelper;
24+
import io.reactivex.internal.fuseable.*;
25+
import io.reactivex.internal.queue.*;
2526
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2627
import io.reactivex.plugins.RxJavaPlugins;
2728

@@ -36,133 +37,218 @@ public CompletableConcat(Publisher<? extends CompletableSource> sources, int pre
3637

3738
@Override
3839
public void subscribeActual(CompletableObserver s) {
39-
CompletableConcatSubscriber parent = new CompletableConcatSubscriber(s, prefetch);
40-
sources.subscribe(parent);
40+
sources.subscribe(new CompletableConcatSubscriber(s, prefetch));
4141
}
4242

4343
static final class CompletableConcatSubscriber
4444
extends AtomicInteger
4545
implements Subscriber<CompletableSource>, Disposable {
46-
private static final long serialVersionUID = 7412667182931235013L;
46+
private static final long serialVersionUID = 9032184911934499404L;
47+
4748
final CompletableObserver actual;
49+
4850
final int prefetch;
49-
final SequentialDisposable sd;
5051

51-
final SpscArrayQueue<CompletableSource> queue;
52+
final int limit;
53+
54+
final ConcatInnerObserver inner;
55+
56+
final AtomicBoolean once;
57+
58+
int sourceFused;
59+
60+
int consumed;
61+
62+
SimpleQueue<CompletableSource> queue;
5263

5364
Subscription s;
5465

5566
volatile boolean done;
5667

57-
final AtomicBoolean once = new AtomicBoolean();
58-
59-
final ConcatInnerObserver inner;
68+
volatile boolean active;
6069

6170
CompletableConcatSubscriber(CompletableObserver actual, int prefetch) {
6271
this.actual = actual;
6372
this.prefetch = prefetch;
64-
this.queue = new SpscArrayQueue<CompletableSource>(prefetch);
65-
this.sd = new SequentialDisposable();
66-
this.inner = new ConcatInnerObserver();
73+
this.inner = new ConcatInnerObserver(this);
74+
this.once = new AtomicBoolean();
75+
this.limit = prefetch - (prefetch >> 2);
6776
}
6877

6978
@Override
7079
public void onSubscribe(Subscription s) {
7180
if (SubscriptionHelper.validate(this.s, s)) {
7281
this.s = s;
82+
83+
long r = prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch;
84+
85+
if (s instanceof QueueSubscription) {
86+
@SuppressWarnings("unchecked")
87+
QueueSubscription<CompletableSource> qs = (QueueSubscription<CompletableSource>) s;
88+
89+
int m = qs.requestFusion(QueueSubscription.ANY);
90+
91+
if (m == QueueSubscription.SYNC) {
92+
sourceFused = m;
93+
queue = qs;
94+
done = true;
95+
actual.onSubscribe(this);
96+
drain();
97+
return;
98+
}
99+
if (m == QueueSubscription.ASYNC) {
100+
sourceFused = m;
101+
queue = qs;
102+
actual.onSubscribe(this);
103+
s.request(r);
104+
return;
105+
}
106+
}
107+
108+
if (prefetch == Integer.MAX_VALUE) {
109+
queue = new SpscLinkedArrayQueue<CompletableSource>(Flowable.bufferSize());
110+
} else {
111+
queue = new SpscArrayQueue<CompletableSource>(prefetch);
112+
}
113+
73114
actual.onSubscribe(this);
74-
s.request(prefetch);
115+
116+
s.request(r);
75117
}
76118
}
77119

78120
@Override
79121
public void onNext(CompletableSource t) {
80-
if (!queue.offer(t)) {
81-
onError(new MissingBackpressureException());
82-
return;
83-
}
84-
if (getAndIncrement() == 0) {
85-
next();
122+
if (sourceFused == QueueSubscription.NONE) {
123+
if (!queue.offer(t)) {
124+
onError(new MissingBackpressureException());
125+
return;
126+
}
86127
}
128+
drain();
87129
}
88130

89131
@Override
90132
public void onError(Throwable t) {
91133
if (once.compareAndSet(false, true)) {
134+
DisposableHelper.dispose(inner);
92135
actual.onError(t);
93-
return;
136+
} else {
137+
RxJavaPlugins.onError(t);
94138
}
95-
done = true;
96-
RxJavaPlugins.onError(t);
97139
}
98140

99141
@Override
100142
public void onComplete() {
101-
if (done) {
102-
return;
103-
}
104143
done = true;
105-
if (getAndIncrement() == 0) {
106-
next();
107-
}
108-
}
109-
110-
void innerError(Throwable e) {
111-
s.cancel();
112-
onError(e);
113-
}
114-
115-
void innerComplete() {
116-
if (decrementAndGet() != 0) {
117-
next();
118-
}
119-
if (!done) {
120-
s.request(1);
121-
}
144+
drain();
122145
}
123146

124147
@Override
125148
public void dispose() {
126149
s.cancel();
127-
sd.dispose();
150+
DisposableHelper.dispose(inner);
128151
}
129152

130153
@Override
131154
public boolean isDisposed() {
132-
return sd.isDisposed();
155+
return DisposableHelper.isDisposed(inner.get());
133156
}
134157

135-
void next() {
136-
boolean d = done;
137-
CompletableSource c = queue.poll();
138-
if (c == null) {
139-
if (d) {
140-
if (once.compareAndSet(false, true)) {
141-
actual.onComplete();
142-
}
158+
void drain() {
159+
if (getAndIncrement() != 0) {
160+
return;
161+
}
162+
163+
for (;;) {
164+
if (isDisposed()) {
143165
return;
144166
}
145-
RxJavaPlugins.onError(new IllegalStateException("Queue is empty?!"));
146-
return;
167+
168+
if (!active) {
169+
170+
boolean d = done;
171+
172+
CompletableSource cs;
173+
174+
try {
175+
cs = queue.poll();
176+
} catch (Throwable ex) {
177+
Exceptions.throwIfFatal(ex);
178+
innerError(ex);
179+
return;
180+
}
181+
182+
boolean empty = cs == null;
183+
184+
if (d && empty) {
185+
if (once.compareAndSet(false, true)) {
186+
actual.onComplete();
187+
}
188+
return;
189+
}
190+
191+
if (!empty) {
192+
active = true;
193+
cs.subscribe(inner);
194+
request();
195+
}
196+
}
197+
198+
if (decrementAndGet() == 0) {
199+
break;
200+
}
201+
}
202+
}
203+
204+
void request() {
205+
if (sourceFused != QueueSubscription.SYNC) {
206+
int p = consumed + 1;
207+
if (p == limit) {
208+
consumed = 0;
209+
s.request(p);
210+
} else {
211+
consumed = p;
212+
}
213+
}
214+
}
215+
216+
void innerError(Throwable e) {
217+
if (once.compareAndSet(false, true)) {
218+
s.cancel();
219+
actual.onError(e);
220+
} else {
221+
RxJavaPlugins.onError(e);
147222
}
223+
}
148224

149-
c.subscribe(inner);
225+
void innerComplete() {
226+
active = false;
227+
drain();
150228
}
151229

152-
final class ConcatInnerObserver implements CompletableObserver {
230+
static final class ConcatInnerObserver extends AtomicReference<Disposable> implements CompletableObserver {
231+
private static final long serialVersionUID = -5454794857847146511L;
232+
233+
final CompletableConcatSubscriber parent;
234+
235+
ConcatInnerObserver(CompletableConcatSubscriber parent) {
236+
this.parent = parent;
237+
}
238+
153239
@Override
154240
public void onSubscribe(Disposable d) {
155-
sd.update(d);
241+
DisposableHelper.replace(this, d);
156242
}
157243

158244
@Override
159245
public void onError(Throwable e) {
160-
innerError(e);
246+
parent.innerError(e);
161247
}
162248

163249
@Override
164250
public void onComplete() {
165-
innerComplete();
251+
parent.innerComplete();
166252
}
167253
}
168254
}

0 commit comments

Comments
 (0)