Skip to content

Commit 919d310

Browse files
authored
3.x: Add onDropped callback to onBackpressureBuffer (#7567)
* 3.x: Add onDropped callback to onBackpressureBuffer Related #7458 * MBE is expected * Annotate with experimental
1 parent 067f3cb commit 919d310

File tree

6 files changed

+293
-26
lines changed

6 files changed

+293
-26
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12546,7 +12546,7 @@ public final Flowable<T> onBackpressureBuffer(int capacity, boolean delayError)
1254612546
@NonNull
1254712547
public final Flowable<T> onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded) {
1254812548
ObjectHelper.verifyPositive(capacity, "capacity");
12549-
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, Functions.EMPTY_ACTION));
12549+
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, Functions.EMPTY_ACTION, Functions.emptyConsumer()));
1255012550
}
1255112551

1255212552
/**
@@ -12577,6 +12577,7 @@ public final Flowable<T> onBackpressureBuffer(int capacity, boolean delayError,
1257712577
* @throws NullPointerException if {@code onOverflow} is {@code null}
1257812578
* @throws IllegalArgumentException if {@code capacity} is non-positive
1257912579
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
12580+
* @see #onBackpressureBuffer(int, boolean, boolean, Action, Consumer)
1258012581
* @since 1.1.0
1258112582
*/
1258212583
@CheckReturnValue
@@ -12587,7 +12588,51 @@ public final Flowable<T> onBackpressureBuffer(int capacity, boolean delayError,
1258712588
@NonNull Action onOverflow) {
1258812589
Objects.requireNonNull(onOverflow, "onOverflow is null");
1258912590
ObjectHelper.verifyPositive(capacity, "capacity");
12590-
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow));
12591+
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow, Functions.emptyConsumer()));
12592+
}
12593+
12594+
/**
12595+
* Buffers an optionally unlimited number of items from the current {@code Flowable} and allows it to emit as fast it can while allowing the
12596+
* downstream to consume the items at its own place.
12597+
* If {@code unbounded} is {@code true}, the resulting {@code Flowable} will signal a
12598+
* {@link MissingBackpressureException} via {@code onError} as soon as the buffer's capacity is exceeded, dropping all undelivered
12599+
* items, canceling the flow and calling the {@code onOverflow} action.
12600+
* <p>
12601+
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.v3.png" alt="">
12602+
* <dl>
12603+
* <dt><b>Backpressure:</b></dt>
12604+
* <dd>The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
12605+
* manner (i.e., not applying backpressure to it).</dd>
12606+
* <dt><b>Scheduler:</b></dt>
12607+
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
12608+
* </dl>
12609+
*
12610+
* @param capacity number of slots available in the buffer.
12611+
* @param delayError
12612+
* if {@code true}, an exception from the current {@code Flowable} is delayed until all buffered elements have been
12613+
* consumed by the downstream; if {@code false}, an exception is immediately signaled to the downstream, skipping
12614+
* any buffered element
12615+
* @param unbounded
12616+
* if {@code true}, the capacity value is interpreted as the internal "island" size of the unbounded buffer
12617+
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots.
12618+
* @param onDropped the {@link Consumer} to be called with the item that could not be buffered due to capacity constraints.
12619+
* @return the new {@code Flowable} instance
12620+
* @throws NullPointerException if {@code onOverflow} or {@code onDropped} is {@code null}
12621+
* @throws IllegalArgumentException if {@code capacity} is non-positive
12622+
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
12623+
* @since 3.1.7
12624+
*/
12625+
@CheckReturnValue
12626+
@NonNull
12627+
@BackpressureSupport(BackpressureKind.SPECIAL)
12628+
@SchedulerSupport(SchedulerSupport.NONE)
12629+
@Experimental
12630+
public final Flowable<T> onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded,
12631+
@NonNull Action onOverflow, @NonNull Consumer<? super T> onDropped) {
12632+
Objects.requireNonNull(onOverflow, "onOverflow is null");
12633+
Objects.requireNonNull(onDropped, "onDropped is null");
12634+
ObjectHelper.verifyPositive(capacity, "capacity");
12635+
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow, onDropped));
1259112636
}
1259212637

