1515 */
1616package rx .internal .operators ;
1717
18- import java .util .ArrayList ;
19- import java .util .Collections ;
20- import java .util .List ;
21- import rx .Observable ;
18+ import java .util .*;
19+
20+ import rx .*;
2221import rx .Observable .Operator ;
22+ import rx .Observable ;
2323import rx .Observer ;
24- import rx .Subscriber ;
25- import rx .functions .Func0 ;
2624import rx .observers .SerializedSubscriber ;
27- import rx .observers .Subscribers ;
2825
2926/**
3027 * Creates non-overlapping windows of items where each window is terminated by
3431 * @param <U> the boundary value type
3532 */
3633public final class OperatorWindowWithObservable <T , U > implements Operator <Observable <T >, T > {
37- final Func0 <? extends Observable <? extends U >> otherFactory ;
34+ final Observable <U > other ;
3835
39- public OperatorWindowWithObservable (Func0 <? extends Observable <? extends U >> otherFactory ) {
40- this .otherFactory = otherFactory ;
41- }
4236 public OperatorWindowWithObservable (final Observable <U > other ) {
43- this .otherFactory = new Func0 <Observable <U >>() {
44-
45- @ Override
46- public Observable <U > call () {
47- return other ;
48- }
49-
50- };
37+ this .other = other ;
5138 }
5239
5340 @ Override
5441 public Subscriber <? super T > call (Subscriber <? super Observable <T >> child ) {
5542
56- Observable <? extends U > other ;
57- try {
58- other = otherFactory .call ();
59- } catch (Throwable e ) {
60- child .onError (e );
61- return Subscribers .empty ();
62- }
63-
6443 SourceSubscriber <T > sub = new SourceSubscriber <T >(child );
6544 BoundarySubscriber <T , U > bs = new BoundarySubscriber <T , U >(child , sub );
6645
46+ child .add (sub );
47+ child .add (bs );
48+
6749 sub .replaceWindow ();
6850
6951 other .unsafeSubscribe (bs );
@@ -88,7 +70,6 @@ static final class SourceSubscriber<T> extends Subscriber<T> {
8870 List <Object > queue ;
8971
9072 public SourceSubscriber (Subscriber <? super Observable <T >> child ) {
91- super (child );
9273 this .child = new SerializedSubscriber <Observable <T >>(child );
9374 this .guard = new Object ();
9475 }
@@ -288,7 +269,6 @@ void error(Throwable e) {
288269 static final class BoundarySubscriber <T , U > extends Subscriber <U > {
289270 final SourceSubscriber <T > sub ;
290271 public BoundarySubscriber (Subscriber <?> child , SourceSubscriber <T > sub ) {
291- super (child );
292272 this .sub = sub ;
293273 }
294274
0 commit comments