Skip to content

Commit b44e8ad

Browse files
author
Aaron Tull
committed
Changed producer interface to work with long instead of int
1 parent 6b6fced commit b44e8ad

18 files changed

+92
-90
lines changed

rxjava-core/src/main/java/rx/Producer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@
1717

1818
public interface Producer {
1919

20-
public void request(int n);
20+
public void request(long n);
2121

2222
}

rxjava-core/src/main/java/rx/Subscriber.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public abstract class Subscriber<T> implements Observer<T>, Subscription {
3838
/* protected by `this` */
3939
private Producer p;
4040
/* protected by `this` */
41-
private int requested = Integer.MIN_VALUE; // default to not set
41+
private long requested = Long.MIN_VALUE; // default to not set
4242

4343
@Deprecated
4444
protected Subscriber(CompositeSubscription cs) {
@@ -87,7 +87,7 @@ public void onStart() {
8787
// do nothing by default
8888
}
8989

90-
public final void request(int n) {
90+
public final void request(long n) {
9191
Producer shouldRequest = null;
9292
synchronized (this) {
9393
if (p != null) {
@@ -108,14 +108,14 @@ protected Producer onSetProducer(Producer producer) {
108108

109109
public final void setProducer(Producer producer) {
110110
producer = onSetProducer(producer);
111-
int toRequest;
111+
long toRequest;
112112
boolean setProducer = false;
113113
synchronized (this) {
114114
toRequest = requested;
115115
p = producer;
116116
if (op != null) {
117117
// middle operator ... we pass thru unless a request has been made
118-
if (toRequest == Integer.MIN_VALUE) {
118+
if (toRequest == Long.MIN_VALUE) {
119119
// we pass-thru to the next producer as nothing has been requested
120120
setProducer = true;
121121
}
@@ -127,7 +127,7 @@ public final void setProducer(Producer producer) {
127127
op.setProducer(p);
128128
} else {
129129
// we execute the request with whatever has been requested (or -1)
130-
if (toRequest == Integer.MIN_VALUE) {
130+
if (toRequest == Long.MIN_VALUE) {
131131
p.request(-1);
132132
} else {
133133
p.request(toRequest);

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.Collection;
1918
import java.util.Iterator;
20-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2120

2221
import rx.Observable.OnSubscribe;
2322
import rx.Producer;
@@ -52,17 +51,17 @@ private static final class IterableProducer<T> implements Producer {
5251
private final Subscriber<? super T> o;
5352
private final Iterator<? extends T> it;
5453

55-
private volatile int requested = 0;
54+
private volatile long requested = 0;
5655
@SuppressWarnings("rawtypes")
57-
private static final AtomicIntegerFieldUpdater<IterableProducer> REQUESTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(IterableProducer.class, "requested");
56+
private static final AtomicLongFieldUpdater<IterableProducer> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(IterableProducer.class, "requested");
5857

5958
private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
6059
this.o = o;
6160
this.it = it;
6261
}
6362

6463
@Override
65-
public void request(int n) {
64+
public void request(long n) {
6665
if (n < 0) {
6766
// fast-path without backpressure
6867
while (it.hasNext()) {
@@ -74,15 +73,15 @@ public void request(int n) {
7473
o.onCompleted();
7574
} else {
7675
// backpressure is requested
77-
int _c = REQUESTED_UPDATER.getAndAdd(this, n);
76+
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
7877
if (_c == 0) {
7978
while (true) {
8079
/*
8180
* This complicated logic is done to avoid touching the volatile `requested` value
8281
* during the loop itself. If it is touched during the loop the performance is impacted significantly.
8382
*/
84-
int r = requested;
85-
int numToEmit = r;
83+
long r = requested;
84+
long numToEmit = r;
8685
while (it.hasNext() && --numToEmit >= 0) {
8786
if (o.isUnsubscribed()) {
8887
return;

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
18+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
1919

2020
import rx.Observable.OnSubscribe;
2121
import rx.Producer;
@@ -43,9 +43,9 @@ private static final class RangeProducer implements Producer {
4343
private final Subscriber<? super Integer> o;
4444
@SuppressWarnings("unused")
4545
// accessed by REQUESTED_UPDATER
46-
private volatile int requested;
47-
private static final AtomicIntegerFieldUpdater<RangeProducer> REQUESTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RangeProducer.class, "requested");
48-
private int index;
46+
private volatile long requested;
47+
private static final AtomicLongFieldUpdater<RangeProducer> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(RangeProducer.class, "requested");
48+
private long index;
4949
private final int end;
5050

5151
private RangeProducer(Subscriber<? super Integer> o, int start, int end) {
@@ -55,36 +55,36 @@ private RangeProducer(Subscriber<? super Integer> o, int start, int end) {
5555
}
5656

5757
@Override
58-
public void request(int n) {
58+
public void request(long n) {
5959
if (n < 0) {
6060
// fast-path without backpressure
61-
for (int i = index; i <= end; i++) {
61+
for (long i = index; i <= end; i++) {
6262
if (o.isUnsubscribed()) {
6363
return;
6464
}
65-
o.onNext(i);
65+
o.onNext((int) i);
6666
}
6767
o.onCompleted();
6868
} else if (n > 0) {
6969
// backpressure is requested
70-
int _c = REQUESTED_UPDATER.getAndAdd(this, n);
70+
long _c = REQUESTED_UPDATER.getAndAdd(this, n);
7171
if (_c == 0) {
7272
while (true) {
7373
/*
7474
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
7575
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
7676
*/
77-
int r = requested;
78-
int idx = index;
79-
int numLeft = end - idx + 1;
80-
int e = Math.min(numLeft, r);
77+
long r = requested;
78+
long idx = index;
79+
long numLeft = end - idx + 1;
80+
long e = Math.min(numLeft, r);
8181
boolean completeOnFinish = numLeft <= r;
82-
int stopAt = e + idx;
83-
for (int i = idx; i < stopAt; i++) {
82+
long stopAt = e + idx;
83+
for (long i = idx; i < stopAt; i++) {
8484
if (o.isUnsubscribed()) {
8585
return;
8686
}
87-
o.onNext(i);
87+
o.onNext((int) i);
8888
}
8989
index = stopAt;
9090

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ public MergeProducer(MergeSubscriber<T> ms) {
386386
static final AtomicLongFieldUpdater<MergeProducer> REQUESTED = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "requested");
387387

388388
@Override
389-
public void request(int n) {
389+
public void request(long n) {
390390
if (n < 0) {
391391
requested = -1;
392392
} else {

rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
9090
child.setProducer(new Producer() {
9191

9292
@Override
93-
public void request(int n) {
93+
public void request(long n) {
9494
REQUESTED.getAndAdd(ObserveOnSubscriber.this, n);
9595
schedule();
9696
}

rxjava-core/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import java.util.Queue;
1919
import java.util.concurrent.ConcurrentLinkedQueue;
20-
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.concurrent.atomic.AtomicLong;
2121

2222
import rx.Observable.Operator;
2323
import rx.Producer;
@@ -32,13 +32,13 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
3232
// TODO get a different queue implementation
3333
// TODO start with size hint
3434
final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
35-
final AtomicInteger wip = new AtomicInteger();
36-
final AtomicInteger requested = new AtomicInteger();
35+
final AtomicLong wip = new AtomicLong();
36+
final AtomicLong requested = new AtomicLong();
3737

3838
child.setProducer(new Producer() {
3939

4040
@Override
41-
public void request(int n) {
41+
public void request(long n) {
4242
if (requested.getAndAdd(n) == 0) {
4343
pollQueue(wip, requested, queue, child);
4444
}
@@ -75,7 +75,7 @@ public void onNext(T t) {
7575
return parent;
7676
}
7777

78-
private void pollQueue(AtomicInteger wip, AtomicInteger requested, Queue<Object> queue, Subscriber<? super T> child) {
78+
private void pollQueue(AtomicLong wip, AtomicLong requested, Queue<Object> queue, Subscriber<? super T> child) {
7979
// TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue?
8080
if (requested.get() > 0) {
8181
// only one draining at a time

rxjava-core/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.atomic.AtomicInteger;
18+
import java.util.concurrent.atomic.AtomicLong;
1919

2020
import rx.Observable.Operator;
2121
import rx.Producer;
@@ -25,12 +25,12 @@ public class OperatorOnBackpressureDrop<T> implements Operator<T, T> {
2525

2626
@Override
2727
public Subscriber<? super T> call(final Subscriber<? super T> child) {
28-
final AtomicInteger requested = new AtomicInteger();
28+
final AtomicLong requested = new AtomicLong();
2929

3030
child.setProducer(new Producer() {
3131

3232
@Override
33-
public void request(int n) {
33+
public void request(long n) {
3434
requested.getAndAdd(n);
3535
}
3636

rxjava-core/src/main/java/rx/internal/operators/OperatorSkip.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ protected Producer onSetProducer(final Producer producer) {
6666
return new Producer() {
6767

6868
@Override
69-
public void request(int n) {
69+
public void request(long n) {
7070
// add the skip num to the requested amount, since we'll skip everything and then emit to the buffer downstream
7171
if (n > 0) {
7272
producer.request(n + (toSkip - skipped));

rxjava-core/src/main/java/rx/internal/operators/OperatorSubscribeOn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ protected Producer onSetProducer(final Producer producer) {
8181
return new Producer() {
8282

8383
@Override
84-
public void request(final int n) {
84+
public void request(final long n) {
8585
if (Thread.currentThread() == t) {
8686
// don't schedule if we're already on the thread (primarily for first setProducer call)
8787
// see unit test 'testSetProducerSynchronousRequest' for more context on this

0 commit comments

Comments
 (0)