Skip to content

Commit 34aaf0b

Browse files
Merge pull request #1412 from benjchristensen/backpressure-9
Backpressure & 0.20.0-RC1
2 parents b79077d + 97f73fa commit 34aaf0b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+4757
-475
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ subprojects {
106106
maxHeapSize = "512m"
107107
jvmArgs '-XX:+UnlockCommercialFeatures'
108108
jvmArgs '-XX:+FlightRecorder'
109+
jvmArgs '-XX:AutoBoxCacheMax=1000000'
109110

110111
if (project.hasProperty('jmh')) {
111112
args(jmh.split(' '))

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.19.5-SNAPSHOT
1+
version=0.20.0-RC1-SNAPSHOT

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import rx.exceptions.*;
2121
import rx.functions.*;
2222
import rx.internal.operators.*;
23+
import rx.internal.util.ScalarSynchronousObservable;
2324
import rx.observables.*;
2425
import rx.observers.SafeSubscriber;
2526
import rx.plugins.*;
@@ -1154,7 +1155,7 @@ public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
11541155
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140.aspx">MSDN: Observable.ToObservable</a>
11551156
*/
11561157
public final static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler scheduler) {
1157-
return create(new OnSubscribeFromIterable<T>(iterable)).subscribeOn(scheduler);
1158+
return from(iterable).subscribeOn(scheduler);
11581159
}
11591160

11601161
/**
@@ -1543,17 +1544,7 @@ public final static Observable<Long> interval(long interval, TimeUnit unit, Sche
15431544
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#just">RxJava Wiki: just()</a>
15441545
*/
15451546
public final static <T> Observable<T> just(final T value) {
1546-
return Observable.create(new OnSubscribe<T>() {
1547-
1548-
@Override
1549-
public void call(Subscriber<? super T> s) {
1550-
if (!s.isUnsubscribed()) {
1551-
s.onNext(value);
1552-
s.onCompleted();
1553-
}
1554-
}
1555-
1556-
});
1547+
return ScalarSynchronousObservable.create(value);
15571548
}
15581549

15591550
/**
@@ -2471,6 +2462,9 @@ public final static Observable<Integer> range(int start, int count) {
24712462
if (start > Integer.MAX_VALUE - count + 1) {
24722463
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
24732464
}
2465+
if(count == 1) {
2466+
return Observable.just(start);
2467+
}
24742468
return Observable.create(new OnSubscribeRange(start, start + (count - 1)));
24752469
}
24762470

@@ -4914,6 +4908,14 @@ public final Boolean call(T t) {
49144908
}).cast(klass);
49154909
}
49164910

4911+
public final Observable<T> onBackpressureBuffer() {
4912+
return lift(new OperatorOnBackpressureBuffer<T>());
4913+
}
4914+
4915+
public final Observable<T> onBackpressureDrop() {
4916+
return lift(new OperatorOnBackpressureDrop<T>());
4917+
}
4918+
49174919
/**
49184920
* Instructs an Observable to pass control to another Observable rather than invoking
49194921
* {@link Observer#onError onError} if it encounters an error.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
public interface Producer {
19+
20+
public void request(int n);
21+
22+
}

rxjava-core/src/main/java/rx/Subscriber.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,27 +34,48 @@
3434
public abstract class Subscriber<T> implements Observer<T>, Subscription {
3535

3636
private final SubscriptionList cs;
37+
private final Subscriber<?> op;
38+
/* protected by `this` */
39+
private Producer p;
40+
/* protected by `this` */
41+
private int requested = Integer.MIN_VALUE; // default to not set
3742

3843
@Deprecated
3944
protected Subscriber(CompositeSubscription cs) {
45+
this.op = null;
4046
this.cs = new SubscriptionList();
4147
add(cs);
4248
}
4349

4450
protected Subscriber() {
51+
this.op = null;
4552
this.cs = new SubscriptionList();
4653
}
4754

55+
protected Subscriber(int bufferRequest) {
56+
this.op = null;
57+
this.cs = new SubscriptionList();
58+
request(bufferRequest);
59+
}
60+
4861
protected Subscriber(Subscriber<?> op) {
62+
this.op = op;
4963
this.cs = op.cs;
5064
}
5165

66+
protected Subscriber(Subscriber<?> op, int bufferRequest) {
67+
this.op = op;
68+
this.cs = op.cs;
69+
request(bufferRequest);
70+
}
71+
5272
/**
5373
* Adds a {@link Subscription} to this Subscriber's list of subscriptions if this list is not marked as
5474
* unsubscribed. If the list <em>is</em> marked as unsubscribed, {@code add} will indicate this by
5575
* explicitly unsubscribing the new {@code Subscription} as well.
5676
*
57-
* @param s the {@code Subscription} to add
77+
* @param s
78+
* the {@code Subscription} to add
5879
*/
5980
public final void add(Subscription s) {
6081
cs.add(s);
@@ -73,4 +94,51 @@ public final void unsubscribe() {
7394
public final boolean isUnsubscribed() {
7495
return cs.isUnsubscribed();
7596
}
97+
98+
public final void request(int n) {
99+
Producer shouldRequest = null;
100+
synchronized (this) {
101+
if (p != null) {
102+
shouldRequest = p;
103+
} else {
104+
requested = n;
105+
}
106+
}
107+
// after releasing lock
108+
if (shouldRequest != null) {
109+
shouldRequest.request(n);
110+
}
111+
}
112+
113+
protected Producer onSetProducer(Producer producer) {
114+
return producer;
115+
}
116+
117+
public final void setProducer(Producer producer) {
118+
producer = onSetProducer(producer);
119+
int toRequest = requested;
120+
boolean setProducer = false;
121+
synchronized (this) {
122+
p = producer;
123+
if (op != null) {
124+
// middle operator ... we pass thru unless a request has been made
125+
if (toRequest == Integer.MIN_VALUE) {
126+
// we pass-thru to the next producer as nothing has been requested
127+
setProducer = true;
128+
}
129+
130+
}
131+
}
132+
// do after releasing lock
133+
if (setProducer) {
134+
op.setProducer(p);
135+
} else {
136+
// we execute the request with whatever has been requested (or -1)
137+
if (toRequest == Integer.MIN_VALUE) {
138+
p.request(-1);
139+
} else {
140+
p.request(toRequest);
141+
}
142+
}
143+
}
76144
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.exceptions;
17+
18+
public class MissingBackpressureException extends Exception {
19+
20+
private static final long serialVersionUID = 7250870679677032194L;
21+
22+
public MissingBackpressureException() {
23+
}
24+
25+
public MissingBackpressureException(String message) {
26+
super(message);
27+
}
28+
29+
}

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.Collection;
19+
import java.util.Iterator;
20+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
21+
1822
import rx.Observable.OnSubscribe;
23+
import rx.Producer;
1924
import rx.Subscriber;
2025

2126
/**
@@ -35,17 +40,71 @@ public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
3540
}
3641

3742
@Override
38-
public void call(Subscriber<? super T> o) {
39-
for (T i : is) {
40-
if (o.isUnsubscribed()) {
41-
return;
42-
}
43-
o.onNext(i);
43+
public void call(final Subscriber<? super T> o) {
44+
if (is == null) {
45+
o.onCompleted();
46+
}
47+
final Iterator<? extends T> it = is.iterator();
48+
o.setProducer(new IterableProducer<T>(o, it));
49+
}
50+
51+
private static final class IterableProducer<T> implements Producer {
52+
private final Subscriber<? super T> o;
53+
private final Iterator<? extends T> it;
54+
55+
private volatile int requested = 0;
56+
@SuppressWarnings("rawtypes")
57+
private static final AtomicIntegerFieldUpdater<IterableProducer> REQUESTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(IterableProducer.class, "requested");
58+
59+
private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
60+
this.o = o;
61+
this.it = it;
4462
}
45-
if (o.isUnsubscribed()) {
46-
return;
63+
64+
@Override
65+
public void request(int n) {
66+
if (n < 0) {
67+
// fast-path without backpressure
68+
while (it.hasNext()) {
69+
if (o.isUnsubscribed()) {
70+
return;
71+
}
72+
o.onNext(it.next());
73+
}
74+
o.onCompleted();
75+
} else {
76+
// backpressure is requested
77+
int _c = REQUESTED_UPDATER.getAndAdd(this, n);
78+
if (_c == 0) {
79+
while (true) {
80+
/*
81+
* This complicated logic is done to avoid touching the volatile `requested` value
82+
* during the loop itself. If it is touched during the loop the performance is impacted significantly.
83+
*/
84+
int r = requested;
85+
int numToEmit = r;
86+
while (it.hasNext() && --numToEmit >= 0) {
87+
if (o.isUnsubscribed()) {
88+
return;
89+
}
90+
o.onNext(it.next());
91+
92+
}
93+
94+
if (!it.hasNext()) {
95+
o.onCompleted();
96+
return;
97+
}
98+
if (REQUESTED_UPDATER.addAndGet(this, -r) == 0) {
99+
// we're done emitting the number requested so return
100+
return;
101+
}
102+
103+
}
104+
}
105+
}
106+
47107
}
48-
o.onCompleted();
49108
}
50109

51110
}

0 commit comments

Comments
 (0)