Skip to content

Commit 98d0b7f

Browse files
authored
2.x: coverage, fixes, cleanup 10/21-1 (#4742)
1 parent 14a954c commit 98d0b7f

20 files changed

+927
-671
lines changed

src/main/java/io/reactivex/internal/functions/Functions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.*;
1616
import java.util.concurrent.*;
1717

18+
import org.reactivestreams.Subscription;
19+
1820
import io.reactivex.*;
1921
import io.reactivex.functions.*;
2022
import io.reactivex.plugins.RxJavaPlugins;
@@ -662,4 +664,11 @@ public boolean test(T t1, T t2) throws Exception {
662664
public static <T, K> BiPredicate<T, T> equalsPredicate(Function<? super T, K> keySelector) {
663665
return new KeyedEqualsPredicate<T, K>(keySelector);
664666
}
667+
668+
public static final Consumer<Subscription> REQUEST_MAX = new Consumer<Subscription>() {
669+
@Override
670+
public void accept(Subscription t) throws Exception {
671+
t.request(Long.MAX_VALUE);
672+
}
673+
};
665674
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java

Lines changed: 8 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,10 @@
1717

1818
import org.reactivestreams.*;
1919

20-
import io.reactivex.exceptions.Exceptions;
2120
import io.reactivex.functions.*;
2221
import io.reactivex.internal.functions.Functions;
2322
import io.reactivex.internal.subscribers.*;
2423
import io.reactivex.internal.util.*;
25-
import io.reactivex.plugins.RxJavaPlugins;
26-
import io.reactivex.subscribers.DefaultSubscriber;
2724

2825
/**
2926
* Utility methods to consume a Publisher in a blocking manner with callbacks or Subscriber.
@@ -65,17 +62,14 @@ public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T>
6562
if (bs.isCancelled()) {
6663
break;
6764
}
68-
if (o == BlockingSubscriber.TERMINATED) {
69-
break;
70-
}
71-
if (NotificationLite.acceptFull(v, subscriber)) {
65+
if (o == BlockingSubscriber.TERMINATED
66+
|| NotificationLite.acceptFull(v, subscriber)) {
7267
break;
7368
}
7469
}
7570
} catch (InterruptedException e) {
76-
subscriber.onError(e);
77-
} finally {
7871
bs.cancel();
72+
subscriber.onError(e);
7973
}
8074
}
8175

@@ -85,31 +79,14 @@ public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T>
8579
* @param <T> the value type
8680
*/
8781
public static <T> void subscribe(Publisher<? extends T> o) {
88-
final CountDownLatch cdl = new CountDownLatch(1);
89-
final Throwable[] error = { null };
82+
BlockingIgnoringReceiver callback = new BlockingIgnoringReceiver();
9083
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(Functions.emptyConsumer(),
91-
new Consumer<Throwable>() {
92-
@Override
93-
public void accept(Throwable e) {
94-
error[0] = e;
95-
cdl.countDown();
96-
}
97-
}, new Action() {
98-
@Override
99-
public void run() {
100-
cdl.countDown();
101-
}
102-
}, new Consumer<Subscription>() {
103-
@Override
104-
public void accept(Subscription s) {
105-
s.request(Long.MAX_VALUE);
106-
}
107-
});
84+
callback, callback, Functions.REQUEST_MAX);
10885

10986
o.subscribe(ls);
11087

111-
BlockingHelper.awaitForComplete(cdl, ls);
112-
Throwable e = error[0];
88+
BlockingHelper.awaitForComplete(callback, ls);
89+
Throwable e = callback.error;
11390
if (e != null) {
11491
throw ExceptionHelper.wrapOrThrow(e);
11592
}
@@ -125,50 +102,6 @@ public void accept(Subscription s) {
125102
*/
126103
public static <T> void subscribe(Publisher<? extends T> o, final Consumer<? super T> onNext,
127104
final Consumer<? super Throwable> onError, final Action onComplete) {
128-
subscribe(o, new DefaultSubscriber<T>() {
129-
boolean done;
130-
@Override
131-
public void onNext(T t) {
132-
if (done) {
133-
return;
134-
}
135-
try {
136-
onNext.accept(t);
137-
} catch (Throwable ex) {
138-
Exceptions.throwIfFatal(ex);
139-
cancel();
140-
onError(ex);
141-
}
142-
}
143-
144-
@Override
145-
public void onError(Throwable e) {
146-
if (done) {
147-
RxJavaPlugins.onError(e);
148-
return;
149-
}
150-
done = true;
151-
try {
152-
onError.accept(e);
153-
} catch (Throwable ex) {
154-
Exceptions.throwIfFatal(ex);
155-
RxJavaPlugins.onError(ex);
156-
}
157-
}
158-
159-
@Override
160-
public void onComplete() {
161-
if (done) {
162-
return;
163-
}
164-
done = true;
165-
try {
166-
onComplete.run();
167-
} catch (Throwable ex) {
168-
Exceptions.throwIfFatal(ex);
169-
RxJavaPlugins.onError(ex);
170-
}
171-
}
172-
});
105+
subscribe(o, new LambdaSubscriber<T>(onNext, onError, onComplete, Functions.REQUEST_MAX));
173106
}
174107
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,10 @@ public void request(long n) {
170170
void innerSuccess(InnerObserver inner, R value) {
171171
set.delete(inner);
172172
if (get() == 0 && compareAndSet(0, 1)) {
173+
boolean d = active.decrementAndGet() == 0;
173174
if (requested.get() != 0) {
174175
actual.onNext(value);
175176

176-
boolean d = active.decrementAndGet() == 0;
177177
SpscLinkedArrayQueue<R> q = queue.get();
178178

179179
if (d && (q == null || q.isEmpty())) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,10 @@ public void request(long n) {
170170
void innerSuccess(InnerObserver inner, R value) {
171171
set.delete(inner);
172172
if (get() == 0 && compareAndSet(0, 1)) {
173+
boolean d = active.decrementAndGet() == 0;
173174
if (requested.get() != 0) {
174175
actual.onNext(value);
175176

176-
boolean d = active.decrementAndGet() == 0;
177177
SpscLinkedArrayQueue<R> q = queue.get();
178178

179179
if (d && (q == null || q.isEmpty())) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableMaterialize.java

Lines changed: 47 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16-
import java.util.concurrent.atomic.*;
16+
import java.util.concurrent.atomic.AtomicLong;
1717

1818
import org.reactivestreams.*;
1919

20-
import io.reactivex.*;
20+
import io.reactivex.Notification;
2121
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2222
import io.reactivex.internal.util.BackpressureHelper;
23+
import io.reactivex.plugins.RxJavaPlugins;
2324

2425
public final class FlowableMaterialize<T> extends AbstractFlowableWithUpstream<T, Notification<T>> {
2526

@@ -32,24 +33,20 @@ protected void subscribeActual(Subscriber<? super Notification<T>> s) {
3233
source.subscribe(new MaterializeSubscriber<T>(s));
3334
}
3435

35-
// FIXME needs post-complete drain management
3636
static final class MaterializeSubscriber<T> extends AtomicLong implements Subscriber<T>, Subscription {
3737

3838
private static final long serialVersionUID = -3740826063558713822L;
39+
3940
final Subscriber<? super Notification<T>> actual;
4041

4142
Subscription s;
4243

43-
final AtomicInteger state = new AtomicInteger();
44-
4544
Notification<T> value;
4645

47-
volatile boolean done;
46+
long produced;
4847

49-
static final int NO_REQUEST_NO_VALUE = 0;
50-
static final int NO_REQUEST_HAS_VALUE = 1;
51-
static final int HAS_REQUEST_NO_VALUE = 2;
52-
static final int HAS_REQUEST_HAS_VALUE = 3;
48+
static final long COMPLETE_MASK = Long.MIN_VALUE;
49+
static final long REQUEST_MASK = Long.MAX_VALUE;
5350

5451
MaterializeSubscriber(Subscriber<? super Notification<T>> actual) {
5552
this.actual = actual;
@@ -65,92 +62,70 @@ public void onSubscribe(Subscription s) {
6562

6663
@Override
6764
public void onNext(T t) {
65+
produced++;
6866
actual.onNext(Notification.createOnNext(t));
69-
70-
if (get() != Long.MAX_VALUE) {
71-
decrementAndGet();
72-
}
73-
}
74-
75-
void tryEmit(Notification<T> v) {
76-
if (get() != 0L) {
77-
state.lazySet(HAS_REQUEST_HAS_VALUE);
78-
actual.onNext(v);
79-
actual.onComplete();
80-
} else {
81-
for (;;) {
82-
int s = state.get();
83-
if (s == HAS_REQUEST_NO_VALUE) {
84-
if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) {
85-
actual.onNext(v);
86-
actual.onComplete();
87-
return;
88-
}
89-
} else
90-
if (s == NO_REQUEST_HAS_VALUE) {
91-
return;
92-
} else
93-
if (s == HAS_REQUEST_HAS_VALUE) {
94-
value = null;
95-
return;
96-
} else {
97-
value = v;
98-
done = true;
99-
if (state.compareAndSet(s, NO_REQUEST_HAS_VALUE)) {
100-
return;
101-
}
102-
}
103-
}
104-
}
10567
}
10668

10769
@Override
10870
public void onError(Throwable t) {
109-
Notification<T> v = Notification.createOnError(t);
110-
111-
tryEmit(v);
71+
complete(Notification.<T>createOnError(t));
11272
}
11373

11474
@Override
11575
public void onComplete() {
116-
Notification<T> v = Notification.createOnComplete();
76+
complete(Notification.<T>createOnComplete());
77+
}
11778

118-
tryEmit(v);
79+
void complete(Notification<T> n) {
80+
long p = produced;
81+
if (p != 0) {
82+
BackpressureHelper.produced(this, p);
83+
}
84+
85+
for (;;) {
86+
long r = get();
87+
if ((r & COMPLETE_MASK) != 0) {
88+
if (n.isOnError()) {
89+
RxJavaPlugins.onError(n.getError());
90+
}
91+
return;
92+
}
93+
if ((r & REQUEST_MASK) != 0) {
94+
lazySet(COMPLETE_MASK + 1);
95+
actual.onNext(n);
96+
actual.onComplete();
97+
return;
98+
}
99+
value = n;
100+
if (compareAndSet(0, COMPLETE_MASK)) {
101+
return;
102+
}
103+
}
119104
}
120105

121106
@Override
122107
public void request(long n) {
123-
if (!SubscriptionHelper.validate(n)) {
124-
return;
125-
}
126-
BackpressureHelper.add(this, n);
127-
if (done) {
108+
if (SubscriptionHelper.validate(n)) {
128109
for (;;) {
129-
int s = state.get();
130-
if (s == NO_REQUEST_HAS_VALUE) {
131-
if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) {
132-
Notification<T> v = value;
133-
value = null;
134-
actual.onNext(v);
110+
long r = get();
111+
if ((r & COMPLETE_MASK) != 0) {
112+
if (compareAndSet(COMPLETE_MASK, COMPLETE_MASK + 1)) {
113+
actual.onNext(value);
135114
actual.onComplete();
136-
return;
137115
}
138-
} else
139-
if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE) {
140-
return;
141-
} else
142-
if (state.compareAndSet(s, HAS_REQUEST_NO_VALUE)) {
143-
return;
116+
break;
117+
}
118+
long u = BackpressureHelper.addCap(r, n);
119+
if (compareAndSet(r, u)) {
120+
s.request(n);
121+
break;
144122
}
145123
}
146-
} else {
147-
s.request(n);
148124
}
149125
}
150126

151127
@Override
152128
public void cancel() {
153-
state.lazySet(HAS_REQUEST_HAS_VALUE);
154129
s.cancel();
155130
}
156131
}

0 commit comments

Comments
 (0)