Skip to content

Commit 1cac6fc

Browse files
Hide Buffer/Blocking SubscribeOn Behavior
- I want to have it for internal usage but am not ready to publicly expose it.
1 parent a394a7d commit 1cac6fc

File tree

4 files changed

+14
-50
lines changed

4 files changed

+14
-50
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7055,32 +7055,9 @@ public final Subscription subscribe(Subscriber<? super T> observer, Scheduler sc
70557055
* @see #subscribeOn(rx.Scheduler, int)
70567056
*/
70577057
public final Observable<T> subscribeOn(Scheduler scheduler) {
7058-
return nest().lift(new OperatorSubscribeOn<T>(scheduler, false));
7058+
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
70597059
}
70607060

7061-
/**
7062-
* Asynchronously subscribes and unsubscribes Observers to this Observable on the specified {@link Scheduler}
7063-
* and allows buffering some events emitted from the source in the time gap between the original and
7064-
* actual subscription, and any excess events will block the source until the actual subscription happens.
7065-
* <p>
7066-
* This overload should help mitigate issues when subscribing to a PublishSubject (and derivatives
7067-
* such as GroupedObservable in operator groupBy) and events fired between the original and actual subscriptions
7068-
* are lost.
7069-
* <p>
7070-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/subscribeOn.png">
7071-
*
7072-
* @param scheduler
7073-
* the {@link Scheduler} to perform subscription and unsubscription actions on
7074-
* @param bufferSize the number of events to buffer before blocking the source while in the time gap,
7075-
* negative value indicates an unlimited buffer
7076-
* @return the source Observable modified so that its subscriptions and unsubscriptions happen
7077-
* on the specified {@link Scheduler}
7078-
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
7079-
*/
7080-
public final Observable<T> subscribeOn(Scheduler scheduler, int bufferSize) {
7081-
return nest().lift(new OperatorSubscribeOn<T>(scheduler, true, bufferSize));
7082-
}
7083-
70847061
/**
70857062
* Returns an Observable that extracts a Double from each of the items emitted by the source
70867063
* Observable via a function you specify, and then emits the sum of these Doubles.

rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import rx.Scheduler;
2121
import rx.Scheduler.Inner;
2222
import rx.Subscriber;
23-
import rx.observables.GroupedObservable;
24-
import rx.subjects.PublishSubject;
2523
import rx.util.functions.Action1;
2624

2725
/**
@@ -46,24 +44,24 @@ public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
4644
/** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
4745
private final int bufferSize;
4846

49-
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents) {
50-
this(scheduler, dontLoseEvents, -1);
47+
public OperatorSubscribeOn(Scheduler scheduler) {
48+
this.scheduler = scheduler;
49+
this.dontLoseEvents = false;
50+
this.bufferSize = -1;
5151
}
5252

5353
/**
5454
* Construct a SubscribeOn operator.
5555
*
5656
* @param scheduler
5757
* the target scheduler
58-
* @param dontLoseEvents
59-
* indicate that events should be buffered until the actual subscription happens
6058
* @param bufferSize
6159
* if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
6260
* block the source. -1 indicates an unbounded buffer
6361
*/
64-
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents, int bufferSize) {
62+
public OperatorSubscribeOn(Scheduler scheduler, int bufferSize) {
6563
this.scheduler = scheduler;
66-
this.dontLoseEvents = dontLoseEvents;
64+
this.dontLoseEvents = true;
6765
this.bufferSize = bufferSize;
6866
}
6967

@@ -82,18 +80,7 @@ public void onError(Throwable e) {
8280
}
8381

8482
boolean checkNeedBuffer(Observable<?> o) {
85-
/*
86-
* Included are some Observable types known to be "hot" and thus needing
87-
* buffering when subscribing across thread boundaries otherwise
88-
* we can lose data.
89-
*
90-
* See https://github.com/Netflix/RxJava/issues/844 for more information.
91-
*/
92-
return dontLoseEvents
93-
|| ((o instanceof GroupedObservable<?, ?>)
94-
|| (o instanceof PublishSubject<?>)
95-
// || (o instanceof BehaviorSubject<?, ?>)
96-
);
83+
return dontLoseEvents;
9784
}
9885

9986
@Override

rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ public void call() {
700700

701701
});
702702
} else {
703-
return group.subscribeOn(Schedulers.newThread(), 1).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
703+
return group.nest().lift(new OperatorSubscribeOn<Integer>(Schedulers.newThread(), 1)).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
704704

705705
@Override
706706
public String call(Integer t1) {
@@ -826,7 +826,7 @@ public Integer call(Integer t) {
826826

827827
@Override
828828
public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
829-
return group.subscribeOn(Schedulers.newThread(), 0).map(new Func1<Integer, String>() {
829+
return group.nest().lift(new OperatorSubscribeOn<Integer>(Schedulers.newThread(), 0)).map(new Func1<Integer, String>() {
830830

831831
@Override
832832
public String call(Integer t1) {

rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ public Subscription schedule(final Action1<Scheduler.Inner> action, final long d
186186
public void testSubscribeOnPublishSubjectWithSlowScheduler() {
187187
PublishSubject<Integer> ps = PublishSubject.create();
188188
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
189-
ps.subscribeOn(new SlowScheduler()).subscribe(ts);
189+
ps.nest().lift(new OperatorSubscribeOn<Integer>(new SlowScheduler(), 0)).subscribe(ts);
190190
ps.onNext(1);
191191
ps.onNext(2);
192192
ps.onCompleted();
@@ -220,7 +220,7 @@ public Integer call(Integer t) {
220220

221221
@Override
222222
public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
223-
return group.subscribeOn(Schedulers.newThread()).map(new Func1<Integer, String>() {
223+
return group.nest().lift(new OperatorSubscribeOn<Integer>(Schedulers.newThread(), 0)).map(new Func1<Integer, String>() {
224224

225225
@Override
226226
public String call(Integer t1) {
@@ -326,8 +326,8 @@ void testBoundedBufferingWithSize(int size) throws Exception {
326326

327327
final List<Long> deltas = Collections.synchronizedList(new ArrayList<Long>());
328328

329-
Subscription s = timer.timestamp().subscribeOn(
330-
new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size).map(new Func1<Timestamped<Long>, Long>() {
329+
Subscription s = timer.timestamp().nest().lift(new OperatorSubscribeOn<Timestamped<Long>>(
330+
new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size)).map(new Func1<Timestamped<Long>, Long>() {
331331
@Override
332332
public Long call(Timestamped<Long> t1) {
333333
long v = System.currentTimeMillis() - t1.getTimestampMillis();

0 commit comments

Comments
 (0)