Skip to content

Commit e670021

Browse files
committed
Operator OnBackpressureBlock
1 parent 053e506 commit e670021

File tree

3 files changed

+584
-11
lines changed

3 files changed

+584
-11
lines changed

src/main/java/rx/Observable.java

Lines changed: 171 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,142 @@
1212
*/
1313
package rx;
1414

15-
import java.util.*;
16-
import java.util.concurrent.*;
17-
18-
import rx.exceptions.*;
19-
import rx.functions.*;
20-
import rx.internal.operators.*;
15+
import java.util.ArrayList;
16+
import java.util.Arrays;
17+
import java.util.Collection;
18+
import java.util.Collections;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.NoSuchElementException;
22+
import java.util.concurrent.Future;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import rx.exceptions.Exceptions;
26+
import rx.exceptions.OnErrorNotImplementedException;
27+
import rx.functions.Action0;
28+
import rx.functions.Action1;
29+
import rx.functions.Action2;
30+
import rx.functions.Func0;
31+
import rx.functions.Func1;
32+
import rx.functions.Func2;
33+
import rx.functions.Func3;
34+
import rx.functions.Func4;
35+
import rx.functions.Func5;
36+
import rx.functions.Func6;
37+
import rx.functions.Func7;
38+
import rx.functions.Func8;
39+
import rx.functions.Func9;
40+
import rx.functions.FuncN;
41+
import rx.functions.Functions;
42+
import rx.internal.operators.OnSubscribeAmb;
43+
import rx.internal.operators.OnSubscribeCache;
44+
import rx.internal.operators.OnSubscribeCombineLatest;
45+
import rx.internal.operators.OnSubscribeDefer;
46+
import rx.internal.operators.OnSubscribeDelaySubscription;
47+
import rx.internal.operators.OnSubscribeDelaySubscriptionWithSelector;
48+
import rx.internal.operators.OnSubscribeFromIterable;
49+
import rx.internal.operators.OnSubscribeGroupJoin;
50+
import rx.internal.operators.OnSubscribeJoin;
51+
import rx.internal.operators.OnSubscribeMulticastSelector;
52+
import rx.internal.operators.OnSubscribeRange;
53+
import rx.internal.operators.OnSubscribeRedo;
54+
import rx.internal.operators.OnSubscribeTimerOnce;
55+
import rx.internal.operators.OnSubscribeTimerPeriodically;
56+
import rx.internal.operators.OnSubscribeToObservableFuture;
57+
import rx.internal.operators.OnSubscribeUsing;
58+
import rx.internal.operators.OperatorAll;
59+
import rx.internal.operators.OperatorAny;
60+
import rx.internal.operators.OperatorAsObservable;
61+
import rx.internal.operators.OperatorBufferWithSingleObservable;
62+
import rx.internal.operators.OperatorBufferWithSize;
63+
import rx.internal.operators.OperatorBufferWithStartEndObservable;
64+
import rx.internal.operators.OperatorBufferWithTime;
65+
import rx.internal.operators.OperatorCast;
66+
import rx.internal.operators.OperatorConcat;
67+
import rx.internal.operators.OperatorDebounceWithSelector;
68+
import rx.internal.operators.OperatorDebounceWithTime;
69+
import rx.internal.operators.OperatorDefaultIfEmpty;
70+
import rx.internal.operators.OperatorDelay;
71+
import rx.internal.operators.OperatorDelayWithSelector;
72+
import rx.internal.operators.OperatorDematerialize;
73+
import rx.internal.operators.OperatorDistinct;
74+
import rx.internal.operators.OperatorDistinctUntilChanged;
75+
import rx.internal.operators.OperatorDoOnEach;
76+
import rx.internal.operators.OperatorDoOnSubscribe;
77+
import rx.internal.operators.OperatorDoOnUnsubscribe;
78+
import rx.internal.operators.OperatorElementAt;
79+
import rx.internal.operators.OperatorFilter;
80+
import rx.internal.operators.OperatorFinally;
81+
import rx.internal.operators.OperatorGroupBy;
82+
import rx.internal.operators.OperatorMap;
83+
import rx.internal.operators.OperatorMapNotification;
84+
import rx.internal.operators.OperatorMapPair;
85+
import rx.internal.operators.OperatorMaterialize;
86+
import rx.internal.operators.OperatorMerge;
87+
import rx.internal.operators.OperatorMergeDelayError;
88+
import rx.internal.operators.OperatorMergeMaxConcurrent;
89+
import rx.internal.operators.OperatorMulticast;
90+
import rx.internal.operators.OperatorObserveOn;
91+
import rx.internal.operators.OperatorOnBackpressureBlock;
92+
import rx.internal.operators.OperatorOnBackpressureBuffer;
93+
import rx.internal.operators.OperatorOnBackpressureDrop;
94+
import rx.internal.operators.OperatorOnErrorResumeNextViaFunction;
95+
import rx.internal.operators.OperatorOnErrorResumeNextViaObservable;
96+
import rx.internal.operators.OperatorOnErrorReturn;
97+
import rx.internal.operators.OperatorOnExceptionResumeNextViaObservable;
98+
import rx.internal.operators.OperatorPublish;
99+
import rx.internal.operators.OperatorReplay;
100+
import rx.internal.operators.OperatorRetryWithPredicate;
101+
import rx.internal.operators.OperatorSampleWithObservable;
102+
import rx.internal.operators.OperatorSampleWithTime;
103+
import rx.internal.operators.OperatorScan;
104+
import rx.internal.operators.OperatorSequenceEqual;
105+
import rx.internal.operators.OperatorSerialize;
106+
import rx.internal.operators.OperatorSingle;
107+
import rx.internal.operators.OperatorSkip;
108+
import rx.internal.operators.OperatorSkipLast;
109+
import rx.internal.operators.OperatorSkipLastTimed;
110+
import rx.internal.operators.OperatorSkipTimed;
111+
import rx.internal.operators.OperatorSkipUntil;
112+
import rx.internal.operators.OperatorSkipWhile;
113+
import rx.internal.operators.OperatorSubscribeOn;
114+
import rx.internal.operators.OperatorSwitch;
115+
import rx.internal.operators.OperatorTake;
116+
import rx.internal.operators.OperatorTakeLast;
117+
import rx.internal.operators.OperatorTakeLastTimed;
118+
import rx.internal.operators.OperatorTakeTimed;
119+
import rx.internal.operators.OperatorTakeUntil;
120+
import rx.internal.operators.OperatorTakeWhile;
121+
import rx.internal.operators.OperatorThrottleFirst;
122+
import rx.internal.operators.OperatorTimeInterval;
123+
import rx.internal.operators.OperatorTimeout;
124+
import rx.internal.operators.OperatorTimeoutWithSelector;
125+
import rx.internal.operators.OperatorTimestamp;
126+
import rx.internal.operators.OperatorToMap;
127+
import rx.internal.operators.OperatorToMultimap;
128+
import rx.internal.operators.OperatorToObservableList;
129+
import rx.internal.operators.OperatorToObservableSortedList;
130+
import rx.internal.operators.OperatorUnsubscribeOn;
131+
import rx.internal.operators.OperatorWindowWithObservable;
132+
import rx.internal.operators.OperatorWindowWithSize;
133+
import rx.internal.operators.OperatorWindowWithStartEndObservable;
134+
import rx.internal.operators.OperatorWindowWithTime;
135+
import rx.internal.operators.OperatorZip;
136+
import rx.internal.operators.OperatorZipIterable;
137+
import rx.internal.util.RxRingBuffer;
21138
import rx.internal.util.ScalarSynchronousObservable;
22139
import rx.internal.util.UtilityFunctions;
23-
24-
import rx.observables.*;
140+
import rx.observables.BlockingObservable;
141+
import rx.observables.ConnectableObservable;
142+
import rx.observables.GroupedObservable;
25143
import rx.observers.SafeSubscriber;
26-
import rx.plugins.*;
27-
import rx.schedulers.*;
28-
import rx.subjects.*;
144+
import rx.plugins.RxJavaObservableExecutionHook;
145+
import rx.plugins.RxJavaPlugins;
146+
import rx.schedulers.Schedulers;
147+
import rx.schedulers.TimeInterval;
148+
import rx.schedulers.Timestamped;
149+
import rx.subjects.ReplaySubject;
150+
import rx.subjects.Subject;
29151
import rx.subscriptions.Subscriptions;
30152

