Skip to content

Commit a394a7d

Browse files
GroupBy and SubscribeOn Tests Passing
1 parent cb5b5fb commit a394a7d

File tree

3 files changed

+156
-142
lines changed

3 files changed

+156
-142
lines changed

rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java

Lines changed: 55 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -22,35 +22,44 @@
2222
import rx.Subscriber;
2323
import rx.observables.GroupedObservable;
2424
import rx.subjects.PublishSubject;
25-
import rx.subscriptions.CompositeSubscription;
26-
import rx.subscriptions.Subscriptions;
27-
import rx.util.functions.Action0;
2825
import rx.util.functions.Action1;
2926

3027
/**
31-
* Asynchronously subscribes and unsubscribes Observers on the specified Scheduler.
28+
* Subscribes and unsubscribes Observers on the specified Scheduler.
3229
* <p>
30+
* Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables
31+
* in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred.
32+
* <p>
33+
* See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous
34+
* subscribe is solving.
35+
*
3336
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/subscribeOn.png">
3437
*/
3538
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
3639

3740
private final Scheduler scheduler;
38-
/**
41+
/**
3942
* Indicate that events fired between the original subscription time and
4043
* the actual subscription time should not get lost.
4144
*/
4245
private final boolean dontLoseEvents;
4346
/** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
4447
private final int bufferSize;
48+
4549
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents) {
4650
this(scheduler, dontLoseEvents, -1);
4751
}
52+
4853
/**
4954
* Construct a SubscribeOn operator.
50-
* @param scheduler the target scheduler
51-
* @param dontLoseEvents indicate that events should be buffered until the actual subscription happens
52-
* @param bufferSize if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
53-
* block the source. -1 indicates an unbounded buffer
55+
*
56+
* @param scheduler
57+
* the target scheduler
58+
* @param dontLoseEvents
59+
* indicate that events should be buffered until the actual subscription happens
60+
* @param bufferSize
61+
* if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
62+
* block the source. -1 indicates an unbounded buffer
5463
*/
5564
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents, int bufferSize) {
5665
this.scheduler = scheduler;
@@ -71,78 +80,61 @@ public void onCompleted() {
7180
public void onError(Throwable e) {
7281
subscriber.onError(e);
7382
}
83+
7484
boolean checkNeedBuffer(Observable<?> o) {
75-
return dontLoseEvents || ((o instanceof GroupedObservable<?, ?>)
85+
/*
86+
* Included are some Observable types known to be "hot" and thus needing
87+
* buffering when subscribing across thread boundaries otherwise
88+
* we can lose data.
89+
*
90+
* See https://github.com/Netflix/RxJava/issues/844 for more information.
91+
*/
92+
return dontLoseEvents
93+
|| ((o instanceof GroupedObservable<?, ?>)
7694
|| (o instanceof PublishSubject<?>)
7795
// || (o instanceof BehaviorSubject<?, ?>)
7896
);
7997
}
98+
8099
@Override
81100
public void onNext(final Observable<T> o) {
82101
if (checkNeedBuffer(o)) {
83-
final CompositeSubscription cs = new CompositeSubscription();
84-
subscriber.add(cs);
85-
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber, new CompositeSubscription());
102+
// use buffering (possibly blocking) for a possibly synchronous subscribe
103+
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber);
86104
o.subscribe(bus);
87-
scheduler.schedule(new Action1<Inner>() {
105+
subscriber.add(scheduler.schedule(new Action1<Inner>() {
88106
@Override
89107
public void call(final Inner inner) {
90-
cs.add(Subscriptions.create(new Action0() {
91-
@Override
92-
public void call() {
93-
inner.schedule(new Action1<Inner>() {
94-
@Override
95-
public void call(final Inner inner) {
96-
bus.unsubscribe();
97-
}
98-
});
99-
}
100-
}));
101108
bus.enterPassthroughMode();
102109
}
103-
});
110+
}));
104111
return;
105-
}
106-
scheduler.schedule(new Action1<Inner>() {
107-
108-
@Override
109-
public void call(final Inner inner) {
110-
final CompositeSubscription cs = new CompositeSubscription();
111-
subscriber.add(Subscriptions.create(new Action0() {
112-
113-
@Override
114-
public void call() {
115-
inner.schedule(new Action1<Inner>() {
116-
117-
@Override
118-
public void call(final Inner inner) {
119-
cs.unsubscribe();
120-
}
121-
122-
});
123-
}
112+
} else {
113+
// no buffering (async subscribe)
114+
scheduler.schedule(new Action1<Inner>() {
124115

125-
}));
126-
cs.add(subscriber);
127-
o.subscribe(new Subscriber<T>(cs) {
116+
@Override
117+
public void call(final Inner inner) {
118+
o.subscribe(new Subscriber<T>(subscriber) {
128119

129-
@Override
130-
public void onCompleted() {
131-
subscriber.onCompleted();
132-
}
120+
@Override
121+
public void onCompleted() {
122+
subscriber.onCompleted();
123+
}
133124

134-
@Override
135-
public void onError(Throwable e) {
136-
subscriber.onError(e);
137-
}
125+
@Override
126+
public void onError(Throwable e) {
127+
subscriber.onError(e);
128+
}
138129

139-
@Override
140-
public void onNext(T t) {
141-
subscriber.onNext(t);
142-
}
143-
});
144-
}
145-
});
130+
@Override
131+
public void onNext(T t) {
132+
subscriber.onNext(t);
133+
}
134+
});
135+
}
136+
});
137+
}
146138
}
147139

148140
};

rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,6 @@ public void testUnsubscribeOnNestedTakeAndSyncInfiniteStream() throws Interrupte
253253
/*
254254
* We will only take 1 group with 20 events from it and then unsubscribe.
255255
*/
256-
@Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844
257256
@Test
258257
public void testUnsubscribeOnNestedTakeAndAsyncInfiniteStream() throws InterruptedException {
259258
final AtomicInteger subscribeCounter = new AtomicInteger();
@@ -648,7 +647,6 @@ public void call(String s) {
648647
assertEquals(6, results.size());
649648
}
650649

651-
@Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844
652650
@Test
653651
public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException {
654652
final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete
@@ -702,7 +700,7 @@ public void call() {
702700

703701
});
704702
} else {
705-
return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
703+
return group.subscribeOn(Schedulers.newThread(), 1).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
706704

707705
@Override
708706
public String call(Integer t1) {
@@ -803,7 +801,6 @@ public void call(String s) {
803801
assertEquals(6, results.size());
804802
}
805803

806-
@Ignore // failing because of subscribeOn time gap issue: https://github.com/Netflix/RxJava/issues/844
807804
@Test
808805
public void testGroupsWithNestedSubscribeOn() throws InterruptedException {
809806
final ArrayList<String> results = new ArrayList<String>();
@@ -829,7 +826,7 @@ public Integer call(Integer t) {
829826

830827
@Override
831828
public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
832-
return group.subscribeOn(Schedulers.newThread()).map(new Func1<Integer, String>() {
829+
return group.subscribeOn(Schedulers.newThread(), 0).map(new Func1<Integer, String>() {
833830

834831
@Override
835832
public String call(Integer t1) {

0 commit comments

Comments
 (0)