Skip to content

Commit 7741c59

Browse files
authored
3.x: Add onBackpressureReduce operator (#7124)
* onBackpressureReduce operator added * onBackpressureReduce operator added * onBackpressureReduce operator added * 3.x: onBackpressureReduce operator added * 3.x: onBackpressureReduce operator added * 3.x: onBackpressureReduce operator added * 3.x: onBackpressureReduce operator added * 3.x: onBackpressureReduce operator added * 3.x: onBackpressureReduce operator * 3.x: onBackpressureReduce operator * 3.x: onBackpressureReduce operator * 3.x: onBackpressureReduce operator
1 parent c573219 commit 7741c59

File tree

6 files changed

+513
-131
lines changed

6 files changed

+513
-131
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12676,6 +12676,40 @@ public final Flowable<T> onBackpressureLatest() {
1267612676
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this));
1267712677
}
1267812678

12679+
/**
12680+
* Reduces a sequence of two not emitted values via a function into a single value if the downstream is not ready to receive
12681+
* new items (indicated by a lack of {@link Subscription#request(long)} calls from it) and emits this latest
12682+
* item when the downstream becomes ready.
12683+
* <p>
12684+
* <img width="640" height="354" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.onBackpressureReduce.png" alt="">
12685+
* <p>
12686+
* Note that if the current {@code Flowable} does support backpressure, this operator ignores that capability
12687+
* and doesn't propagate any backpressure requests from downstream.
12688+
* <p>
12689+
* Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn,
12690+
* requesting more than 1 from downstream doesn't guarantee a continuous delivery of {@code onNext} events.
12691+
* <dl>
12692+
* <dt><b>Backpressure:</b></dt>
12693+
* <dd>The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
12694+
* manner (i.e., not applying backpressure to it).</dd>
12695+
* <dt><b>Scheduler:</b></dt>
12696+
* <dd>{@code onBackpressureReduce} does not operate by default on a particular {@link Scheduler}.</dd>
12697+
* </dl>
12698+
* @param reducer the bi-function to call when there is more than one non-emitted value to downstream,
12699+
* the first argument of the bi-function is previous item and the second one is currently emitting from upstream
12700+
* @return the new {@code Flowable} instance
12701+
* @throws NullPointerException if {@code reducer} is {@code null}
12702+
* @since 3.0.9 - experimental
12703+
*/
12704+
@Experimental
12705+
@CheckReturnValue
12706+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
12707+
@SchedulerSupport(SchedulerSupport.NONE)
12708+
@NonNull
12709+
public final Flowable<T> onBackpressureReduce(@NonNull BiFunction<T, T, T> reducer) {
12710+
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureReduce<>(this, reducer));
12711+
}
12712+
1267912713
/**
1268012714
* Returns a {@code Flowable} instance that if the current {@code Flowable} emits an error, it will emit an {@code onComplete}
1268112715
* and swallow the throwable.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.flowable;
15+
16+
import io.reactivex.rxjava3.core.FlowableSubscriber;
17+
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
18+
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
19+
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.Subscriber;
21+
import org.reactivestreams.Subscription;
22+
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.concurrent.atomic.AtomicLong;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
27+
/**
28+
* Abstract base class for operators that throttle excessive updates from upstream in case if
29+
* downstream {@link Subscriber} is not ready to receive updates
30+
*
31+
* @param <T> the upstream and downstream value type
32+
*/
33+
abstract class AbstractBackpressureThrottlingSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
34+
35+
final Subscriber<? super T> downstream;
36+
37+
Subscription upstream;
38+
39+
volatile boolean done;
40+
Throwable error;
41+
42+
volatile boolean cancelled;
43+
44+
final AtomicLong requested = new AtomicLong();
45+
46+
final AtomicReference<T> current = new AtomicReference<>();
47+
48+
AbstractBackpressureThrottlingSubscriber(Subscriber<? super T> downstream) {
49+
this.downstream = downstream;
50+
}
51+
52+
@Override
53+
public void onSubscribe(Subscription s) {
54+
if (SubscriptionHelper.validate(this.upstream, s)) {
55+
this.upstream = s;
56+
downstream.onSubscribe(this);
57+
s.request(Long.MAX_VALUE);
58+
}
59+
}
60+
61+
@Override
62+
abstract public void onNext(T t);
63+
64+
@Override
65+
public void onError(Throwable t) {
66+
error = t;
67+
done = true;
68+
drain();
69+
}
70+
71+
@Override
72+
public void onComplete() {
73+
done = true;
74+
drain();
75+
}
76+
77+
@Override
78+
public void request(long n) {
79+
if (SubscriptionHelper.validate(n)) {
80+
BackpressureHelper.add(requested, n);
81+
drain();
82+
}
83+
}
84+
85+
@Override
86+
public void cancel() {
87+
if (!cancelled) {
88+
cancelled = true;
89+
upstream.cancel();
90+
91+
if (getAndIncrement() == 0) {
92+
current.lazySet(null);
93+
}
94+
}
95+
}
96+
97+
void drain() {
98+
if (getAndIncrement() != 0) {
99+
return;
100+
}
101+
final Subscriber<? super T> a = downstream;
102+
int missed = 1;
103+
final AtomicLong r = requested;
104+
final AtomicReference<T> q = current;
105+
106+
for (;;) {
107+
long e = 0L;
108+
109+
while (e != r.get()) {
110+
boolean d = done;
111+
T v = q.getAndSet(null);
112+
boolean empty = v == null;
113+
114+
if (checkTerminated(d, empty, a, q)) {
115+
return;
116+
}
117+
118+
if (empty) {
119+
break;
120+
}
121+
122+
a.onNext(v);
123+
124+
e++;
125+
}
126+
127+
if (e == r.get() && checkTerminated(done, q.get() == null, a, q)) {
128+
return;
129+
}
130+
131+
if (e != 0L) {
132+
BackpressureHelper.produced(r, e);
133+
}
134+
135+
missed = addAndGet(-missed);
136+
if (missed == 0) {
137+
break;
138+
}
139+
}
140+
}
141+
142+
boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a, AtomicReference<T> q) {
143+
if (cancelled) {
144+
q.lazySet(null);
145+
return true;
146+
}
147+
148+
if (d) {
149+
Throwable e = error;
150+
if (e != null) {
151+
q.lazySet(null);
152+
a.onError(e);
153+
return true;
154+
} else
155+
if (empty) {
156+
a.onComplete();
157+
return true;
158+
}
159+
}
160+
161+
return false;
162+
}
163+
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureLatest.java

