Skip to content

Commit 676313a

Browse files
committed
BufferWithSize with Backpressure Support
1 parent 0af90b7 commit 676313a

File tree

2 files changed

+157
-0
lines changed

2 files changed

+157
-0
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorBufferWithSize.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
import java.util.Iterator;
2020
import java.util.LinkedList;
2121
import java.util.List;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
2224
import rx.Observable;
2325
import rx.Observable.Operator;
26+
import rx.Producer;
2427
import rx.Subscriber;
2528

2629
/**
@@ -51,6 +54,12 @@ public final class OperatorBufferWithSize<T> implements Operator<List<T>, T> {
5154
* into a buffer at all!
5255
*/
5356
public OperatorBufferWithSize(int count, int skip) {
57+
if (count <= 0) {
58+
throw new IllegalArgumentException("count must be greater than 0");
59+
}
60+
if (skip <= 0) {
61+
throw new IllegalArgumentException("skip must be greater than 0");
62+
}
5463
this.count = count;
5564
this.skip = skip;
5665
}
@@ -60,6 +69,21 @@ public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
6069
if (count == skip) {
6170
return new Subscriber<T>(child) {
6271
List<T> buffer;
72+
73+
@Override
74+
public void setProducer(final Producer producer) {
75+
child.setProducer(new Producer() {
76+
@Override
77+
public void request(long n) {
78+
if (n == Long.MAX_VALUE) {
79+
producer.request(Long.MAX_VALUE);
80+
} else {
81+
producer.request(n * count);
82+
}
83+
}
84+
});
85+
}
86+
6387
@Override
6488
public void onNext(T t) {
6589
if (buffer == null) {
@@ -98,6 +122,39 @@ public void onCompleted() {
98122
return new Subscriber<T>(child) {
99123
final List<List<T>> chunks = new LinkedList<List<T>>();
100124
int index;
125+
126+
@Override
127+
public void setProducer(final Producer producer) {
128+
child.setProducer(new Producer() {
129+
130+
private final AtomicBoolean firstRequest = new AtomicBoolean(false);
131+
132+
@Override
133+
public void request(long n) {
134+
if (n == Long.MAX_VALUE) {
135+
producer.request(Long.MAX_VALUE);
136+
} else {
137+
if (firstRequest.compareAndSet(false, true)) {
138+
// count = 5, skip = 2, n = 3
139+
// * * * * *
140+
// * * * * *
141+
// * * * * *
142+
// request = 5 + 2 * ( 3 - 1)
143+
producer.request(count + skip * (n - 1));
144+
} else {
145+
// count = 5, skip = 2, n = 3
146+
// (* * *) * *
147+
// ( *) * * * *
148+
// * * * * *
149+
// request = skip * n
150+
// "()" means the items already emitted before this request
151+
producer.request(skip * n);
152+
}
153+
}
154+
}
155+
});
156+
}
157+
101158
@Override
102159
public void onNext(T t) {
103160
if (index++ % skip == 0) {

rxjava-core/src/test/java/rx/internal/operators/OperatorBufferTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.junit.Assert.assertFalse;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Mockito.inOrder;
@@ -28,6 +29,7 @@
2829
import java.util.List;
2930
import java.util.concurrent.CountDownLatch;
3031
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicLong;
3133

3234
import org.junit.Before;
3335
import org.junit.Test;
@@ -36,6 +38,7 @@
3638

3739
import rx.Observable;
3840
import rx.Observer;
41+
import rx.Producer;
3942
import rx.Scheduler;
4043
import rx.Subscriber;
4144
import rx.Subscription;
@@ -44,6 +47,7 @@
4447
import rx.functions.Action1;
4548
import rx.functions.Func0;
4649
import rx.functions.Func1;
50+
import rx.observers.TestSubscriber;
4751
import rx.schedulers.TestScheduler;
4852
import rx.subjects.PublishSubject;
4953

@@ -791,4 +795,100 @@ public Observable<Integer> call(Integer t1) {
791795
verify(o, never()).onCompleted();
792796
verify(o).onError(any(TestException.class));
793797
}
798+
799+
@Test
800+
public void testProducerRequestThroughBufferWithSize1() {
801+
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
802+
ts.requestMore(3);
803+
final AtomicLong requested = new AtomicLong();
804+
Observable.create(new Observable.OnSubscribe<Integer>() {
805+
806+
@Override
807+
public void call(Subscriber<? super Integer> s) {
808+
s.setProducer(new Producer() {
809+
810+
@Override
811+
public void request(long n) {
812+
requested.set(n);
813+
}
814+
815+
});
816+
}
817+
818+
}).buffer(5, 5).subscribe(ts);
819+
assertEquals(15, requested.get());
820+
821+
ts.requestMore(4);
822+
assertEquals(20, requested.get());
823+
}
824+
825+
@Test
826+
public void testProducerRequestThroughBufferWithSize2() {
827+
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
828+
final AtomicLong requested = new AtomicLong();
829+
Observable.create(new Observable.OnSubscribe<Integer>() {
830+
831+
@Override
832+
public void call(Subscriber<? super Integer> s) {
833+
s.setProducer(new Producer() {
834+
835+
@Override
836+
public void request(long n) {
837+
requested.set(n);
838+
}
839+
840+
});
841+
}
842+
843+
}).buffer(5, 5).subscribe(ts);
844+
assertEquals(Long.MAX_VALUE, requested.get());
845+
}
846+
847+
@Test
848+
public void testProducerRequestThroughBufferWithSize3() {
849+
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
850+
ts.requestMore(3);
851+
final AtomicLong requested = new AtomicLong();
852+
Observable.create(new Observable.OnSubscribe<Integer>() {
853+
854+
@Override
855+
public void call(Subscriber<? super Integer> s) {
856+
s.setProducer(new Producer() {
857+
858+
@Override
859+
public void request(long n) {
860+
requested.set(n);
861+
}
862+
863+
});
864+
}
865+
866+
}).buffer(5, 2).subscribe(ts);
867+
assertEquals(9, requested.get());
868+
ts.requestMore(3);
869+
assertEquals(6, requested.get());
870+
}
871+
872+
873+
@Test
874+
public void testProducerRequestThroughBufferWithSize4() {
875+
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
876+
final AtomicLong requested = new AtomicLong();
877+
Observable.create(new Observable.OnSubscribe<Integer>() {
878+
879+
@Override
880+
public void call(Subscriber<? super Integer> s) {
881+
s.setProducer(new Producer() {
882+
883+
@Override
884+
public void request(long n) {
885+
requested.set(n);
886+
}
887+
888+
});
889+
}
890+
891+
}).buffer(5, 2).subscribe(ts);
892+
assertEquals(Long.MAX_VALUE, requested.get());
893+
}
794894
}

0 commit comments

Comments
 (0)