Skip to content

Commit 82c0aec

Browse files
committed
Window with Observable boundary.
1 parent fe438ca commit 82c0aec

File tree

3 files changed

+380
-0
lines changed

3 files changed

+380
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3373,6 +3373,21 @@ public <TOpening, TClosing> Observable<Observable<T>> window(Observable<? extend
33733373
return create(OperationWindow.window(this, windowOpenings, closingSelector));
33743374
}
33753375

3376+
/**
3377+
* Create an Observable which emits non-overlapping windows of items it collects from the
3378+
* source observable where the boundary of each window is determined by the items
3379+
* emitted from the boundary observable.
3380+
* @param <U> the window element type (ignored)
3381+
* @param boundary the Observable sequence whose emitted item is used for closing
3382+
* and opening windows
3383+
* @return an Observable which emits non-overlapping windows of items it collects from the
3384+
* source observable where the boundary of each window is determined by the items
3385+
* emitted from the boundary observable
3386+
*/
3387+
public <U> Observable<Observable<T>> window(Observable<U> boundary) {
3388+
return create(OperationWindow.window(this, boundary));
3389+
}
3390+
33763391
/**
33773392
* Creates an Observable that emits windows of items it collects from the
33783393
* source Observable. The resulting Observable emits connected,

rxjava-core/src/main/java/rx/operators/OperationWindow.java

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
import rx.Scheduler;
2424
import rx.Subscription;
2525
import rx.schedulers.Schedulers;
26+
import rx.subjects.PublishSubject;
27+
import rx.subjects.Subject;
28+
import rx.subscriptions.CompositeSubscription;
29+
import rx.subscriptions.Subscriptions;
2630
import rx.util.functions.Func0;
2731
import rx.util.functions.Func1;
2832

@@ -354,4 +358,144 @@ public Observable<T> getContents() {
354358
return Observable.from(contents);
355359
}
356360
}
361+
/**
362+
* Emits windows of values of the source Observable where the window boundary is
363+
* determined by the items of the boundary Observable.
364+
*/
365+
public static <T, U> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, Observable<U> boundary) {
366+
return new WindowViaObservable<T, U>(source, boundary);
367+
}
368+
/**
369+
* Create non-overlapping windows from the source values by using another observable's
370+
* values as to when to replace a window.
371+
*/
372+
private static final class WindowViaObservable<T, U> implements OnSubscribeFunc<Observable<T>> {
373+
final Observable<? extends T> source;
374+
final Observable<U> boundary;
375+
376+
public WindowViaObservable(Observable<? extends T> source, Observable<U> boundary) {
377+
this.source = source;
378+
this.boundary = boundary;
379+
}
380+
381+
@Override
382+
public Subscription onSubscribe(Observer<? super Observable<T>> t1) {
383+
CompositeSubscription csub = new CompositeSubscription();
384+
385+
final SourceObserver<T> so = new SourceObserver<T>(t1, csub);
386+
try {
387+
t1.onNext(so.subject);
388+
} catch (Throwable t) {
389+
t1.onError(t);
390+
return Subscriptions.empty();
391+
}
392+
csub.add(source.subscribe(so));
393+
394+
if (!csub.isUnsubscribed()) {
395+
csub.add(boundary.subscribe(new BoundaryObserver<T, U>(so)));
396+
}
397+
398+
return csub;
399+
}
400+
/**
401+
* Observe the source and emit the values into the current window.
402+
*/
403+
private static final class SourceObserver<T> implements Observer<T> {
404+
final Observer<? super Observable<T>> observer;
405+
final Subscription cancel;
406+
final Object guard;
407+
Subject<T, T> subject;
408+
409+
public SourceObserver(Observer<? super Observable<T>> observer, Subscription cancel) {
410+
this.observer = observer;
411+
this.cancel = cancel;
412+
this.guard = new Object();
413+
this.subject = create();
414+
}
415+
416+
Subject<T, T> create() {
417+
return PublishSubject.create();
418+
}
419+
@Override
420+
public void onNext(T args) {
421+
synchronized (guard) {
422+
if (subject == null) {
423+
return;
424+
}
425+
subject.onNext(args);
426+
}
427+
}
428+
429+
@Override
430+
public void onError(Throwable e) {
431+
synchronized (guard) {
432+
if (subject == null) {
433+
return;
434+
}
435+
Subject<T, T> s = subject;
436+
subject = null;
437+
438+
s.onError(e);
439+
observer.onError(e);
440+
}
441+
cancel.unsubscribe();
442+
}
443+
444+
@Override
445+
public void onCompleted() {
446+
synchronized (guard) {
447+
if (subject == null) {
448+
return;
449+
}
450+
Subject<T, T> s = subject;
451+
subject = null;
452+
453+
s.onCompleted();
454+
observer.onCompleted();
455+
}
456+
cancel.unsubscribe();
457+
}
458+
public void replace() {
459+
try {
460+
synchronized (guard) {
461+
if (subject == null) {
462+
return;
463+
}
464+
Subject<T, T> s = subject;
465+
s.onCompleted();
466+
467+
subject = create();
468+
observer.onNext(subject);
469+
}
470+
} catch (Throwable t) {
471+
onError(t);
472+
}
473+
}
474+
}
475+
/**
476+
* Observe the boundary and replace the window on each item.
477+
*/
478+
private static final class BoundaryObserver<T, U> implements Observer<U> {
479+
final SourceObserver<T> so;
480+
481+
public BoundaryObserver(SourceObserver<T> so) {
482+
this.so = so;
483+
}
484+
485+
@Override
486+
public void onNext(U args) {
487+
so.replace();
488+
}
489+
490+
@Override
491+
public void onError(Throwable e) {
492+
so.onError(e);
493+
}
494+
495+
@Override
496+
public void onCompleted() {
497+
so.onCompleted();
498+
}
499+
}
500+
}
357501
}

0 commit comments

Comments
 (0)