Skip to content

Commit dade7e1

Browse files
committed
Added bounded buffering capability to SubscribeOn
1 parent dc4ee52 commit dade7e1

File tree

4 files changed

+140
-60
lines changed

4 files changed

+140
-60
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7060,27 +7060,32 @@ public final Subscription subscribe(Subscriber<? super T> observer, Scheduler sc
70607060
* @return the source Observable modified so that its subscriptions and unsubscriptions happen
70617061
* on the specified {@link Scheduler}
70627062
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
7063+
* @see #subscribeOn(rx.Scheduler, int)
70637064
*/
70647065
public final Observable<T> subscribeOn(Scheduler scheduler) {
70657066
return nest().lift(new OperatorSubscribeOn<T>(scheduler, false));
70667067
}
70677068
/**
70687069
* Asynchronously subscribes and unsubscribes Observers to this Observable on the specified {@link Scheduler}
7069-
* and allows buffering the events emitted from the source in the time gap between the original and
7070-
* actual subscription.
7070+
* and allows buffering some events emitted from the source in the time gap between the original and
7071+
* actual subscription, and any excess events will block the source until the actual subscription happens.
7072+
* <p>
7073+
* This overload should help mitigate issues when subscribing to a PublishSubject (and derivatives
7074+
* such as GroupedObservable in operator groupBy) and events fired between the original and actual subscriptions
7075+
* are lost.
70717076
* <p>
70727077
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/subscribeOn.png">
70737078
*
70747079
* @param scheduler
70757080
* the {@link Scheduler} to perform subscription and unsubscription actions on
7076-
* @param dontLoseEvents indicate that the operator should buffer events emitted in the time gap
7077-
* between the original and actual subscription and replay it to Observers
7081+
* @param bufferSize the number of events to buffer before blocking the source while in the time gap,
7082+
* negative value indicates an unlimited buffer
70787083
* @return the source Observable modified so that its subscriptions and unsubscriptions happen
70797084
* on the specified {@link Scheduler}
70807085
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
70817086
*/
7082-
public final Observable<T> subscribeOn(Scheduler scheduler, boolean dontLoseEvents) {
7083-
return nest().lift(new OperatorSubscribeOn<T>(scheduler, dontLoseEvents));
7087+
public final Observable<T> subscribeOn(Scheduler scheduler, int bufferSize) {
7088+
return nest().lift(new OperatorSubscribeOn<T>(scheduler, true, bufferSize));
70847089
}
70857090

70867091
/**

rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@
2828
public class BufferUntilSubscriber<T> extends Subscriber<T> {
2929
/** The actual subscriber. */
3030
private final Subscriber<? super T> actual;
31-
/** The mutual exclusion for the duration of the replay. */
32-
private final Object gate = new Object();
33-
/** Queued events. */
34-
private final Queue<Object> queue = new LinkedList<Object>();
3531
/** Indicate the pass-through mode. */
3632
private volatile boolean passthroughMode;
33+
/** Protect mode transition. */
34+
private final Object gate = new Object();
35+
/** The buffered items. */
36+
private final Queue<Object> queue = new LinkedList<Object>();
37+
/** The queue capacity. */
38+
private final int capacity;
3739
/** Null sentinel (in case queue type is changed). */
3840
private static final Object NULL_SENTINEL = new Object();
3941
/** Complete sentinel. */
@@ -51,21 +53,25 @@ public ErrorSentinel(Throwable t) {
5153
}
5254
/**
5355
* Constructor that wraps the actual subscriber and shares its subscription.
54-
* @param actual
56+
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
57+
* @param actual
5558
*/
56-
public BufferUntilSubscriber(Subscriber<? super T> actual) {
59+
public BufferUntilSubscriber(int capacity, Subscriber<? super T> actual) {
5760
super(actual);
5861
this.actual = actual;
62+
this.capacity = capacity;
5963
}
6064
/**
6165
* Constructor that wraps the actual subscriber and uses the given composite
6266
* subscription.
67+
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
6368
* @param actual
6469
* @param cs
6570
*/
66-
public BufferUntilSubscriber(Subscriber<? super T> actual, CompositeSubscription cs) {
71+
public BufferUntilSubscriber(int capacity, Subscriber<? super T> actual, CompositeSubscription cs) {
6772
super(cs);
6873
this.actual = actual;
74+
this.capacity = capacity;
6975
}
7076

7177
/**
@@ -96,26 +102,30 @@ public void enterPassthroughMode() {
96102
throw new NullPointerException();
97103
}
98104
}
99-
/* Test artificial back-pressure.
100-
try {
101-
TimeUnit.SECONDS.sleep(2);
102-
} catch (Throwable t) {
103-
104-
}
105-
*/
106105
passthroughMode = true;
106+
gate.notifyAll();
107107
}
108108
}
109109
}
110110
}
111-
112111
@Override
113112
public void onNext(T t) {
114113
if (!passthroughMode) {
115114
synchronized (gate) {
116115
if (!passthroughMode) {
117-
queue.offer(t != null ? t : NULL_SENTINEL);
118-
return;
116+
if (capacity < 0 || queue.size() < capacity) {
117+
queue.offer(t != null ? t : NULL_SENTINEL);
118+
return;
119+
}
120+
try {
121+
while (!passthroughMode) {
122+
gate.wait();
123+
}
124+
} catch (InterruptedException ex) {
125+
Thread.currentThread().interrupt();
126+
actual.onError(ex);
127+
return;
128+
}
119129
}
120130
}
121131
}
@@ -127,8 +137,19 @@ public void onError(Throwable e) {
127137
if (!passthroughMode) {
128138
synchronized (gate) {
129139
if (!passthroughMode) {
130-
queue.offer(new ErrorSentinel(e));
131-
return;
140+
if (capacity < 0 || queue.size() < capacity) {
141+
queue.offer(new ErrorSentinel(e));
142+
return;
143+
}
144+
try {
145+
while (!passthroughMode) {
146+
gate.wait();
147+
}
148+
} catch (InterruptedException ex) {
149+
Thread.currentThread().interrupt();
150+
actual.onError(ex);
151+
return;
152+
}
132153
}
133154
}
134155
}
@@ -140,11 +161,23 @@ public void onCompleted() {
140161
if (!passthroughMode) {
141162
synchronized (gate) {
142163
if (!passthroughMode) {
143-
queue.offer(COMPLETE_SENTINEL);
144-
return;
164+
if (capacity < 0 || queue.size() < capacity) {
165+
queue.offer(COMPLETE_SENTINEL);
166+
return;
167+
}
168+
try {
169+
while (!passthroughMode) {
170+
gate.wait();
171+
}
172+
} catch (InterruptedException ex) {
173+
Thread.currentThread().interrupt();
174+
actual.onError(ex);
175+
return;
176+
}
145177
}
146178
}
147179
}
148180
actual.onCompleted();
149181
}
182+
150183
}

rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,22 @@ public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
4040
* the actual subscription time should not get lost.
4141
*/
4242
private final boolean dontLoseEvents;
43-
43+
/** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
44+
private final int bufferSize;
4445
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents) {
46+
this(scheduler, dontLoseEvents, -1);
47+
}
48+
/**
49+
* Construct a SubscribeOn operator.
50+
* @param scheduler the target scheduler
51+
* @param dontLoseEvents indicate that events should be buffered until the actual subscription happens
52+
* @param bufferSize if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
53+
* block the source. -1 indicates an unbounded buffer
54+
*/
55+
public OperatorSubscribeOn(Scheduler scheduler, boolean dontLoseEvents, int bufferSize) {
4556
this.scheduler = scheduler;
4657
this.dontLoseEvents = dontLoseEvents;
58+
this.bufferSize = bufferSize;
4759
}
4860

4961
@Override
@@ -60,20 +72,19 @@ public void onError(Throwable e) {
6072
subscriber.onError(e);
6173
}
6274
boolean checkNeedBuffer(Observable<?> o) {
63-
return (o instanceof GroupedObservable<?, ?>)
75+
return dontLoseEvents || ((o instanceof GroupedObservable<?, ?>)
6476
|| (o instanceof PublishSubject<?>)
6577
// || (o instanceof BehaviorSubject<?, ?>)
66-
;
78+
);
6779
}
6880
@Override
6981
public void onNext(final Observable<T> o) {
70-
if (dontLoseEvents || checkNeedBuffer(o)) {
82+
if (checkNeedBuffer(o)) {
7183
final CompositeSubscription cs = new CompositeSubscription();
7284
subscriber.add(cs);
73-
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(subscriber, new CompositeSubscription());
85+
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber, new CompositeSubscription());
7486
o.subscribe(bus);
7587
scheduler.schedule(new Action1<Inner>() {
76-
7788
@Override
7889
public void call(final Inner inner) {
7990
cs.add(Subscriptions.create(new Action0() {
@@ -89,7 +100,6 @@ public void call(final Inner inner) {
89100
}));
90101
bus.enterPassthroughMode();
91102
}
92-
93103
});
94104
return;
95105
}

rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java

Lines changed: 58 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
import static org.junit.Assert.*;
2020

2121
import java.util.Arrays;
22+
import java.util.Collections;
23+
import java.util.List;
2224
import java.util.concurrent.CountDownLatch;
2325
import java.util.concurrent.TimeUnit;
2426
import java.util.concurrent.atomic.AtomicReference;
27+
import org.junit.Ignore;
2528

2629
import org.junit.Test;
2730

@@ -39,6 +42,7 @@
3942
import rx.subscriptions.CompositeSubscription;
4043
import rx.subscriptions.MultipleAssignmentSubscription;
4144
import rx.subscriptions.Subscriptions;
45+
import rx.util.Timestamped;
4246
import rx.util.functions.Action0;
4347
import rx.util.functions.Action1;
4448
import rx.util.functions.Func1;
@@ -153,7 +157,7 @@ public void call(
153157
assertEquals(1, observer.getOnCompletedEvents().size());
154158
}
155159

156-
static class SlowScheduler extends Scheduler {
160+
public static class SlowScheduler extends Scheduler {
157161
final Scheduler actual;
158162
final long delay;
159163
final TimeUnit unit;
@@ -168,35 +172,14 @@ public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) {
168172

169173
@Override
170174
public Subscription schedule(final Action1<Scheduler.Inner> action) {
171-
final CompositeSubscription cs = new CompositeSubscription();
172-
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
173-
cs.add(mas);
174-
mas.set(actual.schedule(new Action1<Inner>() {
175-
176-
@Override
177-
public void call(Inner t1) {
178-
// cs.delete(mas);
179-
cs.add(actual.schedule(action, delay, unit));
180-
}
181-
182-
}));
183-
return cs;
175+
return actual.schedule(action, delay, unit);
184176
}
185177

186178
@Override
187179
public Subscription schedule(final Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit delayUnit) {
188-
final CompositeSubscription cs = new CompositeSubscription();
189-
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
190-
cs.add(mas);
191-
mas.set(actual.schedule(new Action1<Inner>() {
192-
@Override
193-
public void call(Inner t1) {
194-
// cs.delete(mas);
195-
long nanos = unit.toNanos(delay) + delayUnit.toNanos(delayTime);
196-
cs.add(actual.schedule(action, nanos, TimeUnit.NANOSECONDS));
197-
}
198-
}));
199-
return cs;
180+
TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit;
181+
long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit);
182+
return actual.schedule(action, t, common);
200183
}
201184
}
202185

@@ -338,4 +321,53 @@ public void call(String s) {
338321
System.out.println("Results: " + results);
339322
assertEquals(6, results.size());
340323
}
324+
void testBoundedBufferingWithSize(int size) throws Exception {
325+
Observable<Long> timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS);
326+
327+
final List<Long> deltas = Collections.synchronizedList(new ArrayList<Long>());
328+
329+
Subscription s = timer.timestamp().subscribeOn(
330+
new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size).map(new Func1<Timestamped<Long>, Long>() {
331+
@Override
332+
public Long call(Timestamped<Long> t1) {
333+
long v = System.currentTimeMillis() - t1.getTimestampMillis();
334+
return v;
335+
}
336+
}).doOnNext(new Action1<Long>() {
337+
@Override
338+
public void call(Long t1) {
339+
deltas.add(t1);
340+
}
341+
}).subscribe();
342+
343+
Thread.sleep(2050);
344+
345+
s.unsubscribe();
346+
347+
if (deltas.size() < size + 1) {
348+
fail("To few items in deltas: " + deltas);
349+
}
350+
for (int i = 0; i < size + 1; i++) {
351+
if (deltas.get(i) < 500) {
352+
fail(i + "th item arrived too early: " + deltas);
353+
}
354+
}
355+
for (int i = size + 1; i < deltas.size(); i++) {
356+
if (deltas.get(i) >= 500) {
357+
fail(i + "th item arrived too late: " + deltas);
358+
}
359+
}
360+
}
361+
@Test(timeout = 5000)
362+
public void testBoundedBufferingOfZero() throws Exception {
363+
testBoundedBufferingWithSize(0);
364+
}
365+
@Test(timeout = 5000)
366+
public void testBoundedBufferingOfOne() throws Exception {
367+
testBoundedBufferingWithSize(1);
368+
}
369+
@Test(timeout = 5000)
370+
public void testBoundedBufferingOfTwo() throws Exception {
371+
testBoundedBufferingWithSize(2);
372+
}
341373
}

0 commit comments

Comments
 (0)