File tree Expand file tree Collapse file tree 2 files changed +39
-7
lines changed Expand file tree Collapse file tree 2 files changed +39
-7
lines changed Original file line number Diff line number Diff line change @@ -38,18 +38,22 @@ public OperatorTake(int limit) {
38
38
}
39
39
40
40
@ Override
41
- public Subscriber <? super T > call (final Subscriber <? super T > o ) {
42
- CompositeSubscription parent = new CompositeSubscription ();
41
+ public Subscriber <? super T > call (final Subscriber <? super T > child ) {
42
+ final CompositeSubscription parent = new CompositeSubscription ();
43
43
if (limit == 0 ) {
44
- o .onCompleted ();
44
+ child .onCompleted ();
45
45
parent .unsubscribe ();
46
46
}
47
+
47
48
/*
48
49
* We decouple the parent and child subscription so there can be multiple take() in a chain
49
50
* such as for the groupBy Observer use case where you may take(1) on groups and take(20) on the children.
50
51
*
51
52
* Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM.
53
+ *
54
+ * However, if we receive an unsubscribe from the child we still want to propagate it upwards so we register 'parent' with 'child'
52
55
*/
56
+ child .add (parent );
53
57
return new Subscriber <T >(parent ) {
54
58
55
59
int count = 0 ;
@@ -58,24 +62,24 @@ public Subscriber<? super T> call(final Subscriber<? super T> o) {
58
62
@ Override
59
63
public void onCompleted () {
60
64
if (!completed ) {
61
- o .onCompleted ();
65
+ child .onCompleted ();
62
66
}
63
67
}
64
68
65
69
@ Override
66
70
public void onError (Throwable e ) {
67
71
if (!completed ) {
68
- o .onError (e );
72
+ child .onError (e );
69
73
}
70
74
}
71
75
72
76
@ Override
73
77
public void onNext (T i ) {
74
78
if (!isUnsubscribed ()) {
75
- o .onNext (i );
79
+ child .onNext (i );
76
80
if (++count >= limit ) {
77
81
completed = true ;
78
- o .onCompleted ();
82
+ child .onCompleted ();
79
83
unsubscribe ();
80
84
}
81
85
}
Original file line number Diff line number Diff line change 21
21
22
22
import java .util .Arrays ;
23
23
import java .util .concurrent .atomic .AtomicBoolean ;
24
+ import java .util .concurrent .atomic .AtomicInteger ;
24
25
import java .util .concurrent .atomic .AtomicLong ;
25
26
26
27
import org .junit .Test ;
@@ -210,6 +211,33 @@ public void call(Long l) {
210
211
assertEquals (10 , count .get ());
211
212
}
212
213
214
+ @ Test (timeout = 2000 )
215
+ public void testMultiTake () {
216
+ final AtomicInteger count = new AtomicInteger ();
217
+ Observable .create (new OnSubscribe <Integer >() {
218
+
219
+ @ Override
220
+ public void call (Subscriber <? super Integer > s ) {
221
+ for (int i = 0 ; !s .isUnsubscribed (); i ++) {
222
+ System .out .println ("Emit: " + i );
223
+ count .incrementAndGet ();
224
+ s .onNext (i );
225
+ }
226
+ }
227
+
228
+ }).take (100 ).take (1 ).toBlockingObservable ().forEach (new Action1 <Integer >() {
229
+
230
+ @ Override
231
+ public void call (Integer t1 ) {
232
+ System .out .println ("Receive: " + t1 );
233
+
234
+ }
235
+
236
+ });
237
+
238
+ assertEquals (1 , count .get ());
239
+ }
240
+
213
241
private static class TestObservableFunc implements Observable .OnSubscribeFunc <String > {
214
242
215
243
final Subscription s ;
You can’t perform that action at this time.
0 commit comments