Skip to content

Commit 8a29a64

Browse files
Merge pull request #733 from akarnokd/BufferWithObservableBoundary
Buffer with Observable boundary.
2 parents dadf17b + 93c9fcc commit 8a29a64

File tree

3 files changed

+296
-2
lines changed

3 files changed

+296
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3200,6 +3200,35 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLates
32003200
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, o9, combineFunction));
32013201
}
32023202

3203+
/**
3204+
* Create an Observable that emits non-overlapping buffered items once the boundary observable emits an item.
3205+
* <p>
3206+
* Completion of either this or the boundary observable causes the returned observable
3207+
* to emit the latest buffer and complete.
3208+
* @param <B> the boundary value type (ignored)
3209+
* @param boundary the boundary observable
3210+
* @return an Observable that emits buffered items once the boundary observable emits an item.
3211+
* @see #buffer(rx.Observable, int)
3212+
*/
3213+
public <B> Observable<List<T>> buffer(Observable<B> boundary) {
3214+
return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary));
3215+
}
3216+
3217+
/**
3218+
* Create an Observable that emits non-overlapping buffered items once the boundary observable emits an item.
3219+
* <p>
3220+
* Completion of either this or the boundary observable causes the returned observable
3221+
* to emit the latest buffer and complete.
3222+
* @param <B> the boundary value type (ignored)
3223+
* @param boundary the boundary observable
3224+
* @param initialCapacity the initial capacity of each buffer chunk
3225+
* @return an Observable that emits buffered items once the boundary observable emits an item.
3226+
* @see #buffer(rx.Observable, int)
3227+
*/
3228+
public <B> Observable<List<T>> buffer(Observable<B> boundary, int initialCapacity) {
3229+
return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary, initialCapacity));
3230+
}
3231+
32033232
/**
32043233
* Creates an Observable that emits buffers of items it collects from the
32053234
* source Observable. The resulting Observable emits connected,

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.ArrayList;
1819
import java.util.List;
1920
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.atomic.AtomicBoolean;
@@ -397,4 +398,139 @@ public void unsubscribe() {
397398
}
398399
}
399400
}
401+
402+
/**
403+
* Create a buffer operator with the given observable sequence as the buffer boundary.
404+
*/
405+
public static <T, B> OnSubscribeFunc<List<T>> bufferWithBoundaryObservable(Observable<? extends T> source, Observable<B> boundary) {
406+
return new BufferWithObservableBoundary<T, B>(source, boundary, 16);
407+
}
408+
/**
409+
* Create a buffer operator with the given observable sequence as the buffer boundary and
410+
* with the given initial capacity for buffers.
411+
*/
412+
public static <T, B> OnSubscribeFunc<List<T>> bufferWithBoundaryObservable(Observable<? extends T> source, Observable<B> boundary, int initialCapacity) {
413+
if (initialCapacity <= 0) {
414+
throw new IllegalArgumentException("initialCapacity > 0 required");
415+
}
416+
return new BufferWithObservableBoundary<T, B>(source, boundary, initialCapacity);
417+
}
418+
419+
/**
420+
* Buffer until an element is emitted from a helper observable.
421+
* @param <T> the buffered value type
422+
*/
423+
private static final class BufferWithObservableBoundary<T, B> implements OnSubscribeFunc<List<T>> {
424+
final Observable<? extends T> source;
425+
final Observable<B> boundary;
426+
final int initialCapacity;
427+
428+
public BufferWithObservableBoundary(Observable<? extends T> source, Observable<B> boundary, int initialCapacity) {
429+
this.source = source;
430+
this.boundary = boundary;
431+
this.initialCapacity = initialCapacity;
432+
}
433+
434+
@Override
435+
public Subscription onSubscribe(Observer<? super List<T>> t1) {
436+
CompositeSubscription csub = new CompositeSubscription();
437+
438+
SourceObserver<T> so = new SourceObserver<T>(t1, initialCapacity, csub);
439+
csub.add(source.subscribe(so));
440+
csub.add(boundary.subscribe(new BoundaryObserver<B>(so)));
441+
442+
return csub;
443+
}
444+
/**
445+
* Observes the source.
446+
*/
447+
private static final class SourceObserver<T> implements Observer<T> {
448+
final Observer<? super List<T>> observer;
449+
/** The buffer, if null, that indicates a terminal state. */
450+
List<T> buffer;
451+
final int initialCapacity;
452+
final Object guard;
453+
final Subscription cancel;
454+
public SourceObserver(Observer<? super List<T>> observer, int initialCapacity, Subscription cancel) {
455+
this.observer = observer;
456+
this.initialCapacity = initialCapacity;
457+
this.guard = new Object();
458+
this.cancel = cancel;
459+
buffer = new ArrayList<T>(initialCapacity);
460+
}
461+
462+
@Override
463+
public void onNext(T args) {
464+
synchronized (guard) {
465+
buffer.add(args);
466+
}
467+
}
468+
469+
@Override
470+
public void onError(Throwable e) {
471+
synchronized (guard) {
472+
if (buffer == null) {
473+
return;
474+
}
475+
buffer = null;
476+
}
477+
observer.onError(e);
478+
cancel.unsubscribe();
479+
}
480+
481+
@Override
482+
public void onCompleted() {
483+
emitAndComplete();
484+
cancel.unsubscribe();
485+
}
486+
void emitAndReplace() {
487+
List<T> buf;
488+
synchronized (guard) {
489+
if (buffer == null) {
490+
return;
491+
}
492+
buf = buffer;
493+
buffer = new ArrayList<T>(initialCapacity);
494+
}
495+
observer.onNext(buf);
496+
}
497+
void emitAndComplete() {
498+
List<T> buf;
499+
synchronized (guard) {
500+
if (buffer == null) {
501+
return;
502+
}
503+
buf = buffer;
504+
buffer = null;
505+
}
506+
observer.onNext(buf);
507+
observer.onCompleted();
508+
}
509+
}
510+
/**
511+
* Observes the boundary.
512+
*/
513+
private static final class BoundaryObserver<T> implements Observer<T> {
514+
final SourceObserver so;
515+
516+
public BoundaryObserver(SourceObserver so) {
517+
this.so = so;
518+
}
519+
520+
@Override
521+
public void onNext(T args) {
522+
so.emitAndReplace();
523+
}
524+
525+
@Override
526+
public void onError(Throwable e) {
527+
so.onError(e);
528+
}
529+
530+
@Override
531+
public void onCompleted() {
532+
so.onCompleted();
533+
}
534+
}
535+
}
400536
}

