34
34
import rx .functions .Func2 ;
35
35
import rx .internal .operators .OnSubscribeCache ;
36
36
import rx .observers .TestObserver ;
37
+ import rx .observers .TestSubscriber ;
37
38
import rx .schedulers .Schedulers ;
38
39
import rx .subjects .AsyncSubject ;
39
40
import rx .subjects .BehaviorSubject ;
@@ -93,7 +94,8 @@ public void call(String v) {
93
94
}
94
95
assertEquals (1 , counter .get ());
95
96
}
96
- void testWithCustomSubjectAndRepeat (Subject <Integer , Integer > subject , Integer ... expected ) {
97
+
98
+ private void testWithCustomSubjectAndRepeat (Subject <Integer , Integer > subject , Integer ... expected ) {
97
99
Observable <Integer > source0 = Observable .from (1 , 2 , 3 )
98
100
.subscribeOn (Schedulers .io ())
99
101
.flatMap (new Func1 <Integer , Observable <Integer >>() {
@@ -107,62 +109,46 @@ public Integer call(Long t1) {
107
109
});
108
110
}
109
111
});
110
-
112
+
111
113
Observable <Integer > source1 = Observable .create (new OnSubscribeCache <Integer >(source0 , subject ));
112
-
114
+
113
115
Observable <Integer > source2 = source1
114
116
.repeat (4 )
115
117
.zip (Observable .timer (0 , 10 , TimeUnit .MILLISECONDS , Schedulers .newThread ()), new Func2 <Integer , Long , Integer >() {
116
118
@ Override
117
119
public Integer call (Integer t1 , Long t2 ) {
118
120
return t1 ;
119
121
}
120
-
121
- });
122
- final CountDownLatch cdl = new CountDownLatch (1 );
123
- TestObserver <Integer > test = new TestObserver <Integer >(new Observer <Integer >() {
124
- @ Override
125
- public void onNext (Integer t ) {
126
- }
127
122
128
- @ Override
129
- public void onError (Throwable e ) {
130
- cdl .countDown ();
131
- }
132
-
133
- @ Override
134
- public void onCompleted () {
135
- cdl .countDown ();
136
- }
137
- });
138
- source2 .subscribe (test );
123
+ });
124
+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
125
+ source2 .subscribe (ts );
139
126
140
- try {
141
- cdl .await (20 , TimeUnit .SECONDS );
142
- } catch (InterruptedException ex ) {
143
- fail ("Interrupted" );
144
- }
145
-
146
- test .assertReceivedOnNext (Arrays .asList (expected ));
147
- test .assertTerminalEvent ();
148
- assertTrue (test .getOnErrorEvents ().isEmpty ());
127
+ ts .awaitTerminalEvent ();
128
+ ts .assertNoErrors ();
129
+ System .out .println (ts .getOnNextEvents ());
130
+ ts .assertReceivedOnNext (Arrays .asList (expected ));
149
131
}
132
+
150
133
@ Test (timeout = 10000 )
151
134
public void testWithAsyncSubjectAndRepeat () {
152
- testWithCustomSubjectAndRepeat (AsyncSubject .<Integer >create (), 3 , 3 , 3 , 3 );
135
+ testWithCustomSubjectAndRepeat (AsyncSubject .<Integer > create (), 3 , 3 , 3 , 3 );
153
136
}
137
+
154
138
@ Test (timeout = 10000 )
155
139
public void testWithBehaviorSubjectAndRepeat () {
156
140
// BehaviorSubject just completes when repeated
157
141
testWithCustomSubjectAndRepeat (BehaviorSubject .create (0 ), 0 , 1 , 2 , 3 );
158
142
}
143
+
159
144
@ Test (timeout = 10000 )
160
145
public void testWithPublishSubjectAndRepeat () {
161
146
// PublishSubject just completes when repeated
162
- testWithCustomSubjectAndRepeat (PublishSubject .<Integer >create (), 1 , 2 , 3 );
147
+ testWithCustomSubjectAndRepeat (PublishSubject .<Integer > create (), 1 , 2 , 3 );
163
148
}
164
- @ Test (timeout = 10000 )
149
+
150
+ @ Test
165
151
public void testWithReplaySubjectAndRepeat () {
166
- testWithCustomSubjectAndRepeat (ReplaySubject .<Integer >create (), 1 , 2 , 3 , 1 , 2 , 3 , 1 , 2 , 3 , 1 , 2 , 3 );
152
+ testWithCustomSubjectAndRepeat (ReplaySubject .<Integer > create (), 1 , 2 , 3 , 1 , 2 , 3 , 1 , 2 , 3 , 1 , 2 , 3 );
167
153
}
168
154
}
0 commit comments