Skip to content

Commit 1de53a2

Browse files
Merge pull request #1073 from akarnokd/OperatorBuffer
OperatorBuffer
2 parents 3904af5 + 6231f7f commit 1de53a2

File tree

7 files changed

+1143
-596
lines changed

7 files changed

+1143
-596
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import rx.observers.SafeSubscriber;
4949
import rx.operators.OnSubscribeFromIterable;
5050
import rx.operators.OnSubscribeRange;
51-
import rx.operators.OperationBuffer;
5251
import rx.operators.OperationCombineLatest;
5352
import rx.operators.OperationConcat;
5453
import rx.operators.OperationDebounce;
@@ -93,6 +92,10 @@
9392
import rx.operators.OperatorAmb;
9493
import rx.operators.OperatorAny;
9594
import rx.operators.OperatorAsObservable;
95+
import rx.operators.OperatorBufferWithSingleObservable;
96+
import rx.operators.OperatorBufferWithSize;
97+
import rx.operators.OperatorBufferWithStartEndObservable;
98+
import rx.operators.OperatorBufferWithTime;
9699
import rx.operators.OperatorCache;
97100
import rx.operators.OperatorCast;
98101
import rx.operators.OperatorDoOnEach;
@@ -2973,7 +2976,7 @@ public final Observable<T> asObservable() {
29732976
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-buffer">RxJava Wiki: buffer()</a>
29742977
*/
29752978
public final <TClosing> Observable<List<T>> buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector) {
2976-
return create(OperationBuffer.buffer(this, bufferClosingSelector));
2979+
return lift(new OperatorBufferWithSingleObservable<T, TClosing>(bufferClosingSelector, 16));
29772980
}
29782981

29792982
/**
@@ -2990,7 +2993,7 @@ public final <TClosing> Observable<List<T>> buffer(Func0<? extends Observable<?
29902993
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-buffer">RxJava Wiki: buffer()</a>
29912994
*/
29922995
public final Observable<List<T>> buffer(int count) {
2993-
return create(OperationBuffer.buffer(this, count));
2996+
return lift(new OperatorBufferWithSize<T>(count, count));
29942997
}
29952998

29962999
/**
@@ -3011,7 +3014,7 @@ public final Observable<List<T>> buffer(int count) {
30113014
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-buffer">RxJava Wiki: buffer()</a>
30123015
*/
30133016
public final Observable<List<T>> buffer(int count, int skip) {
3014-
return create(OperationBuffer.buffer(this, count, skip));
3017+
return lift(new OperatorBufferWithSize<T>(count, skip));
30153018
}
30163019

30173020
/**
@@ -3034,7 +3037,7 @@ public final Observable<List<T>> buffer(int count, int skip) {
30343037
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-buffer">RxJava Wiki: buffer()</a>
30353038
*/
30363039
public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit) {
3037-
return create(OperationBuffer.buffer(this, timespan, timeshift, unit));
3040+
return lift(new OperatorBufferWithTime<T>(timespan, timeshift, unit, Integer.MAX_VALUE, Schedulers.computation()));
30383041
}
30393042

30403043
/**
@@ -3058,7 +3061,7 @@ public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit
30583061
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-buffer">RxJava Wiki: buffer()</a>
30593062
*/
30603063
public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
3061-
return create(OperationBuffer.buffer(this, timespan, timeshift, unit, scheduler));
3064+
return lift(new OperatorBufferWithTime<T>(timespan, timeshift, unit, Integer.MAX_VALUE, scheduler));
30623065
}
30633066

30643067
/**
@@ -3079,7 +3082,7 @@ public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit
30793082
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-buffer">RxJava Wiki: buffer()</a>
30803083
*/
30813084
public final Observable<List<T>> buffer(long timespan, TimeUnit unit) {
3082-
return create(OperationBuffer.buffer(this, timespan, unit));
3085+
return lift(new OperatorBufferWithTime<T>(timespan, timespan, unit, Integer.MAX_VALUE, Schedulers.computation()));
30833086
}
30843087

30853088
/**
@@ -3104,7 +3107,7 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit) {
31043107
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-buffer">RxJava Wiki: buffer()</a>
31053108
*/
31063109
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count) {
3107-
return create(OperationBuffer.buffer(this, timespan, unit, count));
3110+
return lift(new OperatorBufferWithTime<T>(timespan, timespan, unit, count, Schedulers.computation()));
31083111
}
31093112

31103113
/**
@@ -3132,7 +3135,7 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count)
31323135
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-buffer">RxJava Wiki: buffer()</a>
31333136
*/
31343137
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler) {
3135-
return create(OperationBuffer.buffer(this, timespan, unit, count, scheduler));
3138+
return lift(new OperatorBufferWithTime<T>(timespan, timespan, unit, count, scheduler));
31363139
}
31373140

31383141
/**
@@ -3156,7 +3159,7 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count,
31563159
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-buffer">RxJava Wiki: buffer()</a>
31573160
*/
31583161
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler) {
3159-
return create(OperationBuffer.buffer(this, timespan, unit, scheduler));
3162+
return lift(new OperatorBufferWithTime<T>(timespan, timespan, unit, Integer.MAX_VALUE, scheduler));
31603163
}
31613164

31623165
/**
@@ -3176,7 +3179,7 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler
31763179
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-buffer">RxJava Wiki: buffer()</a>
31773180
*/
31783181
public final <TOpening, TClosing> Observable<List<T>> buffer(Observable<? extends TOpening> bufferOpenings, Func1<? super TOpening, ? extends Observable<? extends TClosing>> bufferClosingSelector) {
3179-
return create(OperationBuffer.buffer(this, bufferOpenings, bufferClosingSelector));
3182+
return lift(new OperatorBufferWithStartEndObservable<T, TOpening, TClosing>(bufferOpenings, bufferClosingSelector));
31803183
}
31813184

31823185
/**
@@ -3198,7 +3201,7 @@ public final <TOpening, TClosing> Observable<List<T>> buffer(Observable<? extend
31983201
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-buffer">RxJava Wiki: buffer()</a>
31993202
*/
32003203
public final <B> Observable<List<T>> buffer(Observable<B> boundary) {
3201-
return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary));
3204+
return lift(new OperatorBufferWithSingleObservable<T, B>(boundary, 16));
32023205
}
32033206

32043207
/**
@@ -3222,7 +3225,7 @@ public final <B> Observable<List<T>> buffer(Observable<B> boundary) {
32223225
* @see #buffer(rx.Observable, int)
32233226
*/
32243227
public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialCapacity) {
3225-
return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary, initialCapacity));
3228+
return lift(new OperatorBufferWithSingleObservable<T, B>(boundary, initialCapacity));
32263229
}
32273230

32283231
/**

0 commit comments

Comments
 (0)