27
27
import org .mockito .InOrder ;
28
28
29
29
import rx .Observable ;
30
+ import rx .Observable .OnSubscribe ;
30
31
import rx .Observer ;
31
32
import rx .Scheduler ;
32
- import rx .Subscription ;
33
+ import rx .Subscriber ;
33
34
import rx .functions .Action0 ;
34
35
import rx .schedulers .TestScheduler ;
35
- import rx .subscriptions . Subscriptions ;
36
+ import rx .subjects . PublishSubject ;
36
37
37
- public class OperationThrottleFirstTest {
38
+ public class OperatorThrottleFirstTest {
38
39
39
40
private TestScheduler scheduler ;
40
41
private Scheduler .Worker innerScheduler ;
@@ -50,20 +51,18 @@ public void before() {
50
51
51
52
@ Test
52
53
public void testThrottlingWithCompleted () {
53
- Observable <String > source = Observable .create (new Observable . OnSubscribeFunc <String >() {
54
+ Observable <String > source = Observable .create (new OnSubscribe <String >() {
54
55
@ Override
55
- public Subscription onSubscribe ( Observer <? super String > observer ) {
56
+ public void call ( Subscriber <? super String > observer ) {
56
57
publishNext (observer , 100 , "one" ); // publish as it's first
57
58
publishNext (observer , 300 , "two" ); // skip as it's last within the first 400
58
59
publishNext (observer , 900 , "three" ); // publish
59
60
publishNext (observer , 905 , "four" ); // skip
60
61
publishCompleted (observer , 1000 ); // Should be published as soon as the timeout expires.
61
-
62
- return Subscriptions .empty ();
63
62
}
64
63
});
65
64
66
- Observable <String > sampled = Observable . create ( OperationThrottleFirst . throttleFirst (source , 400 , TimeUnit .MILLISECONDS , scheduler ) );
65
+ Observable <String > sampled = source . throttleFirst (400 , TimeUnit .MILLISECONDS , scheduler );
67
66
sampled .subscribe (observer );
68
67
69
68
InOrder inOrder = inOrder (observer );
@@ -79,19 +78,17 @@ public Subscription onSubscribe(Observer<? super String> observer) {
79
78
80
79
@ Test
81
80
public void testThrottlingWithError () {
82
- Observable <String > source = Observable .create (new Observable . OnSubscribeFunc <String >() {
81
+ Observable <String > source = Observable .create (new OnSubscribe <String >() {
83
82
@ Override
84
- public Subscription onSubscribe ( Observer <? super String > observer ) {
83
+ public void call ( Subscriber <? super String > observer ) {
85
84
Exception error = new TestException ();
86
85
publishNext (observer , 100 , "one" ); // Should be published since it is first
87
86
publishNext (observer , 200 , "two" ); // Should be skipped since onError will arrive before the timeout expires
88
87
publishError (observer , 300 , error ); // Should be published as soon as the timeout expires.
89
-
90
- return Subscriptions .empty ();
91
88
}
92
89
});
93
90
94
- Observable <String > sampled = Observable . create ( OperationThrottleFirst . throttleFirst (source , 400 , TimeUnit .MILLISECONDS , scheduler ) );
91
+ Observable <String > sampled = source . throttleFirst (400 , TimeUnit .MILLISECONDS , scheduler );
95
92
sampled .subscribe (observer );
96
93
97
94
InOrder inOrder = inOrder (observer );
@@ -132,4 +129,36 @@ public void call() {
132
129
@ SuppressWarnings ("serial" )
133
130
private class TestException extends Exception {
134
131
}
132
+
133
+ @ Test
134
+ public void testThrottle () {
135
+ @ SuppressWarnings ("unchecked" )
136
+ Observer <Integer > observer = mock (Observer .class );
137
+ TestScheduler s = new TestScheduler ();
138
+ PublishSubject <Integer > o = PublishSubject .create ();
139
+ o .throttleFirst (500 , TimeUnit .MILLISECONDS , s ).subscribe (observer );
140
+
141
+ // send events with simulated time increments
142
+ s .advanceTimeTo (0 , TimeUnit .MILLISECONDS );
143
+ o .onNext (1 ); // deliver
144
+ o .onNext (2 ); // skip
145
+ s .advanceTimeTo (501 , TimeUnit .MILLISECONDS );
146
+ o .onNext (3 ); // deliver
147
+ s .advanceTimeTo (600 , TimeUnit .MILLISECONDS );
148
+ o .onNext (4 ); // skip
149
+ s .advanceTimeTo (700 , TimeUnit .MILLISECONDS );
150
+ o .onNext (5 ); // skip
151
+ o .onNext (6 ); // skip
152
+ s .advanceTimeTo (1001 , TimeUnit .MILLISECONDS );
153
+ o .onNext (7 ); // deliver
154
+ s .advanceTimeTo (1501 , TimeUnit .MILLISECONDS );
155
+ o .onCompleted ();
156
+
157
+ InOrder inOrder = inOrder (observer );
158
+ inOrder .verify (observer ).onNext (1 );
159
+ inOrder .verify (observer ).onNext (3 );
160
+ inOrder .verify (observer ).onNext (7 );
161
+ inOrder .verify (observer ).onCompleted ();
162
+ inOrder .verifyNoMoreInteractions ();
163
+ }
135
164
}
0 commit comments