1259312638
/**
@@ -12653,6 +12698,7 @@ public final Flowable<T> onBackpressureBuffer(int capacity, @NonNull Action onOv
1265312698
* @throws NullPointerException if {@code onOverflow} or {@code overflowStrategy} is {@code null}
1265412699
* @throws IllegalArgumentException if {@code capacity} is non-positive
1265512700
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
12701+
* @see #onBackpressureBuffer(long, Action, BackpressureOverflowStrategy)
1265612702
* @since 2.0
1265712703
*/
1265812704
@CheckReturnValue
@@ -12662,9 +12708,55 @@ public final Flowable<T> onBackpressureBuffer(int capacity, @NonNull Action onOv
1266212708
public final Flowable<T> onBackpressureBuffer(long capacity, @Nullable Action onOverflow, @NonNull BackpressureOverflowStrategy overflowStrategy) {
1266312709
Objects.requireNonNull(overflowStrategy, "overflowStrategy is null");
1266412710
ObjectHelper.verifyPositive(capacity, "capacity");
12665-
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy));
12711+
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy, null));
1266612712
}
1266712713

12714+
/**
12715+
* Buffers an optionally unlimited number of items from the current {@code Flowable} and allows it to emit as fast it can while allowing the
12716+
* downstream to consume the items at its own place.
12717+
* The resulting {@code Flowable} will behave as determined by {@code overflowStrategy} if the buffer capacity is exceeded:
12718+
* <ul>
12719+
* <li>{@link BackpressureOverflowStrategy#ERROR} (default) will call {@code onError} dropping all undelivered items,
12720+
* canceling the source, and notifying the producer with {@code onOverflow}. </li>
12721+
* <li>{@link BackpressureOverflowStrategy#DROP_LATEST} will drop any new items emitted by the producer while
12722+
* the buffer is full, without generating any {@code onError}. Each drop will, however, invoke {@code onOverflow}
12723+
* to signal the overflow to the producer.</li>
12724+
* <li>{@link BackpressureOverflowStrategy#DROP_OLDEST} will drop the oldest items in the buffer in order to make
12725+
* room for newly emitted ones. Overflow will not generate an {@code onError}, but each drop will invoke
12726+
* {@code onOverflow} to signal the overflow to the producer.</li>
12727+
* </ul>
12728+
*
12729+
* <p>
12730+
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.v3.png" alt="">
12731+
* <dl>
12732+
* <dt><b>Backpressure:</b></dt>
12733+
* <dd>The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
12734+
* manner (i.e., not applying backpressure to it).</dd>
12735+
* <dt><b>Scheduler:</b></dt>
12736+
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
12737+
* </dl>
12738+
*
12739+
* @param capacity number of slots available in the buffer.
12740+
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots, {@code null} is allowed.
12741+
* @param overflowStrategy how should the resulting {@code Flowable} react to buffer overflows, {@code null} is not allowed.
12742+
* @param onDropped the {@link Consumer} to be called with the item that could not be buffered due to capacity constraints.
12743+
* @return the new {@code Flowable} instance
12744+
* @throws NullPointerException if {@code onOverflow}, {@code overflowStrategy} or {@code onDropped} is {@code null}
12745+
* @throws IllegalArgumentException if {@code capacity} is non-positive
12746+
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
12747+
* @since 3.1.7
12748+
*/
12749+
@CheckReturnValue
12750+
@NonNull
12751+
@BackpressureSupport(BackpressureKind.SPECIAL)
12752+
@SchedulerSupport(SchedulerSupport.NONE)
12753+
@Experimental
12754+
public final Flowable<T> onBackpressureBuffer(long capacity, @Nullable Action onOverflow, @NonNull BackpressureOverflowStrategy overflowStrategy, @NonNull Consumer<? super T> onDropped) {
12755+
Objects.requireNonNull(overflowStrategy, "overflowStrategy is null");
12756+
Objects.requireNonNull(onDropped, "onDropped is null");
12757+
ObjectHelper.verifyPositive(capacity, "capacity");
12758+
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy, onDropped));
12759+
}
1266812760
/**
1266912761
* Drops items from the current {@code Flowable} if the downstream is not ready to receive new items (indicated
1267012762
* by a lack of {@link Subscription#request(long)} calls from it).

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBuffer.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.reactivex.rxjava3.annotations.Nullable;
2121
import io.reactivex.rxjava3.core.*;
2222
import io.reactivex.rxjava3.exceptions.*;
23-
import io.reactivex.rxjava3.functions.Action;
23+
import io.reactivex.rxjava3.functions.*;
2424
import io.reactivex.rxjava3.internal.subscriptions.*;
2525
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
2626
import io.reactivex.rxjava3.operators.*;
@@ -30,19 +30,21 @@ public final class FlowableOnBackpressureBuffer<T> extends AbstractFlowableWithU
3030
final boolean unbounded;
3131
final boolean delayError;
3232
final Action onOverflow;
33+
final Consumer<? super T> onDropped;
3334

3435
public FlowableOnBackpressureBuffer(Flowable<T> source, int bufferSize, boolean unbounded,
35-
boolean delayError, Action onOverflow) {
36+
boolean delayError, Action onOverflow, Consumer<? super T> onDropped) {
3637
super(source);
3738
this.bufferSize = bufferSize;
3839
this.unbounded = unbounded;
3940
this.delayError = delayError;
4041
this.onOverflow = onOverflow;
42+
this.onDropped = onDropped;
4143
}
4244

4345
@Override
4446
protected void subscribeActual(Subscriber<? super T> s) {
45-
source.subscribe(new BackpressureBufferSubscriber<>(s, bufferSize, unbounded, delayError, onOverflow));
47+
source.subscribe(new BackpressureBufferSubscriber<>(s, bufferSize, unbounded, delayError, onOverflow, onDropped));
4648
}
4749

4850
static final class BackpressureBufferSubscriber<T> extends BasicIntQueueSubscription<T> implements FlowableSubscriber<T> {
@@ -53,6 +55,7 @@ static final class BackpressureBufferSubscriber<T> extends BasicIntQueueSubscrip
5355
final SimplePlainQueue<T> queue;
5456
final boolean delayError;
5557
final Action onOverflow;
58+
final Consumer<? super T> onDropped;
5659

5760
Subscription upstream;
5861

@@ -66,10 +69,11 @@ static final class BackpressureBufferSubscriber<T> extends BasicIntQueueSubscrip
6669
boolean outputFused;
6770

6871
BackpressureBufferSubscriber(Subscriber<? super T> actual, int bufferSize,
69-
boolean unbounded, boolean delayError, Action onOverflow) {
72+
boolean unbounded, boolean delayError, Action onOverflow, Consumer<? super T> onDropped) {
7073
this.downstream = actual;
7174
this.onOverflow = onOverflow;
7275
this.delayError = delayError;
76+
this.onDropped = onDropped;
7377

7478
SimplePlainQueue<T> q;
7579

@@ -98,6 +102,7 @@ public void onNext(T t) {
98102
MissingBackpressureException ex = new MissingBackpressureException("Buffer is full");
99103
try {
100104
onOverflow.run();
105+
onDropped.accept(t);
101106
} catch (Throwable e) {
102107
Exceptions.throwIfFatal(e);
103108
ex.initCause(e);

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import io.reactivex.rxjava3.core.*;
2222
import io.reactivex.rxjava3.exceptions.*;
23-
import io.reactivex.rxjava3.functions.Action;
23+
import io.reactivex.rxjava3.functions.*;
2424
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
2525
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
2626
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
@@ -38,17 +38,21 @@ public final class FlowableOnBackpressureBufferStrategy<T> extends AbstractFlowa
3838

3939
final BackpressureOverflowStrategy strategy;
4040

41+
final Consumer<? super T> onDropped;
42+
4143
public FlowableOnBackpressureBufferStrategy(Flowable<T> source,
42-
long bufferSize, Action onOverflow, BackpressureOverflowStrategy strategy) {
44+
long bufferSize, Action onOverflow, BackpressureOverflowStrategy strategy,
45+
Consumer<? super T> onDropped) {
4346
super(source);
4447
this.bufferSize = bufferSize;
4548
this.onOverflow = onOverflow;
4649
this.strategy = strategy;
50+
this.onDropped = onDropped;
4751
}
4852

4953
@Override
5054
protected void subscribeActual(Subscriber<? super T> s) {
51-
source.subscribe(new OnBackpressureBufferStrategySubscriber<>(s, onOverflow, strategy, bufferSize));
55+
source.subscribe(new OnBackpressureBufferStrategySubscriber<>(s, onOverflow, strategy, bufferSize, onDropped));
5256
}
5357

5458
static final class OnBackpressureBufferStrategySubscriber<T>
@@ -61,6 +65,8 @@ static final class OnBackpressureBufferStrategySubscriber<T>
6165

6266
final Action onOverflow;
6367

68+
final Consumer<? super T> onDropped;
69+
6470
final BackpressureOverflowStrategy strategy;
6571

6672
final long bufferSize;
@@ -77,13 +83,15 @@ static final class OnBackpressureBufferStrategySubscriber<T>
7783
Throwable error;
7884

7985
OnBackpressureBufferStrategySubscriber(Subscriber<? super T> actual, Action onOverflow,
80-
BackpressureOverflowStrategy strategy, long bufferSize) {
86+
BackpressureOverflowStrategy strategy, long bufferSize,
87+
Consumer<? super T> onDropped) {
8188
this.downstream = actual;
8289
this.onOverflow = onOverflow;
8390
this.strategy = strategy;
8491
this.bufferSize = bufferSize;
8592
this.requested = new AtomicLong();
8693
this.deque = new ArrayDeque<>();
94+
this.onDropped = onDropped;
8795
}
8896

8997
@Override
@@ -104,44 +112,60 @@ public void onNext(T t) {
104112
}
105113
boolean callOnOverflow = false;
106114
boolean callError = false;
115+
boolean callDrain = false;
107116
Deque<T> dq = deque;
117+
T toDrop = null;
108118
synchronized (dq) {
109119
if (dq.size() == bufferSize) {
110120
switch (strategy) {
111121
case DROP_LATEST:
112-
dq.pollLast();
122+
toDrop = dq.pollLast();
113123
dq.offer(t);
114124
callOnOverflow = true;
115125
break;
116126
case DROP_OLDEST:
117-
dq.poll();
127+
toDrop = dq.poll();
118128
dq.offer(t);
119129
callOnOverflow = true;
120130
break;
121131
default:
122132
// signal error
133+
toDrop = t;
123134
callError = true;
124135
break;
125136
}
126137
} else {
127138
dq.offer(t);
139+
callDrain = true;
128140
}
129141
}
130142

131-
if (callOnOverflow) {
132-
if (onOverflow != null) {
133-
try {
134-
onOverflow.run();
135-
} catch (Throwable ex) {
136-
Exceptions.throwIfFatal(ex);
137-
upstream.cancel();
138-
onError(ex);
139-
}
143+
if (callOnOverflow && onOverflow != null) {
144+
try {
145+
onOverflow.run();
146+
} catch (Throwable ex) {
147+
Exceptions.throwIfFatal(ex);
148+
upstream.cancel();
149+
onError(ex);
150+
}
151+
}
152+
153+
if (onDropped != null && toDrop != null) {
154+
try {
155+
onDropped.accept(toDrop);
156+
} catch (Throwable ex) {
157+
Exceptions.throwIfFatal(ex);
158+
upstream.cancel();
159+
onError(ex);
140160
}
141-
} else if (callError) {
161+
}
162+
163+
if (callError) {
142164
upstream.cancel();
143165
onError(MissingBackpressureException.createDefault());
144-
} else {
166+
}
167+
168+
if (callDrain) {
145169
drain();
146170
}
147171
}

0 commit comments

Comments
 (0)