31153
/**
@@ -182,6 +304,7 @@ public void call(Subscriber<? super R> o) {
182304
* @return the source Observable, transformed by the transformer function
183305
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
184306
*/
307+
@SuppressWarnings("unchecked")
185308
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
186309
return ((Transformer<T, R>) transformer).call(this);
187310
}
@@ -5054,6 +5177,43 @@ public final Observable<T> onBackpressureDrop() {
50545177
return lift(new OperatorOnBackpressureDrop<T>());
50555178
}
50565179

5180+
/**
5181+
* Instructs an Observable that is emitting items faster than its observer can consume them is to
5182+
* block the producer thread.
5183+
* <p>
5184+
* The producer side can emit up to {@code maxQueueLength} onNext elements without blocking, but the
5185+
* consumer side considers the amount its downstream requested through {@code Producer.request(n)}
5186+
* and doesn't emit more than requested even if more is available. For example, using
5187+
* {@code onBackpressureBlock(384).observeOn(Schedulers.io())} will not throw a MissingBackpressureException.
5188+
* <p>
5189+
* Note that if the upstream Observable does support backpressure, this operator ignores that capability
5190+
* and doesn't propagate any backpressure requests from downstream.
5191+
*
5192+
* @param maxQueueLength the maximum number of items the producer can emit without blocking
5193+
* @return the source Observable modified to block {@code onNext} notifications on overflow
5194+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
5195+
*/
5196+
public final Observable<T> onBackpressureBlock(int maxQueueLength) {
5197+
return lift(new OperatorOnBackpressureBlock<T>(maxQueueLength));
5198+
}
5199+
/**
5200+
* Instructs an Observable that is emitting items faster than its observer can consume them is to
5201+
* block the producer thread if the number of undelivered onNext events reaches the system-wide ring buffer size.
5202+
* <p>
5203+
* The producer side can emit up to the system-wide ring buffer size onNext elements without blocking, but the
5204+
* consumer side considers the amount its downstream requested through {@code Producer.request(n)}
5205+
* and doesn't emit more than requested even if available.
5206+
* <p>
5207+
* Note that if the upstream Observable does support backpressure, this operator ignores that capability
5208+
* and doesn't propagate any backpressure requests from downstream.
5209+
*
5210+
* @return the source Observable modified to block {@code onNext} notifications on overflow
5211+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
5212+
*/
5213+
public final Observable<T> onBackpressureBlock() {
5214+
return onBackpressureBlock(RxRingBuffer.SIZE);
5215+
}
5216+
50575217
/**
50585218
* Instructs an Observable to pass control to another Observable rather than invoking
50595219
* {@link Observer#onError onError} if it encounters an error.
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.concurrent.ArrayBlockingQueue;
20+
import java.util.concurrent.BlockingQueue;
21+
22+
import rx.Observable.Operator;
23+
import rx.Producer;
24+
import rx.Subscriber;
25+
26+
/**
27+
* Operator that uses blocks the producer thread in case a backpressure is needed.
28+
*/
29+
public class OperatorOnBackpressureBlock<T> implements Operator<T, T> {
30+
final int max;
31+
public OperatorOnBackpressureBlock(int max) {
32+
this.max = max;
33+
}
34+
@Override
35+
public Subscriber<? super T> call(Subscriber<? super T> child) {
36+
BlockingSubscriber<T> s = new BlockingSubscriber<T>(max, child);
37+
s.init();
38+
return s;
39+
}
40+
41+
static final class BlockingSubscriber<T> extends Subscriber<T> {
42+
final NotificationLite<T> nl = NotificationLite.instance();
43+
final BlockingQueue<Object> queue;
44+
final Subscriber<? super T> child;
45+
/** Guarded by this. */
46+
long requestedCount;
47+
/** Guarded by this. */
48+
boolean emitting;
49+
volatile boolean terminated;
50+
/** Set before terminated, read after terminated. */
51+
Throwable exception;
52+
public BlockingSubscriber(int max, Subscriber<? super T> child) {
53+
this.queue = new ArrayBlockingQueue<Object>(max);
54+
this.child = child;
55+
}
56+
void init() {
57+
child.add(this);
58+
child.setProducer(new Producer() {
59+
@Override
60+
public void request(long n) {
61+
synchronized (BlockingSubscriber.this) {
62+
if (n == Long.MAX_VALUE || requestedCount == Long.MAX_VALUE) {
63+
requestedCount = Long.MAX_VALUE;
64+
} else {
65+
requestedCount += n;
66+
}
67+
}
68+
drain();
69+
}
70+
});
71+
}
72+
@Override
73+
public void onNext(T t) {
74+
try {
75+
queue.put(nl.next(t));
76+
drain();
77+
} catch (InterruptedException ex) {
78+
if (!isUnsubscribed()) {
79+
onError(ex);
80+
}
81+
}
82+
}
83+
@Override
84+
public void onError(Throwable e) {
85+
if (!terminated) {
86+
exception = e;
87+
terminated = true;
88+
drain();
89+
}
90+
}
91+
@Override
92+
public void onCompleted() {
93+
terminated = true;
94+
drain();
95+
}
96+
void drain() {
97+
long n;
98+
synchronized (this) {
99+
if (emitting) {
100+
return;
101+
}
102+
emitting = true;
103+
n = requestedCount;
104+
}
105+
boolean skipFinal = false;
106+
try {
107+
while (true) {
108+
int emitted = 0;
109+
while (n > 0) {
110+
Object o = queue.poll();
111+
if (o == null) {
112+
if (terminated) {
113+
if (exception != null) {
114+
child.onError(exception);
115+
} else {
116+
child.onCompleted();
117+
}
118+
return;
119+
}
120+
if (n == Long.MAX_VALUE) {
121+
return;
122+
} else {
123+
break;
124+
}
125+
} else {
126+
child.onNext(nl.getValue(o));
127+
n--;
128+
emitted++;
129+
}
130+
}
131+
synchronized (this) {
132+
if (requestedCount == Long.MAX_VALUE || emitted == 0) {
133+
skipFinal = true;
134+
emitting = false;
135+
return;
136+
} else {
137+
requestedCount -= emitted;
138+
n += requestedCount;
139+
}
140+
}
141+
}
142+
} finally {
143+
if (!skipFinal) {
144+
synchronized (this) {
145+
emitting = false;
146+
}
147+
}
148+
}
149+
}
150+
}
151+
}

0 commit comments

Comments
 (0)