Skip to content

Commit 1f43bc8

Browse files
Merge branch 'BlockingBufferUntilExperiment' of github.com:akarnokd/RxJava into subscribeOn-blockingBuffer
2 parents 2cedb25 + 5209ab1 commit 1f43bc8

File tree

4 files changed

+532
-38
lines changed

4 files changed

+532
-38
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7052,9 +7052,32 @@ public final Subscription subscribe(Subscriber<? super T> observer, Scheduler sc
70527052
* @return the source Observable modified so that its subscriptions and unsubscriptions happen
70537053
* on the specified {@link Scheduler}
70547054
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
7055+
* @see #subscribeOn(rx.Scheduler, int)
70557056
*/
70567057
public final Observable<T> subscribeOn(Scheduler scheduler) {
7057-
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
7058+
return nest().lift(new OperatorSubscribeOn<T>(scheduler, false));
7059+
}
7060+
/**
7061+
* Asynchronously subscribes and unsubscribes Observers to this Observable on the specified {@link Scheduler}
7062+
* and allows buffering some events emitted from the source in the time gap between the original and
7063+
* actual subscription, and any excess events will block the source until the actual subscription happens.
7064+
* <p>
7065+
* This overload should help mitigate issues when subscribing to a PublishSubject (and derivatives
7066+
* such as GroupedObservable in operator groupBy) and events fired between the original and actual subscriptions
7067+
* are lost.
7068+
* <p>
7069+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/subscribeOn.png">
7070+
*
7071+
* @param scheduler
7072+
* the {@link Scheduler} to perform subscription and unsubscription actions on
7073+
* @param bufferSize the number of events to buffer before blocking the source while in the time gap,
7074+
* negative value indicates an unlimited buffer
7075+
* @return the source Observable modified so that its subscriptions and unsubscriptions happen
7076+
* on the specified {@link Scheduler}
7077+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
7078+
*/
7079+
public final Observable<T> subscribeOn(Scheduler scheduler, int bufferSize) {
7080+
return nest().lift(new OperatorSubscribeOn<T>(scheduler, true, bufferSize));
70587081
}
70597082

70607083
/**
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.LinkedList;
19+
import java.util.Queue;
20+
import rx.Subscriber;
21+
import rx.subscriptions.CompositeSubscription;
22+
23+
/**
24+
* Buffers the incoming events until notified, then replays the
25+
* buffered events and continues as a simple pass-through subscriber.
26+
* @param <T> the streamed value type
27+
*/
28+
public class BufferUntilSubscriber<T> extends Subscriber<T> {
29+
/** The actual subscriber. */
30+
private final Subscriber<? super T> actual;
31+
/** Indicate the pass-through mode. */
32+
private volatile boolean passthroughMode;
33+
/** Protect mode transition. */
34+
private final Object gate = new Object();
35+
/** The buffered items. */
36+
private final Queue<Object> queue = new LinkedList<Object>();
37+
/** The queue capacity. */
38+
private final int capacity;
39+
/** Null sentinel (in case queue type is changed). */
40+
private static final Object NULL_SENTINEL = new Object();
41+
/** Complete sentinel. */
42+
private static final Object COMPLETE_SENTINEL = new Object();
43+
/**
44+
* Container for an onError event.
45+
*/
46+
private static final class ErrorSentinel {
47+
final Throwable t;
48+
49+
public ErrorSentinel(Throwable t) {
50+
this.t = t;
51+
}
52+
53+
}
54+
/**
55+
* Constructor that wraps the actual subscriber and shares its subscription.
56+
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
57+
* @param actual
58+
*/
59+
public BufferUntilSubscriber(int capacity, Subscriber<? super T> actual) {
60+
super(actual);
61+
this.actual = actual;
62+
this.capacity = capacity;
63+
}
64+
/**
65+
* Constructor that wraps the actual subscriber and uses the given composite
66+
* subscription.
67+
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
68+
* @param actual
69+
* @param cs
70+
*/
71+
public BufferUntilSubscriber(int capacity, Subscriber<? super T> actual, CompositeSubscription cs) {
72+
super(cs);
73+
this.actual = actual;
74+
this.capacity = capacity;
75+
}
76+
77+
/**
78+
* Call this method to replay the buffered events and continue as a pass-through subscriber.
79+
* If already in pass-through mode, this method is a no-op.
80+
*/
81+
public void enterPassthroughMode() {
82+
if (!passthroughMode) {
83+
synchronized (gate) {
84+
if (!passthroughMode) {
85+
while (!queue.isEmpty()) {
86+
Object o = queue.poll();
87+
if (!actual.isUnsubscribed()) {
88+
if (o == NULL_SENTINEL) {
89+
actual.onNext(null);
90+
} else
91+
if (o == COMPLETE_SENTINEL) {
92+
actual.onCompleted();
93+
} else
94+
if (o instanceof ErrorSentinel) {
95+
actual.onError(((ErrorSentinel)o).t);
96+
} else
97+
if (o != null) {
98+
@SuppressWarnings("unchecked")
99+
T v = (T)o;
100+
actual.onNext(v);
101+
} else {
102+
throw new NullPointerException();
103+
}
104+
}
105+
}
106+
passthroughMode = true;
107+
gate.notifyAll();
108+
}
109+
}
110+
}
111+
}
112+
@Override
113+
public void onNext(T t) {
114+
if (!passthroughMode) {
115+
synchronized (gate) {
116+
if (!passthroughMode) {
117+
if (capacity < 0 || queue.size() < capacity) {
118+
queue.offer(t != null ? t : NULL_SENTINEL);
119+
return;
120+
}
121+
try {
122+
while (!passthroughMode) {
123+
gate.wait();
124+
}
125+
if (actual.isUnsubscribed()) {
126+
return;
127+
}
128+
} catch (InterruptedException ex) {
129+
Thread.currentThread().interrupt();
130+
actual.onError(ex);
131+
return;
132+
}
133+
}
134+
}
135+
}
136+
actual.onNext(t);
137+
}
138+
139+
@Override
140+
public void onError(Throwable e) {
141+
if (!passthroughMode) {
142+
synchronized (gate) {
143+
if (!passthroughMode) {
144+
if (capacity < 0 || queue.size() < capacity) {
145+
queue.offer(new ErrorSentinel(e));
146+
return;
147+
}
148+
try {
149+
while (!passthroughMode) {
150+
gate.wait();
151+
}
152+
if (actual.isUnsubscribed()) {
153+
return;
154+
}
155+
} catch (InterruptedException ex) {
156+
Thread.currentThread().interrupt();
157+
actual.onError(ex);
158+
return;
159+
}
160+
}
161+
}
162+
}
163+
actual.onError(e);
164+
}
165+
166+
@Override
167+
public void onCompleted() {
168+
if (!passthroughMode) {
169+
synchronized (gate) {
170+
if (!passthroughMode) {
171+
if (capacity < 0 || queue.size() < capacity) {
172+
queue.offer(COMPLETE_SENTINEL);
173+
return;
174+
}
175+
try {
176+
while (!passthroughMode) {
177+
gate.wait();
178+
}
179+
if (actual.isUnsubscribed()) {
180+
return;
181+
}
182+
} catch (InterruptedException ex) {
183+
Thread.currentThread().interrupt();
184+
actual.onError(ex);
185+
return;
186+
}
187+
}
188+
}
189+
}
190+
actual.onCompleted();
191+
}
192+
193+
}

rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import rx.Scheduler;
2121
import rx.Scheduler.Inner;
2222
import rx.Subscriber;
23+
import rx.observables.GroupedObservable;
24+
import rx.subjects.PublishSubject;
2325
import rx.subscriptions.CompositeSubscription;
2426
import rx.subscriptions.Subscriptions;
2527
import rx.util.functions.Action0;
@@ -33,9 +35,27 @@
3335
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
3436

3537
private final Scheduler scheduler;
36-
37-
public OperatorSubscribeOn(Scheduler scheduler) {
38+
/**
39+
* Indicate that events fired between the original subscription time and
40+
* the actual subscription time should not get lost.
41+
*/
42+
private final boolean dontLoseEvents;
43+
/** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
44+
private final int bufferSize;
45+
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents) {
46+
this(scheduler, dontLoseEvents, -1);
47+
}
48+
/**
49+
* Construct a SubscribeOn operator.
50+
* @param scheduler the target scheduler
51+
* @param dontLoseEvents indicate that events should be buffered until the actual subscription happens
52+
* @param bufferSize if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
53+
* block the source. -1 indicates an unbounded buffer
54+
*/
55+
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents, int bufferSize) {
3856
this.scheduler = scheduler;
57+
this.dontLoseEvents = dontLoseEvents;
58+
this.bufferSize = bufferSize;
3959
}
4060

4161
@Override
@@ -51,9 +71,38 @@ public void onCompleted() {
5171
public void onError(Throwable e) {
5272
subscriber.onError(e);
5373
}
54-
74+
boolean checkNeedBuffer(Observable<?> o) {
75+
return dontLoseEvents || ((o instanceof GroupedObservable<?, ?>)
76+
|| (o instanceof PublishSubject<?>)
77+
// || (o instanceof BehaviorSubject<?, ?>)
78+
);
79+
}
5580
@Override
5681
public void onNext(final Observable<T> o) {
82+
if (checkNeedBuffer(o)) {
83+
final CompositeSubscription cs = new CompositeSubscription();
84+
subscriber.add(cs);
85+
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber, new CompositeSubscription());
86+
o.subscribe(bus);
87+
scheduler.schedule(new Action1<Inner>() {
88+
@Override
89+
public void call(final Inner inner) {
90+
cs.add(Subscriptions.create(new Action0() {
91+
@Override
92+
public void call() {
93+
inner.schedule(new Action1<Inner>() {
94+
@Override
95+
public void call(final Inner inner) {
96+
bus.unsubscribe();
97+
}
98+
});
99+
}
100+
}));
101+
bus.enterPassthroughMode();
102+
}
103+
});
104+
return;
105+
}
57106
scheduler.schedule(new Action1<Inner>() {
58107

59108
@Override

0 commit comments

Comments
 (0)