Skip to content

Commit cd91a9f

Browse files
authored
2.x: better documentation on the abstract consumer classes (#5210)
1 parent da06e59 commit cd91a9f

14 files changed

+529
-15
lines changed

src/main/java/io/reactivex/observers/DefaultObserver.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,48 @@
1818
import io.reactivex.internal.disposables.DisposableHelper;
1919

2020
/**
21-
* Abstract base implementation of an Observer with support for cancelling a
21+
* Abstract base implementation of an {@link io.reactivex.Observer Observer} with support for cancelling a
2222
* subscription via {@link #cancel()} (synchronously) and calls {@link #onStart()}
2323
* when the subscription happens.
2424
*
25+
* <p>All pre-implemented final methods are thread-safe.
26+
*
27+
* <p>Use the protected {@link #cancel()} to dispose the sequence from within an
28+
* {@code onNext} implementation.
29+
*
30+
* <p>Like all other consumers, {@code DefaultObserver} can be subscribed only once.
31+
* Any subsequent attempt to subscribe it to a new source will yield an
32+
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
33+
*
34+
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
35+
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
36+
* If for some reason this can't be avoided, use {@link io.reactivex.Observable#safeSubscribe(io.reactivex.Observer)}
37+
* instead of the standard {@code subscribe()} method.
38+
*
39+
* <p>Example<code><pre>
40+
* Disposable d =
41+
* Observable.range(1, 5)
42+
* .subscribeWith(new DefaultObserver&lt;Integer>() {
43+
* &#64;Override public void onStart() {
44+
* System.out.println("Start!");
45+
* }
46+
* &#64;Override public void onNext(Integer t) {
47+
* if (t == 3) {
48+
* cancel();
49+
* }
50+
* System.out.println(t);
51+
* }
52+
* &#64;Override public void onError(Throwable t) {
53+
* t.printStackTrace();
54+
* }
55+
* &#64;Override public void onComplete() {
56+
* System.out.println("Done!");
57+
* }
58+
* });
59+
* // ...
60+
* d.dispose();
61+
* </pre></code>
62+
*
2563
* @param <T> the value type
2664
*/
2765
public abstract class DefaultObserver<T> implements Observer<T> {

src/main/java/io/reactivex/observers/DisposableCompletableObserver.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,33 @@
2121

2222
/**
2323
* An abstract {@link CompletableObserver} that allows asynchronous cancellation by implementing Disposable.
24+
*
25+
* <p>All pre-implemented final methods are thread-safe.
26+
*
27+
* <p>Like all other consumers, {@code DisposableCompletableObserver} can be subscribed only once.
28+
* Any subsequent attempt to subscribe it to a new source will yield an
29+
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
30+
*
31+
* <p>Implementation of {@link #onStart()}, {@link #onError(Throwable)} and
32+
* {@link #onComplete()} are not allowed to throw any unchecked exceptions.
33+
*
34+
* <p>Example<code><pre>
35+
* Disposable d =
36+
* Completable.complete().delay(1, TimeUnit.SECONDS)
37+
* .subscribeWith(new DisposableMaybeObserver&lt;Integer>() {
38+
* &#64;Override public void onStart() {
39+
* System.out.println("Start!");
40+
* }
41+
* &#64;Override public void onError(Throwable t) {
42+
* t.printStackTrace();
43+
* }
44+
* &#64;Override public void onComplete() {
45+
* System.out.println("Done!");
46+
* }
47+
* });
48+
* // ...
49+
* d.dispose();
50+
* </pre></code>
2451
*/
2552
public abstract class DisposableCompletableObserver implements CompletableObserver, Disposable {
2653
final AtomicReference<Disposable> s = new AtomicReference<Disposable>();

src/main/java/io/reactivex/observers/DisposableMaybeObserver.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,41 @@
2222
/**
2323
* An abstract {@link MaybeObserver} that allows asynchronous cancellation by implementing Disposable.
2424
*
25+
* <p>All pre-implemented final methods are thread-safe.
26+
*
27+
* <p>Note that {@link #onSuccess(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} are
28+
* exclusive to each other, unlike a regular {@link io.reactivex.Observer Observer}, and
29+
* {@code onComplete()} is never called after an {@code onSuccess()}.
30+
*
31+
* <p>Like all other consumers, {@code DisposableMaybeObserver} can be subscribed only once.
32+
* Any subsequent attempt to subscribe it to a new source will yield an
33+
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
34+
*
35+
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)} and
36+
* {@link #onComplete()} are not allowed to throw any unchecked exceptions.
37+
*
38+
* <p>Example<code><pre>
39+
* Disposable d =
40+
* Maybe.just(1).delay(1, TimeUnit.SECONDS)
41+
* .subscribeWith(new DisposableMaybeObserver&lt;Integer>() {
42+
* &#64;Override public void onStart() {
43+
* System.out.println("Start!");
44+
* }
45+
* &#64;Override public void onSuccess(Integer t) {
46+
* System.out.println(t);
47+
* }
48+
* &#64;Override public void onError(Throwable t) {
49+
* t.printStackTrace();
50+
* }
51+
* &#64;Override public void onComplete() {
52+
* System.out.println("Done!");
53+
* }
54+
* });
55+
* // ...
56+
* d.dispose();
57+
* </pre></code>
58+
*
59+
*
2560
* @param <T> the received value type
2661
*/
2762
public abstract class DisposableMaybeObserver<T> implements MaybeObserver<T>, Disposable {

src/main/java/io/reactivex/observers/DisposableObserver.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,44 @@
2222
/**
2323
* An abstract {@link Observer} that allows asynchronous cancellation by implementing Disposable.
2424
*
25+
* <p>All pre-implemented final methods are thread-safe.
26+
*
27+
* <p>Use the protected {@link #dispose()} to dispose the sequence from within an
28+
* {@code onNext} implementation.
29+
*
30+
* <p>Like all other consumers, {@code DefaultObserver} can be subscribed only once.
31+
* Any subsequent attempt to subscribe it to a new source will yield an
32+
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
33+
*
34+
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
35+
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
36+
* If for some reason this can't be avoided, use {@link io.reactivex.Observable#safeSubscribe(io.reactivex.Observer)}
37+
* instead of the standard {@code subscribe()} method.
38+
*
39+
* <p>Example<code><pre>
40+
* Disposable d =
41+
* Observable.range(1, 5)
42+
* .subscribeWith(new DisposableObserver&lt;Integer>() {
43+
* &#64;Override public void onStart() {
44+
* System.out.println("Start!");
45+
* }
46+
* &#64;Override public void onNext(Integer t) {
47+
* if (t == 3) {
48+
* dispose();
49+
* }
50+
* System.out.println(t);
51+
* }
52+
* &#64;Override public void onError(Throwable t) {
53+
* t.printStackTrace();
54+
* }
55+
* &#64;Override public void onComplete() {
56+
* System.out.println("Done!");
57+
* }
58+
* });
59+
* // ...
60+
* d.dispose();
61+
* </pre></code>
62+
*
2563
* @param <T> the received value type
2664
*/
2765
public abstract class DisposableObserver<T> implements Observer<T>, Disposable {

src/main/java/io/reactivex/observers/DisposableSingleObserver.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,33 @@
2222
/**
2323
* An abstract {@link SingleObserver} that allows asynchronous cancellation by implementing Disposable.
2424
*
25+
* <p>All pre-implemented final methods are thread-safe.
26+
*
27+
* <p>Like all other consumers, {@code DisposableSingleObserver} can be subscribed only once.
28+
* Any subsequent attempt to subscribe it to a new source will yield an
29+
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
30+
*
31+
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)} and {@link #onError(Throwable)}
32+
* are not allowed to throw any unchecked exceptions.
33+
*
34+
* <p>Example<code><pre>
35+
* Disposable d =
36+
* Single.just(1).delay(1, TimeUnit.SECONDS)
37+
* .subscribeWith(new DisposableSingleObserver&lt;Integer>() {
38+
* &#64;Override public void onStart() {
39+
* System.out.println("Start!");
40+
* }
41+
* &#64;Override public void onSuccess(Integer t) {
42+
* System.out.println(t);
43+
* }
44+
* &#64;Override public void onError(Throwable t) {
45+
* t.printStackTrace();
46+
* }
47+
* });
48+
* // ...
49+
* d.dispose();
50+
* </pre></code>
51+
*
2552
* @param <T> the received value type
2653
*/
2754
public abstract class DisposableSingleObserver<T> implements SingleObserver<T>, Disposable {

src/main/java/io/reactivex/observers/ResourceCompletableObserver.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,51 @@
2525
* An abstract {@link CompletableObserver} that allows asynchronous cancellation of its subscription and associated resources.
2626
*
2727
* <p>All pre-implemented final methods are thread-safe.
28+
*
29+
* <p>Override the protected {@link #onStart()} to perform initialization when this
30+
* {@code ResourceCompletableObserver} is subscribed to a source.
31+
*
32+
* <p>Use the protected {@link #dispose()} to dispose the sequence externally and release
33+
* all resources.
34+
*
35+
* <p>To release the associated resources, one has to call {@link #dispose()}
36+
* in {@code onError()} and {@code onComplete()} explicitly.
37+
*
38+
* <p>Use {@link #add(Disposable)} to associate resources (as {@link io.reactivex.disposables.Disposable Disposable}s)
39+
* with this {@code ResourceCompletableObserver} that will be cleaned up when {@link #dispose()} is called.
40+
* Removing previously associated resources is not possible but one can create a
41+
* {@link io.reactivex.disposables.CompositeDisposable CompositeDisposable}, associate it with this
42+
* {@code ResourceCompletableObserver} and then add/remove resources to/from the {@code CompositeDisposable}
43+
* freely.
44+
*
45+
* <p>Like all other consumers, {@code ResourceCompletableObserver} can be subscribed only once.
46+
* Any subsequent attempt to subscribe it to a new source will yield an
47+
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
48+
*
49+
* <p>Implementation of {@link #onStart()}, {@link #onError(Throwable)}
50+
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
51+
*
52+
* <p>Example<code><pre>
53+
* Disposable d =
54+
* Completable.complete().delay(1, TimeUnit.SECONDS)
55+
* .subscribeWith(new ResourceCompletableObserver() {
56+
* &#64;Override public void onStart() {
57+
* add(Schedulers.single()
58+
* .scheduleDirect(() -> System.out.println("Time!"),
59+
* 2, TimeUnit.SECONDS));
60+
* }
61+
* &#64;Override public void onError(Throwable t) {
62+
* t.printStackTrace();
63+
* dispose();
64+
* }
65+
* &#64;Override public void onComplete() {
66+
* System.out.println("Done!");
67+
* dispose();
68+
* }
69+
* });
70+
* // ...
71+
* d.dispose();
72+
* </pre></code>
2873
*/
2974
public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable {
3075
/** The active subscription. */

src/main/java/io/reactivex/observers/ResourceMaybeObserver.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,59 @@
2626
*
2727
* <p>All pre-implemented final methods are thread-safe.
2828
*
29+
* <p>Note that {@link #onSuccess(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} are
30+
* exclusive to each other, unlike a regular {@link io.reactivex.Observer Observer}, and
31+
* {@code onComplete()} is never called after an {@code onSuccess()}.
32+
*
33+
* <p>Override the protected {@link #onStart()} to perform initialization when this
34+
* {@code ResourceMaybeObserver} is subscribed to a source.
35+
*
36+
* <p>Use the protected {@link #dispose()} to dispose the sequence externally and release
37+
* all resources.
38+
*
39+
* <p>To release the associated resources, one has to call {@link #dispose()}
40+
* in {@code onSuccess()}, {@code onError()} and {@code onComplete()} explicitly.
41+
*
42+
* <p>Use {@link #add(Disposable)} to associate resources (as {@link io.reactivex.disposables.Disposable Disposable}s)
43+
* with this {@code ResourceMaybeObserver} that will be cleaned up when {@link #dispose()} is called.
44+
* Removing previously associated resources is not possible but one can create a
45+
* {@link io.reactivex.disposables.CompositeDisposable CompositeDisposable}, associate it with this
46+
* {@code ResourceMaybeObserver} and then add/remove resources to/from the {@code CompositeDisposable}
47+
* freely.
48+
*
49+
* <p>Like all other consumers, {@code ResourceMaybeObserver} can be subscribed only once.
50+
* Any subsequent attempt to subscribe it to a new source will yield an
51+
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
52+
*
53+
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)}
54+
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
55+
*
56+
* <p>Example<code><pre>
57+
* Disposable d =
58+
* Maybe.just(1).delay(1, TimeUnit.SECONDS)
59+
* .subscribeWith(new ResourceMaybeObserver&lt;Integer>() {
60+
* &#64;Override public void onStart() {
61+
* add(Schedulers.single()
62+
* .scheduleDirect(() -> System.out.println("Time!"),
63+
* 2, TimeUnit.SECONDS));
64+
* }
65+
* &#64;Override public void onSuccess(Integer t) {
66+
* System.out.println(t);
67+
* dispose();
68+
* }
69+
* &#64;Override public void onError(Throwable t) {
70+
* t.printStackTrace();
71+
* dispose();
72+
* }
73+
* &#64;Override public void onComplete() {
74+
* System.out.println("Done!");
75+
* dispose();
76+
* }
77+
* });
78+
* // ...
79+
* d.dispose();
80+
* </pre></code>
81+
*
2982
* @param <T> the value type
3083
*/
3184
public abstract class ResourceMaybeObserver<T> implements MaybeObserver<T>, Disposable {

src/main/java/io/reactivex/observers/ResourceObserver.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,57 @@
2525
*
2626
* <p>All pre-implemented final methods are thread-safe.
2727
*
28+
* <p>To release the associated resources, one has to call {@link #dispose()}
29+
* in {@code onError()} and {@code onComplete()} explicitly.
30+
*
31+
* <p>Use {@link #add(Disposable)} to associate resources (as {@link io.reactivex.disposables.Disposable Disposable}s)
32+
* with this {@code ResourceObserver} that will be cleaned up when {@link #dispose()} is called.
33+
* Removing previously associated resources is not possible but one can create a
34+
* {@link io.reactivex.disposables.CompositeDisposable CompositeDisposable}, associate it with this
35+
* {@code ResourceObserver} and then add/remove resources to/from the {@code CompositeDisposable}
36+
* freely.
37+
*
38+
* <p>Use the {@link #dispose()} to dispose the sequence from within an
39+
* {@code onNext} implementation.
40+
*
41+
* <p>Like all other consumers, {@code ResourceObserver} can be subscribed only once.
42+
* Any subsequent attempt to subscribe it to a new source will yield an
43+
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
44+
*
45+
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
46+
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
47+
* If for some reason this can't be avoided, use {@link io.reactivex.Observable#safeSubscribe(io.reactivex.Observer)}
48+
* instead of the standard {@code subscribe()} method.
49+
*
50+
* <p>Example<code><pre>
51+
* Disposable d =
52+
* Observable.range(1, 5)
53+
* .subscribeWith(new ResourceObserver&lt;Integer>() {
54+
* &#64;Override public void onStart() {
55+
* add(Schedulers.single()
56+
* .scheduleDirect(() -> System.out.println("Time!"),
57+
* 2, TimeUnit.SECONDS));
58+
* request(1);
59+
* }
60+
* &#64;Override public void onNext(Integer t) {
61+
* if (t == 3) {
62+
* dispose();
63+
* }
64+
* System.out.println(t);
65+
* }
66+
* &#64;Override public void onError(Throwable t) {
67+
* t.printStackTrace();
68+
* dispose();
69+
* }
70+
* &#64;Override public void onComplete() {
71+
* System.out.println("Done!");
72+
* dispose();
73+
* }
74+
* });
75+
* // ...
76+
* d.dispose();
77+
* </pre></code>
78+
*
2879
* @param <T> the value type
2980
*/
3081
public abstract class ResourceObserver<T> implements Observer<T>, Disposable {

0 commit comments

Comments
 (0)