Skip to content

Commit 8b11ea8

Browse files
authored
2.x: Improve BehaviorSubject JavaDoc + related clarifications (#5780)
* 2.x: Improve BehaviorSubject JavaDoc + related clarifications * Fix grammar, add wiki link
1 parent ba1f40f commit 8b11ea8

File tree

3 files changed

+112
-4
lines changed

3 files changed

+112
-4
lines changed

src/main/java/io/reactivex/plugins/RxJavaPlugins.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,24 @@ public static Scheduler onComputationScheduler(@NonNull Scheduler defaultSchedul
337337

338338
/**
339339
* Called when an undeliverable error occurs.
340+
* <p>
341+
* Undeliverable errors are those {@code Observer.onError()} invocations that are not allowed to happen on
342+
* the given consumer type ({@code Observer}, {@code Subscriber}, etc.) due to protocol restrictions
343+
* because the consumer has either disposed/cancelled its {@code Disposable}/{@code Subscription} or
344+
* has already terminated with an {@code onError()} or {@code onComplete()} signal.
345+
* <p>
346+
* By default, this global error handler prints the stacktrace via {@link Throwable#printStackTrace()}
347+
* and calls {@link java.lang.Thread.UncaughtExceptionHandler#uncaughtException(Thread, Throwable)}
348+
* on the current thread.
349+
* <p>
350+
* Note that on some platforms, the platform runtime terminates the current application with an error if such
351+
* uncaught exceptions happen. In this case, it is recommended the application installs a global error
352+
* handler via the {@link #setErrorHandler(Consumer)} plugin method.
353+
*
340354
* @param error the error to report
355+
* @see #getErrorHandler()
356+
* @see #setErrorHandler(Consumer)
357+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling">Error handling Wiki</a>
341358
*/
342359
public static void onError(@NonNull Throwable error) {
343360
Consumer<? super Throwable> f = errorHandler;

src/main/java/io/reactivex/processors/BehaviorProcessor.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* <img width="640" height="460" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.BehaviorProcessor.png" alt="">
3535
* <p>
3636
* This processor does not have a public constructor by design; a new empty instance of this
37-
* {@code BehaviorSubject} can be created via the {@link #create()} method and
37+
* {@code BehaviorProcessor} can be created via the {@link #create()} method and
3838
* a new non-empty instance can be created via {@link #createDefault(Object)} (named as such to avoid
3939
* overload resolution conflict with {@code Flowable.create} that creates a Flowable, not a {@code BehaviorProcessor}).
4040
* <p>
@@ -81,15 +81,15 @@
8181
* <p>
8282
* Even though {@code BehaviorProcessor} implements the {@code Subscriber} interface, calling
8383
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
84-
* if the processor is used as a standalone source. However, calling {@code onSubscribe} is
85-
* called after the {@code BehaviorProcessor} reached its terminal state will result in the
84+
* if the processor is used as a standalone source. However, calling {@code onSubscribe}
85+
* after the {@code BehaviorProcessor} reached its terminal state will result in the
8686
* given {@code Subscription} being cancelled immediately.
8787
* <p>
8888
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
8989
* is still required to be serialized (called from the same thread or called non-overlappingly from different threads
9090
* through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s
9191
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
92-
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively.
92+
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively).
9393
* <p>
9494
* This {@code BehaviorProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
9595
* {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the latest observed value
@@ -112,6 +112,16 @@
112112
* <dt><b>Scheduler:</b></dt>
113113
* <dd>{@code BehaviorProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and
114114
* the {@code Subscriber}s get notified on the thread the respective {@code onXXX} methods were invoked.</dd>
115+
* <dt><b>Error handling:</b></dt>
116+
* <dd>When the {@link #onError(Throwable)} is called, the {@code BehaviorProcessor} enters into a terminal state
117+
* and emits the same {@code Throwable} instance to the last set of {@code Subscriber}s. During this emission,
118+
* if one or more {@code Subscriber}s cancel their respective {@code Subscription}s, the
119+
* {@code Throwable} is delivered to the global error handler via
120+
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Subscriber}s
121+
* cancel at once).
122+
* If there were no {@code Subscriber}s subscribed to this {@code BehaviorProcessor} when the {@code onError()}
123+
* was called, the global error handler is not invoked.
124+
* </dd>
115125
* </dl>
116126
* <p>
117127
* Example usage:

src/main/java/io/reactivex/subjects/BehaviorSubject.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,87 @@
3131
* <p>
3232
* <img width="640" height="415" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.BehaviorSubject.png" alt="">
3333
* <p>
34+
* This subject does not have a public constructor by design; a new empty instance of this
35+
* {@code BehaviorSubject} can be created via the {@link #create()} method and
36+
* a new non-empty instance can be created via {@link #createDefault(Object)} (named as such to avoid
37+
* overload resolution conflict with {@code Observable.create} that creates an Observable, not a {@code BehaviorSubject}).
38+
* <p>
39+
* Since the {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
40+
* {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>) as
41+
* default initial values in {@link #createDefault(Object)} or as parameters to {@link #onNext(Object)} and
42+
* {@link #onError(Throwable)}.
43+
* <p>
44+
* Since a {@code BehaviorSubject} is an {@link io.reactivex.Observable}, it does not support backpressure.
45+
* <p>
46+
* When this {@code BehaviorSubject} is terminated via {@link #onError(Throwable)} or {@link #onComplete()}, the
47+
* last observed item (if any) is cleared and late {@link io.reactivex.Observer}s only receive
48+
* the respective terminal event.
49+
* <p>
50+
* The {@code BehaviorSubject} does not support clearing its cached value (to appear empty again), however, the
51+
* effect can be achieved by using a special item and making sure {@code Observer}s subscribe through a
52+
* filter whose predicate filters out this special item:
53+
* <pre><code>
54+
* BehaviorSubject&lt;Integer&gt; subject = BehaviorSubject.create();
55+
*
56+
* final Integer EMPTY = Integer.MIN_VALUE;
57+
*
58+
* Observable&lt;Integer&gt; observable = subject.filter(v -&gt; v != EMPTY);
59+
*
60+
* TestObserver&lt;Integer&gt; to1 = observable.test();
61+
*
62+
* observable.onNext(1);
63+
* // this will "clear" the cache
64+
* observable.onNext(EMPTY);
65+
*
66+
* TestObserver&lt;Integer&gt; to2 = observable.test();
67+
*
68+
* subject.onNext(2);
69+
* subject.onComplete();
70+
*
71+
* // to1 received both non-empty items
72+
* to1.assertResult(1, 2);
73+
*
74+
* // to2 received only 2 even though the current item was EMPTY
75+
* // when it got subscribed
76+
* to2.assertResult(2);
77+
*
78+
* // Observers coming after the subject was terminated receive
79+
* // no items and only the onComplete event in this case.
80+
* observable.test().assertResult();
81+
* </code></pre>
82+
* <p>
83+
* Even though {@code BehaviorSubject} implements the {@code Observer} interface, calling
84+
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
85+
* if the subject is used as a standalone source. However, calling {@code onSubscribe}
86+
* after the {@code BehaviorSubjecct} reached its terminal state will result in the
87+
* given {@code Disposable} being disposed immediately.
88+
* <p>
89+
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
90+
* is still required to be serialized (called from the same thread or called non-overlappingly from different threads
91+
* through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s
92+
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer}
93+
* consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively).
94+
* <p>
95+
* This {@code BehaviorSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
96+
* {@link #getThrowable()} and {@link #hasObservers()} as well as means to read the latest observed value
97+
* in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()},
98+
* {@link #getValues()} or {@link #getValues(Object[])}.
99+
* <dl>
100+
* <dt><b>Scheduler:</b></dt>
101+
* <dd>{@code BehaviorSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and
102+
* the {@code Observer}s get notified on the thread the respective {@code onXXX} methods were invoked.</dd>
103+
* <dt><b>Error handling:</b></dt>
104+
* <dd>When the {@link #onError(Throwable)} is called, the {@code BehaviorSubject} enters into a terminal state
105+
* and emits the same {@code Throwable} instance to the last set of {@code Observer}s. During this emission,
106+
* if one or more {@code Observer}s dispose their respective {@code Disposable}s, the
107+
* {@code Throwable} is delivered to the global error handler via
108+
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Observer}s
109+
* cancel at once).
110+
* If there were no {@code Observer}s subscribed to this {@code BehaviorSubject} when the {@code onError()}
111+
* was called, the global error handler is not invoked.
112+
* </dd>
113+
* </dl>
114+
* <p>
34115
* Example usage:
35116
* <pre> {@code
36117

0 commit comments

Comments
 (0)