Skip to content

Commit dc4ee52

Browse files
committed
Proposed solution to the time gap, using unbounded buffering.
1 parent 1a86347 commit dc4ee52

File tree

4 files changed

+441
-37
lines changed

4 files changed

+441
-37
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7062,7 +7062,25 @@ public final Subscription subscribe(Subscriber<? super T> observer, Scheduler sc
70627062
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
70637063
*/
70647064
public final Observable<T> subscribeOn(Scheduler scheduler) {
7065-
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
7065+
return nest().lift(new OperatorSubscribeOn<T>(scheduler, false));
7066+
}
7067+
/**
7068+
* Asynchronously subscribes and unsubscribes Observers to this Observable on the specified {@link Scheduler}
7069+
* and allows buffering the events emitted from the source in the time gap between the original and
7070+
* actual subscription.
7071+
* <p>
7072+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/subscribeOn.png">
7073+
*
7074+
* @param scheduler
7075+
* the {@link Scheduler} to perform subscription and unsubscription actions on
7076+
* @param dontLoseEvents indicate that the operator should buffer events emitted in the time gap
7077+
* between the original and actual subscription and replay it to Observers
7078+
* @return the source Observable modified so that its subscriptions and unsubscriptions happen
7079+
* on the specified {@link Scheduler}
7080+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
7081+
*/
7082+
public final Observable<T> subscribeOn(Scheduler scheduler, boolean dontLoseEvents) {
7083+
return nest().lift(new OperatorSubscribeOn<T>(scheduler, dontLoseEvents));
70667084
}
70677085

70687086
/**
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.LinkedList;
19+
import java.util.Queue;
20+
import rx.Subscriber;
21+
import rx.subscriptions.CompositeSubscription;
22+
23+
/**
24+
* Buffers the incoming events until notified, then replays the
25+
* buffered events and continues as a simple pass-through subscriber.
26+
* @param <T> the streamed value type
27+
*/
28+
public class BufferUntilSubscriber<T> extends Subscriber<T> {
29+
/** The actual subscriber. */
30+
private final Subscriber<? super T> actual;
31+
/** The mutual exclusion for the duration of the replay. */
32+
private final Object gate = new Object();
33+
/** Queued events. */
34+
private final Queue<Object> queue = new LinkedList<Object>();
35+
/** Indicate the pass-through mode. */
36+
private volatile boolean passthroughMode;
37+
/** Null sentinel (in case queue type is changed). */
38+
private static final Object NULL_SENTINEL = new Object();
39+
/** Complete sentinel. */
40+
private static final Object COMPLETE_SENTINEL = new Object();
41+
/**
42+
* Container for an onError event.
43+
*/
44+
private static final class ErrorSentinel {
45+
final Throwable t;
46+
47+
public ErrorSentinel(Throwable t) {
48+
this.t = t;
49+
}
50+
51+
}
52+
/**
53+
* Constructor that wraps the actual subscriber and shares its subscription.
54+
* @param actual
55+
*/
56+
public BufferUntilSubscriber(Subscriber<? super T> actual) {
57+
super(actual);
58+
this.actual = actual;
59+
}
60+
/**
61+
* Constructor that wraps the actual subscriber and uses the given composite
62+
* subscription.
63+
* @param actual
64+
* @param cs
65+
*/
66+
public BufferUntilSubscriber(Subscriber<? super T> actual, CompositeSubscription cs) {
67+
super(cs);
68+
this.actual = actual;
69+
}
70+
71+
/**
72+
* Call this method to replay the buffered events and continue as a pass-through subscriber.
73+
* If already in pass-through mode, this method is a no-op.
74+
*/
75+
public void enterPassthroughMode() {
76+
if (!passthroughMode) {
77+
synchronized (gate) {
78+
if (!passthroughMode) {
79+
while (!queue.isEmpty()) {
80+
Object o = queue.poll();
81+
82+
if (o == NULL_SENTINEL) {
83+
actual.onNext(null);
84+
} else
85+
if (o == COMPLETE_SENTINEL) {
86+
actual.onCompleted();
87+
} else
88+
if (o instanceof ErrorSentinel) {
89+
actual.onError(((ErrorSentinel)o).t);
90+
} else
91+
if (o != null) {
92+
@SuppressWarnings("unchecked")
93+
T v = (T)o;
94+
actual.onNext(v);
95+
} else {
96+
throw new NullPointerException();
97+
}
98+
}
99+
/* Test artificial back-pressure.
100+
try {
101+
TimeUnit.SECONDS.sleep(2);
102+
} catch (Throwable t) {
103+
104+
}
105+
*/
106+
passthroughMode = true;
107+
}
108+
}
109+
}
110+
}
111+
112+
@Override
113+
public void onNext(T t) {
114+
if (!passthroughMode) {
115+
synchronized (gate) {
116+
if (!passthroughMode) {
117+
queue.offer(t != null ? t : NULL_SENTINEL);
118+
return;
119+
}
120+
}
121+
}
122+
actual.onNext(t);
123+
}
124+
125+
@Override
126+
public void onError(Throwable e) {
127+
if (!passthroughMode) {
128+
synchronized (gate) {
129+
if (!passthroughMode) {
130+
queue.offer(new ErrorSentinel(e));
131+
return;
132+
}
133+
}
134+
}
135+
actual.onError(e);
136+
}
137+
138+
@Override
139+
public void onCompleted() {
140+
if (!passthroughMode) {
141+
synchronized (gate) {
142+
if (!passthroughMode) {
143+
queue.offer(COMPLETE_SENTINEL);
144+
return;
145+
}
146+
}
147+
}
148+
actual.onCompleted();
149+
}
150+
}

rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import rx.Scheduler;
2121
import rx.Scheduler.Inner;
2222
import rx.Subscriber;
23+
import rx.observables.GroupedObservable;
24+
import rx.subjects.PublishSubject;
2325
import rx.subscriptions.CompositeSubscription;
2426
import rx.subscriptions.Subscriptions;
2527
import rx.util.functions.Action0;
@@ -33,9 +35,15 @@
3335
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
3436

3537
private final Scheduler scheduler;
38+
/**
39+
* Indicate that events fired between the original subscription time and
40+
* the actual subscription time should not get lost.
41+
*/
42+
private final boolean dontLoseEvents;
3643

37-
public OperatorSubscribeOn(Scheduler scheduler) {
44+
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents) {
3845
this.scheduler = scheduler;
46+
this.dontLoseEvents = dontLoseEvents;
3947
}
4048

4149
@Override
@@ -51,9 +59,40 @@ public void onCompleted() {
5159
public void onError(Throwable e) {
5260
subscriber.onError(e);
5361
}
54-
62+
boolean checkNeedBuffer(Observable<?> o) {
63+
return (o instanceof GroupedObservable<?, ?>)
64+
|| (o instanceof PublishSubject<?>)
65+
// || (o instanceof BehaviorSubject<?, ?>)
66+
;
67+
}
5568
@Override
5669
public void onNext(final Observable<T> o) {
70+
if (dontLoseEvents || checkNeedBuffer(o)) {
71+
final CompositeSubscription cs = new CompositeSubscription();
72+
subscriber.add(cs);
73+
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(subscriber, new CompositeSubscription());
74+
o.subscribe(bus);
75+
scheduler.schedule(new Action1<Inner>() {
76+
77+
@Override
78+
public void call(final Inner inner) {
79+
cs.add(Subscriptions.create(new Action0() {
80+
@Override
81+
public void call() {
82+
inner.schedule(new Action1<Inner>() {
83+
@Override
84+
public void call(final Inner inner) {
85+
bus.unsubscribe();
86+
}
87+
});
88+
}
89+
}));
90+
bus.enterPassthroughMode();
91+
}
92+
93+
});
94+
return;
95+
}
5796
scheduler.schedule(new Action1<Inner>() {
5897

5998
@Override

0 commit comments

Comments
 (0)