Skip to content

Commit d6eaff3

Browse files
JakeWhartonakarnokd
authored andcommitted
Remove conditional resource management from async listeners. (#4338)
Rename them to have a 'Resource' prefix.
1 parent 2c3ec38 commit d6eaff3

File tree

11 files changed

+46
-122
lines changed

11 files changed

+46
-122
lines changed

src/main/java/io/reactivex/observers/Observers.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ private Observers() {
2929
throw new IllegalStateException("No instances!");
3030
}
3131

32-
public static <T> AsyncObserver<T> emptyAsync() {
33-
return new AsyncObserver<T>() {
32+
public static <T> ResourceObserver<T> emptyResource() {
33+
return new ResourceObserver<T>() {
3434
@Override
3535
public void onNext(T t) {
3636

@@ -138,21 +138,21 @@ public void onComplete() {
138138
};
139139
}
140140

141-
public static <T> AsyncObserver<T> createAsync(Consumer<? super T> onNext) {
142-
return createAsync(onNext, RxJavaPlugins.errorConsumer(), Functions.EMPTY_RUNNABLE, Functions.EMPTY_RUNNABLE);
141+
public static <T> ResourceObserver<T> createResource(Consumer<? super T> onNext) {
142+
return createResource(onNext, RxJavaPlugins.errorConsumer(), Functions.EMPTY_RUNNABLE, Functions.EMPTY_RUNNABLE);
143143
}
144144

145-
public static <T> AsyncObserver<T> createAsync(Consumer<? super T> onNext,
145+
public static <T> ResourceObserver<T> createResource(Consumer<? super T> onNext,
146146
Consumer<? super Throwable> onError) {
147-
return createAsync(onNext, onError, Functions.EMPTY_RUNNABLE, Functions.EMPTY_RUNNABLE);
147+
return createResource(onNext, onError, Functions.EMPTY_RUNNABLE, Functions.EMPTY_RUNNABLE);
148148
}
149149

150-
public static <T> AsyncObserver<T> createAsync(Consumer<? super T> onNext,
150+
public static <T> ResourceObserver<T> createResource(Consumer<? super T> onNext,
151151
Consumer<? super Throwable> onError, Runnable onComplete) {
152-
return createAsync(onNext, onError, onComplete, Functions.EMPTY_RUNNABLE);
152+
return createResource(onNext, onError, onComplete, Functions.EMPTY_RUNNABLE);
153153
}
154154

155-
public static <T> AsyncObserver<T> createAsync(
155+
public static <T> ResourceObserver<T> createResource(
156156
final Consumer<? super T> onNext,
157157
final Consumer<? super Throwable> onError,
158158
final Runnable onComplete,
@@ -161,7 +161,7 @@ public static <T> AsyncObserver<T> createAsync(
161161
Objects.requireNonNull(onError, "onError is null");
162162
Objects.requireNonNull(onComplete, "onComplete is null");
163163
Objects.requireNonNull(onStart, "onStart is null");
164-
return new AsyncObserver<T>() {
164+
return new ResourceObserver<T>() {
165165
boolean done;
166166
@Override
167167
protected void onStart() {

src/main/java/io/reactivex/observers/AsyncObserver.java renamed to src/main/java/io/reactivex/observers/ResourceObserver.java

Lines changed: 10 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -21,68 +21,29 @@
2121
import io.reactivex.internal.functions.Objects;
2222

2323
/**
24-
* An abstract Subscriber implementation that allows asynchronous cancellation of its
25-
* subscription.
26-
*
27-
* <p>This implementation let's you chose if the AsyncObserver manages resources or not,
28-
* thus saving memory on cases where there is no need for that.
24+
* An abstract Observer that allows asynchronous cancellation of its subscription and associated resources.
2925
*
3026
* <p>All pre-implemented final methods are thread-safe.
3127
*
3228
* @param <T> the value type
3329
*/
34-
public abstract class AsyncObserver<T> implements Observer<T>, Disposable {
30+
public abstract class ResourceObserver<T> implements Observer<T>, Disposable {
3531
/** The active subscription. */
3632
private final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
3733

3834
/** The resource composite, can be null. */
39-
private final ListCompositeDisposable resources;
35+
private final ListCompositeDisposable resources = new ListCompositeDisposable();
4036

4137
/**
42-
* Constructs an AsyncObserver with resource support.
43-
*/
44-
public AsyncObserver() {
45-
this(true);
46-
}
47-
48-
/**
49-
* Constructs an AsyncObserver and allows specifying if it should support resources or not.
50-
* @param withResources true if resource support should be on.
51-
*/
52-
public AsyncObserver(boolean withResources) {
53-
this.resources = withResources ? new ListCompositeDisposable() : null;
54-
}
55-
56-
/**
57-
* Adds a resource to this AsyncObserver.
58-
*
59-
* <p>Note that if the AsyncObserver doesn't manage resources, this method will
60-
* throw an IllegalStateException. Use {@link #supportsResources()} to determine if
61-
* this AsyncObserver manages resources or not.
38+
* Adds a resource to this ResourceObserver.
6239
*
6340
* @param resource the resource to add
6441
*
6542
* @throws NullPointerException if resource is null
66-
* @throws IllegalStateException if this AsyncObserver doesn't manage resources
67-
* @see #supportsResources()
6843
*/
6944
public final void add(Disposable resource) {
7045
Objects.requireNonNull(resource, "resource is null");
71-
if (resources != null) {
72-
add(resource);
73-
} else {
74-
resource.dispose();
75-
throw new IllegalStateException("This AsyncObserver doesn't manage additional resources");
76-
}
77-
}
78-
79-
/**
80-
* Returns true if this AsyncObserver supports resources added via the add() method.
81-
* @return true if this AsyncObserver supports resources added via the add() method
82-
* @see #add(Disposable)
83-
*/
84-
public final boolean supportsResources() {
85-
return resources != null;
46+
resources.add(resource);
8647
}
8748

8849
@Override
@@ -93,7 +54,7 @@ public final void onSubscribe(Disposable s) {
9354
}
9455

9556
/**
96-
* Called once the upstream sets a Subscription on this AsyncObserver.
57+
* Called once the upstream sets a Subscription on this ResourceObserver.
9758
*
9859
* <p>You can perform initialization at this moment. The default
9960
* implementation does nothing.
@@ -103,13 +64,13 @@ protected void onStart() {
10364

10465
/**
10566
* Cancels the main disposable (if any) and disposes the resources associated with
106-
* this AsyncObserver (if any).
67+
* this ResourceObserver (if any).
10768
*
10869
* <p>This method can be called before the upstream calls onSubscribe at which
10970
* case the main Disposable will be immediately disposed.
11071
*/
11172
protected final void cancel() {
112-
if (DisposableHelper.dispose(s) && resources != null) {
73+
if (DisposableHelper.dispose(s)) {
11374
resources.dispose();
11475
}
11576
}
@@ -120,8 +81,8 @@ public final void dispose() {
12081
}
12182

12283
/**
123-
* Returns true if this AsyncObserver has been disposed/cancelled.
124-
* @return true if this AsyncObserver has been disposed/cancelled
84+
* Returns true if this ResourceObserver has been disposed/cancelled.
85+
* @return true if this ResourceObserver has been disposed/cancelled
12586
*/
12687
@Override
12788
public final boolean isDisposed() {

src/main/java/io/reactivex/subscribers/AsyncSubscriber.java renamed to src/main/java/io/reactivex/subscribers/ResourceSubscriber.java

Lines changed: 7 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.reactivex.internal.util.BackpressureHelper;
2424

2525
/**
26-
* An abstract Subscriber implementation that allows asynchronous cancellation of its
26+
* An abstract Subscriber that allows asynchronous cancellation of its
2727
* subscription.
2828
*
2929
* <p>This implementation let's you chose if the AsyncObserver manages resources or not,
@@ -33,63 +33,26 @@
3333
*
3434
* @param <T> the value type
3535
*/
36-
public abstract class AsyncSubscriber<T> implements Subscriber<T>, Disposable {
36+
public abstract class ResourceSubscriber<T> implements Subscriber<T>, Disposable {
3737
/** The active subscription. */
38-
private final AtomicReference<Subscription> s;
38+
private final AtomicReference<Subscription> s = new AtomicReference<Subscription>();
3939

4040
/** The resource composite, can be null. */
41-
private final CompositeDisposable resources;
41+
private final CompositeDisposable resources = new CompositeDisposable();
4242

4343
/** Remembers the request(n) counts until a subscription arrives. */
44-
private final AtomicLong missedRequested;
45-
46-
/**
47-
* Constructs an AsyncObserver with resource support.
48-
*/
49-
public AsyncSubscriber() {
50-
this(true);
51-
}
52-
53-
/**
54-
* Constructs an AsyncObserver and allows specifying if it should support resources or not.
55-
* @param withResources true if resource support should be on.
56-
*/
57-
public AsyncSubscriber(boolean withResources) {
58-
this.resources = withResources ? new CompositeDisposable() : null;
59-
this.missedRequested = new AtomicLong();
60-
this.s = new AtomicReference<Subscription>();
61-
}
44+
private final AtomicLong missedRequested = new AtomicLong();
6245

6346
/**
6447
* Adds a resource to this AsyncObserver.
6548
*
66-
* <p>Note that if the AsyncObserver doesn't manage resources, this method will
67-
* throw an IllegalStateException. Use {@link #supportsResources()} to determine if
68-
* this AsyncObserver manages resources or not.
69-
*
7049
* @param resource the resource to add
7150
*
7251
* @throws NullPointerException if resource is null
73-
* @throws IllegalStateException if this AsyncObserver doesn't manage resources
74-
* @see #supportsResources()
7552
*/
7653
public final void add(Disposable resource) {
7754
Objects.requireNonNull(resource, "resource is null");
78-
if (resources != null) {
79-
add(resource);
80-
} else {
81-
resource.dispose();
82-
throw new IllegalStateException("This AsyncObserver doesn't manage additional resources");
83-
}
84-
}
85-
86-
/**
87-
* Returns true if this AsyncObserver supports resources added via the add() method.
88-
* @return true if this AsyncObserver supports resources added via the add() method
89-
* @see #add(Disposable)
90-
*/
91-
public final boolean supportsResources() {
92-
return resources != null;
55+
resources.add(resource);
9356
}
9457

9558
@Override
@@ -148,7 +111,7 @@ protected final void request(long n) {
148111
* case the Subscription will be immediately cancelled.
149112
*/
150113
protected final void cancel() {
151-
if (SubscriptionHelper.dispose(s) && resources != null) {
114+
if (SubscriptionHelper.dispose(s)) {
152115
resources.dispose();
153116
}
154117
}

src/main/java/io/reactivex/subscribers/Subscribers.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ public void onComplete() {
255255
}
256256
};
257257
}
258-
public static <T> AsyncSubscriber<T> emptyAsync() {
259-
return new AsyncSubscriber<T>() {
258+
public static <T> ResourceSubscriber<T> emptyResource() {
259+
return new ResourceSubscriber<T>() {
260260
@Override
261261
public void onNext(T t) {
262262

@@ -350,21 +350,21 @@ public void onComplete() {
350350
};
351351
}
352352

353-
public static <T> AsyncSubscriber<T> createAsync(Consumer<? super T> onNext) {
354-
return createAsync(onNext, RxJavaPlugins.errorConsumer(), Functions.EMPTY_RUNNABLE, Functions.EMPTY_RUNNABLE);
353+
public static <T> ResourceSubscriber<T> createResource(Consumer<? super T> onNext) {
354+
return createResource(onNext, RxJavaPlugins.errorConsumer(), Functions.EMPTY_RUNNABLE, Functions.EMPTY_RUNNABLE);
355355
}
356356

357-
public static <T> AsyncSubscriber<T> createAsync(Consumer<? super T> onNext,
357+
public static <T> ResourceSubscriber<T> createResource(Consumer<? super T> onNext,
358358
Consumer<? super Throwable> onError) {
359-
return createAsync(onNext, onError, Functions.EMPTY_RUNNABLE, Functions.EMPTY_RUNNABLE);
359+
return createResource(onNext, onError, Functions.EMPTY_RUNNABLE, Functions.EMPTY_RUNNABLE);
360360
}
361361

362-
public static <T> AsyncSubscriber<T> createAsync(Consumer<? super T> onNext,
362+
public static <T> ResourceSubscriber<T> createResource(Consumer<? super T> onNext,
363363
Consumer<? super Throwable> onError, Runnable onComplete) {
364-
return createAsync(onNext, onError, onComplete, Functions.EMPTY_RUNNABLE);
364+
return createResource(onNext, onError, onComplete, Functions.EMPTY_RUNNABLE);
365365
}
366366

367-
public static <T> AsyncSubscriber<T> createAsync(
367+
public static <T> ResourceSubscriber<T> createResource(
368368
final Consumer<? super T> onNext,
369369
final Consumer<? super Throwable> onError,
370370
final Runnable onComplete,
@@ -373,7 +373,7 @@ public static <T> AsyncSubscriber<T> createAsync(
373373
Objects.requireNonNull(onError, "onError is null");
374374
Objects.requireNonNull(onComplete, "onComplete is null");
375375
Objects.requireNonNull(onStart, "onStart is null");
376-
return new AsyncSubscriber<T>() {
376+
return new ResourceSubscriber<T>() {
377377
boolean done;
378378
@Override
379379
protected void onStart() {

src/test/java/io/reactivex/flowable/FlowableBackpressureTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ public void testUserSubscriberUsingRequestSync() {
386386
final AtomicInteger totalReceived = new AtomicInteger();
387387
final AtomicInteger batches = new AtomicInteger();
388388
final AtomicInteger received = new AtomicInteger();
389-
incrementingIntegers(c).subscribe(new AsyncSubscriber<Integer>() {
389+
incrementingIntegers(c).subscribe(new ResourceSubscriber<Integer>() {
390390

391391
@Override
392392
public void onStart() {
@@ -433,7 +433,7 @@ public void testUserSubscriberUsingRequestAsync() throws InterruptedException {
433433
final AtomicInteger batches = new AtomicInteger();
434434
final CountDownLatch latch = new CountDownLatch(1);
435435
incrementingIntegers(c).subscribeOn(Schedulers.newThread()).subscribe(
436-
new AsyncSubscriber<Integer>() {
436+
new ResourceSubscriber<Integer>() {
437437

438438
@Override
439439
public void onStart() {

src/test/java/io/reactivex/flowable/FlowableSubscriberTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public void cancel() {
204204

205205
});
206206

207-
AsyncSubscriber<String> as = new AsyncSubscriber<String>() {
207+
ResourceSubscriber<String> as = new ResourceSubscriber<String>() {
208208

209209
@Override
210210
protected void onStart() {

src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -989,7 +989,7 @@ public void testBufferWithTimeDoesntUnsubscribeDownstream() throws InterruptedEx
989989
final Subscriber<Object> o = TestHelper.mockSubscriber();
990990

991991
final CountDownLatch cdl = new CountDownLatch(1);
992-
AsyncSubscriber<Object> s = new AsyncSubscriber<Object>() {
992+
ResourceSubscriber<Object> s = new ResourceSubscriber<Object>() {
993993
@Override
994994
public void onNext(Object t) {
995995
o.onNext(t);

src/test/java/io/reactivex/internal/operators/flowable/FlowableFromSourceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ public void subscribe(final FlowableEmitter<Integer> t) {
637637

638638
this.current = t;
639639

640-
final AsyncSubscriber<Integer> as = new AsyncSubscriber<Integer>() {
640+
final ResourceSubscriber<Integer> as = new ResourceSubscriber<Integer>() {
641641

642642
@Override
643643
public void onComplete() {

src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -763,7 +763,7 @@ public void testBufferWithTimeDoesntUnsubscribeDownstream() throws InterruptedEx
763763
final Observer<Object> o = TestHelper.mockObserver();
764764

765765
final CountDownLatch cdl = new CountDownLatch(1);
766-
AsyncObserver<Object> s = new AsyncObserver<Object>() {
766+
ResourceObserver<Object> s = new ResourceObserver<Object>() {
767767
@Override
768768
public void onNext(Object t) {
769769
o.onNext(t);

src/test/java/io/reactivex/schedulers/AbstractSchedulerConcurrencyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ public void request(long n) {
317317

318318
final AtomicInteger count = new AtomicInteger();
319319
final AtomicBoolean completed = new AtomicBoolean(false);
320-
AsyncSubscriber<Integer> s = new AsyncSubscriber<Integer>() {
320+
ResourceSubscriber<Integer> s = new ResourceSubscriber<Integer>() {
321321
@Override
322322
public void onComplete() {
323323
System.out.println("Completed");

0 commit comments

Comments
 (0)