Lines changed: 4 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,8 @@
1313

1414
package io.reactivex.rxjava3.internal.operators.flowable;
1515

16-
import java.util.concurrent.atomic.*;
17-
18-
import org.reactivestreams.*;
19-
20-
import io.reactivex.rxjava3.core.*;
21-
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
22-
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
16+
import io.reactivex.rxjava3.core.Flowable;
17+
import org.reactivestreams.Subscriber;
2318

2419
public final class FlowableOnBackpressureLatest<T> extends AbstractFlowableWithUpstream<T, T> {
2520

@@ -32,140 +27,18 @@ protected void subscribeActual(Subscriber<? super T> s) {
3227
source.subscribe(new BackpressureLatestSubscriber<>(s));
3328
}
3429

35-
static final class BackpressureLatestSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
30+
static final class BackpressureLatestSubscriber<T> extends AbstractBackpressureThrottlingSubscriber<T> {
3631

3732
private static final long serialVersionUID = 163080509307634843L;
3833

39-
final Subscriber<? super T> downstream;
40-
41-
Subscription upstream;
42-
43-
volatile boolean done;
44-
Throwable error;
45-
46-
volatile boolean cancelled;
47-
48-
final AtomicLong requested = new AtomicLong();
49-
50-
final AtomicReference<T> current = new AtomicReference<>();
51-
5234
BackpressureLatestSubscriber(Subscriber<? super T> downstream) {
53-
this.downstream = downstream;
54-
}
55-
56-
@Override
57-
public void onSubscribe(Subscription s) {
58-
if (SubscriptionHelper.validate(this.upstream, s)) {
59-
this.upstream = s;
60-
downstream.onSubscribe(this);
61-
s.request(Long.MAX_VALUE);
62-
}
35+
super(downstream);
6336
}
6437

6538
@Override
6639
public void onNext(T t) {
6740
current.lazySet(t);
6841
drain();
6942
}
70-
71-
@Override
72-
public void onError(Throwable t) {
73-
error = t;
74-
done = true;
75-
drain();
76-
}
77-
78-
@Override
79-
public void onComplete() {
80-
done = true;
81-
drain();
82-
}
83-
84-
@Override
85-
public void request(long n) {
86-
if (SubscriptionHelper.validate(n)) {
87-
BackpressureHelper.add(requested, n);
88-
drain();
89-
}
90-
}
91-
92-
@Override
93-
public void cancel() {
94-
if (!cancelled) {
95-
cancelled = true;
96-
upstream.cancel();
97-
98-
if (getAndIncrement() == 0) {
99-
current.lazySet(null);
100-
}
101-
}
102-
}
103-
104-
void drain() {
105-
if (getAndIncrement() != 0) {
106-
return;
107-
}
108-
final Subscriber<? super T> a = downstream;
109-
int missed = 1;
110-
final AtomicLong r = requested;
111-
final AtomicReference<T> q = current;
112-
113-
for (;;) {
114-
long e = 0L;
115-
116-
while (e != r.get()) {
117-
boolean d = done;
118-
T v = q.getAndSet(null);
119-
boolean empty = v == null;
120-
121-
if (checkTerminated(d, empty, a, q)) {
122-
return;
123-
}
124-
125-
if (empty) {
126-
break;
127-
}
128-
129-
a.onNext(v);
130-
131-
e++;
132-
}
133-
134-
if (e == r.get() && checkTerminated(done, q.get() == null, a, q)) {
135-
return;
136-
}
137-
138-
if (e != 0L) {
139-
BackpressureHelper.produced(r, e);
140-
}
141-
142-
missed = addAndGet(-missed);
143-
if (missed == 0) {
144-
break;
145-
}
146-
}
147-
}
148-
149-
boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a, AtomicReference<T> q) {
150-
if (cancelled) {
151-
q.lazySet(null);
152-
return true;
153-
}
154-
155-
if (d) {
156-
Throwable e = error;
157-
if (e != null) {
158-
q.lazySet(null);
159-
a.onError(e);
160-
return true;
161-
} else
162-
if (empty) {
163-
a.onComplete();
164-
return true;
165-
}
166-
}
167-
168-
return false;
169-
}
17043
}
17144
}

0 commit comments

Comments
 (0)