rxjava-core/src/test/java/rx/operators/OperationBufferTest.java

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@
2828
import org.junit.Test;
2929
import org.mockito.InOrder;
3030
import org.mockito.Mockito;
31-
import static org.mockito.Mockito.mock;
32-
import static org.mockito.Mockito.times;
31+
import static org.mockito.Mockito.*;
3332

3433
import rx.Observable;
3534
import rx.Observer;
3635
import rx.Subscription;
3736
import rx.schedulers.TestScheduler;
37+
import rx.subjects.PublishSubject;
3838
import rx.subscriptions.Subscriptions;
3939
import rx.util.functions.Action0;
4040
import rx.util.functions.Action1;
@@ -383,4 +383,133 @@ public void testBufferStopsWhenUnsubscribed1() {
383383

384384
inOrder.verifyNoMoreInteractions();
385385
}
386+
387+
@Test
388+
public void bufferWithBONormal1() {
389+
PublishSubject<Integer> source = PublishSubject.create();
390+
PublishSubject<Integer> boundary = PublishSubject.create();
391+
392+
@SuppressWarnings("unchecked")
393+
Observer<Object> o = mock(Observer.class);
394+
InOrder inOrder = Mockito.inOrder(o);
395+
396+
source.buffer(boundary).subscribe(o);
397+
398+
source.onNext(1);
399+
source.onNext(2);
400+
source.onNext(3);
401+
402+
boundary.onNext(1);
403+
404+
inOrder.verify(o, times(1)).onNext(Arrays.asList(1, 2, 3));
405+
406+
source.onNext(4);
407+
source.onNext(5);
408+
409+
boundary.onNext(2);
410+
411+
inOrder.verify(o, times(1)).onNext(Arrays.asList(4, 5));
412+
413+
source.onNext(6);
414+
boundary.onCompleted();
415+
416+
inOrder.verify(o, times(1)).onNext(Arrays.asList(6));
417+
418+
inOrder.verify(o).onCompleted();
419+
420+
verify(o, never()).onError(any(Throwable.class));
421+
}
422+
@Test
423+
public void bufferWithBOEmptyLastViaBoundary() {
424+
PublishSubject<Integer> source = PublishSubject.create();
425+
PublishSubject<Integer> boundary = PublishSubject.create();
426+
427+
@SuppressWarnings("unchecked")
428+
Observer<Object> o = mock(Observer.class);
429+
InOrder inOrder = Mockito.inOrder(o);
430+
431+
source.buffer(boundary).subscribe(o);
432+
433+
boundary.onCompleted();
434+
435+
inOrder.verify(o, times(1)).onNext(Arrays.asList());
436+
437+
inOrder.verify(o).onCompleted();
438+
439+
verify(o, never()).onError(any(Throwable.class));
440+
}
441+
@Test
442+
public void bufferWithBOEmptyLastViaSource() {
443+
PublishSubject<Integer> source = PublishSubject.create();
444+
PublishSubject<Integer> boundary = PublishSubject.create();
445+
446+
@SuppressWarnings("unchecked")
447+
Observer<Object> o = mock(Observer.class);
448+
InOrder inOrder = Mockito.inOrder(o);
449+
450+
source.buffer(boundary).subscribe(o);
451+
452+
source.onCompleted();
453+
454+
inOrder.verify(o, times(1)).onNext(Arrays.asList());
455+
456+
inOrder.verify(o).onCompleted();
457+
458+
verify(o, never()).onError(any(Throwable.class));
459+
}
460+
@Test
461+
public void bufferWithBOEmptyLastViaBoth() {
462+
PublishSubject<Integer> source = PublishSubject.create();
463+
PublishSubject<Integer> boundary = PublishSubject.create();
464+
465+
@SuppressWarnings("unchecked")
466+
Observer<Object> o = mock(Observer.class);
467+
InOrder inOrder = Mockito.inOrder(o);
468+
469+
source.buffer(boundary).subscribe(o);
470+
471+
source.onCompleted();
472+
boundary.onCompleted();
473+
474+
inOrder.verify(o, times(1)).onNext(Arrays.asList());
475+
476+
inOrder.verify(o).onCompleted();
477+
478+
verify(o, never()).onError(any(Throwable.class));
479+
}
480+
481+
@Test
482+
public void bufferWithBOSourceThrows() {
483+
PublishSubject<Integer> source = PublishSubject.create();
484+
PublishSubject<Integer> boundary = PublishSubject.create();
485+
486+
@SuppressWarnings("unchecked")
487+
Observer<Object> o = mock(Observer.class);
488+
489+
source.buffer(boundary).subscribe(o);
490+
source.onNext(1);
491+
source.onError(new OperationReduceTest.CustomException());
492+
493+
verify(o).onError(any(OperationReduceTest.CustomException.class));
494+
verify(o, never()).onCompleted();
495+
verify(o, never()).onNext(any());
496+
}
497+
498+
@Test
499+
public void bufferWithBOBoundaryThrows() {
500+
PublishSubject<Integer> source = PublishSubject.create();
501+
PublishSubject<Integer> boundary = PublishSubject.create();
502+
503+
@SuppressWarnings("unchecked")
504+
Observer<Object> o = mock(Observer.class);
505+
506+
source.buffer(boundary).subscribe(o);
507+
508+
source.onNext(1);
509+
boundary.onError(new OperationReduceTest.CustomException());
510+
511+
verify(o).onError(any(OperationReduceTest.CustomException.class));
512+
verify(o, never()).onCompleted();
513+
verify(o, never()).onNext(any());
514+
}
386515
}

0 commit comments

Comments
 (0)