Skip to content

Commit cb427c5

Browse files
authored
2.x: sync javadoc of Flowable (#4342)
1 parent d6eaff3 commit cb427c5

32 files changed

+12084
-553
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivex;
17+
18+
import io.reactivex.exceptions.MissingBackpressureException;
19+
20+
/**
21+
* Generic strategy and default implementations to deal with backpressure buffer overflows.
22+
*/
23+
public final class BackpressureOverflow {
24+
25+
/** Utility class. */
26+
private BackpressureOverflow() {
27+
throw new IllegalStateException("No instances!");
28+
}
29+
30+
/**
31+
* Signal a MissingBackressureException due to lack of requests.
32+
*/
33+
public static final BackpressureOverflow.Strategy ON_OVERFLOW_ERROR = Error.INSTANCE;
34+
35+
/**
36+
* By default, signal a MissingBackressureException due to lack of requests.
37+
*/
38+
public static final BackpressureOverflow.Strategy ON_OVERFLOW_DEFAULT = ON_OVERFLOW_ERROR;
39+
40+
/**
41+
* Drop the oldest value in the buffer.
42+
*/
43+
public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_OLDEST = DropOldest.INSTANCE;
44+
45+
/**
46+
* Drop the latest value.
47+
*/
48+
public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_LATEST = DropLatest.INSTANCE;
49+
50+
/**
51+
* Represents a callback called when a value is about to be dropped
52+
* due to lack of downstream requests.
53+
*/
54+
public interface Strategy {
55+
56+
/**
57+
* Whether the Backpressure manager should attempt to drop the oldest item, or simply
58+
* drop the item currently causing backpressure.
59+
*
60+
* @return true to request drop of the oldest item, false to drop the newest.
61+
* @throws MissingBackpressureException
62+
*/
63+
boolean mayAttemptDrop() throws MissingBackpressureException;
64+
}
65+
66+
/**
67+
* Drop oldest items from the buffer making room for newer ones.
68+
*/
69+
static class DropOldest implements BackpressureOverflow.Strategy {
70+
static final DropOldest INSTANCE = new DropOldest();
71+
72+
private DropOldest() {}
73+
74+
@Override
75+
public boolean mayAttemptDrop() {
76+
return true;
77+
}
78+
}
79+
80+
/**
81+
* Drop most recent items, but not {@code onError} nor unsubscribe from source
82+
* (as {code OperatorOnBackpressureDrop}).
83+
*/
84+
static class DropLatest implements BackpressureOverflow.Strategy {
85+
static final DropLatest INSTANCE = new DropLatest();
86+
87+
private DropLatest() {}
88+
89+
@Override
90+
public boolean mayAttemptDrop() {
91+
return false;
92+
}
93+
}
94+
95+
/**
96+
* {@code onError} a MissingBackpressureException and unsubscribe from source.
97+
*/
98+
static class Error implements BackpressureOverflow.Strategy {
99+
100+
static final Error INSTANCE = new Error();
101+
102+
private Error() {}
103+
104+
@Override
105+
public boolean mayAttemptDrop() throws MissingBackpressureException {
106+
throw new MissingBackpressureException("Overflowed buffer");
107+
}
108+
}
109+
}

src/main/java/io/reactivex/Completable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -850,7 +850,7 @@ public final <T> Observable<T> endWith(ObservableSource<T> next) {
850850
*/
851851
@SchedulerSupport(SchedulerSupport.CUSTOM)
852852
public final <T> Flowable<T> endWith(Publisher<T> next) {
853-
return this.<T>toFlowable().endWith(next);
853+
return this.<T>toFlowable().concatWith(next);
854854
}
855855

856856
/**

0 commit comments

Comments
 (0)