Skip to content

Commit 3d25617

Browse files
authored
3.x: [Java 8] Add AutoCloseable <-> Disposable conversions, nicen docs (#6780)
1 parent 8d41cc5 commit 3d25617

File tree

6 files changed

+191
-34
lines changed

6 files changed

+191
-34
lines changed

src/main/java/io/reactivex/rxjava3/disposables/ActionDisposable.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
1818

1919
/**
20-
* A Disposable container that manages an Action instance.
20+
* A Disposable container that manages an {@link Action} instance.
2121
*/
2222
final class ActionDisposable extends ReferenceDisposable<Action> {
2323

@@ -35,4 +35,9 @@ protected void onDisposed(@NonNull Action value) {
3535
throw ExceptionHelper.wrapOrThrow(ex);
3636
}
3737
}
38+
39+
@Override
40+
public String toString() {
41+
return "ActionDisposable(disposed=" + isDisposed() + ", " + get() + ")";
42+
}
3843
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.disposables;
15+
16+
import io.reactivex.rxjava3.annotations.NonNull;
17+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
18+
19+
/**
20+
* A disposable container that manages an {@link AutoCloseable} instance.
21+
* @since 3.0.0
22+
*/
23+
final class AutoCloseableDisposable extends ReferenceDisposable<AutoCloseable> {
24+
25+
private static final long serialVersionUID = -6646144244598696847L;
26+
27+
AutoCloseableDisposable(AutoCloseable value) {
28+
super(value);
29+
}
30+
31+
@Override
32+
protected void onDisposed(@NonNull AutoCloseable value) {
33+
try {
34+
value.close();
35+
} catch (Throwable ex) {
36+
RxJavaPlugins.onError(ex);
37+
}
38+
}
39+
40+
@Override
41+
public String toString() {
42+
return "AutoCloseableDisposable(disposed=" + isDisposed() + ", " + get() + ")";
43+
}
44+
45+
}

src/main/java/io/reactivex/rxjava3/disposables/Disposables.java

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ private Disposables() {
3535
}
3636

3737
/**
38-
* Construct a Disposable by wrapping a Runnable that is
39-
* executed exactly once when the Disposable is disposed.
38+
* Construct a {@code Disposable} by wrapping a {@link Runnable} that is
39+
* executed exactly once when the {@code Disposable} is disposed.
4040
* @param run the Runnable to wrap
4141
* @return the new Disposable instance
4242
*/
@@ -47,8 +47,8 @@ public static Disposable fromRunnable(@NonNull Runnable run) {
4747
}
4848

4949
/**
50-
* Construct a Disposable by wrapping a Action that is
51-
* executed exactly once when the Disposable is disposed.
50+
* Construct a {@code Disposable} by wrapping a {@link Action} that is
51+
* executed exactly once when the {@code Disposable} is disposed.
5252
* @param run the Action to wrap
5353
* @return the new Disposable instance
5454
*/
@@ -59,10 +59,13 @@ public static Disposable fromAction(@NonNull Action run) {
5959
}
6060

6161
/**
62-
* Construct a Disposable by wrapping a Future that is
63-
* cancelled exactly once when the Disposable is disposed.
62+
* Construct a {@code Disposable} by wrapping a {@link Future} that is
63+
* cancelled exactly once when the {@code Disposable} is disposed.
64+
* <p>
65+
* The {@code Future} is cancelled with {@code mayInterruptIfRunning == true}.
6466
* @param future the Future to wrap
6567
* @return the new Disposable instance
68+
* @see #fromFuture(Future, boolean)
6669
*/
6770
@NonNull
6871
public static Disposable fromFuture(@NonNull Future<?> future) {
@@ -71,10 +74,10 @@ public static Disposable fromFuture(@NonNull Future<?> future) {
7174
}
7275

7376
/**
74-
* Construct a Disposable by wrapping a Future that is
75-
* cancelled exactly once when the Disposable is disposed.
77+
* Construct a {@code Disposable} by wrapping a {@link Future} that is
78+
* cancelled exactly once when the {@code Disposable} is disposed.
7679
* @param future the Future to wrap
77-
* @param allowInterrupt if true, the future cancel happens via Future.cancel(true)
80+
* @param allowInterrupt if true, the future cancel happens via {@code Future.cancel(true)}
7881
* @return the new Disposable instance
7982
*/
8083
@NonNull
@@ -84,8 +87,8 @@ public static Disposable fromFuture(@NonNull Future<?> future, boolean allowInte
8487
}
8588

8689
/**
87-
* Construct a Disposable by wrapping a Subscription that is
88-
* cancelled exactly once when the Disposable is disposed.
90+
* Construct a {@code Disposable} by wrapping a {@link Subscription} that is
91+
* cancelled exactly once when the {@code Disposable} is disposed.
8992
* @param subscription the Runnable to wrap
9093
* @return the new Disposable instance
9194
*/
@@ -96,17 +99,42 @@ public static Disposable fromSubscription(@NonNull Subscription subscription) {
9699
}
97100

98101
/**
99-
* Returns a new, non-disposed Disposable instance.
100-
* @return a new, non-disposed Disposable instance
102+
* Construct a {@code Disposable} by wrapping an {@link AutoCloseable} that is
103+
* closed exactly once when the {@code Disposable} is disposed.
104+
* @param autoCloseable the AutoCloseable to wrap
105+
* @return the new Disposable instance
106+
* @since 3.0.0
107+
*/
108+
@NonNull
109+
public static Disposable fromAutoCloseable(@NonNull AutoCloseable autoCloseable) {
110+
Objects.requireNonNull(autoCloseable, "autoCloseable is null");
111+
return new AutoCloseableDisposable(autoCloseable);
112+
}
113+
114+
/**
115+
* Construct an {@link AutoCloseable} by wrapping a {@code Disposable} that is
116+
* disposed when the returned {@code AutoCloseable} is closed.
117+
* @param disposable the Disposable instance
118+
* @return the new AutoCloseable instance
119+
* @since 3.0.0
120+
*/
121+
@NonNull
122+
public static AutoCloseable toAutoCloseable(@NonNull Disposable disposable) {
123+
return disposable::dispose;
124+
}
125+
126+
/**
127+
* Returns a new, non-disposed {@code Disposable} instance.
128+
* @return a new, non-disposed {@code Disposable} instance
101129
*/
102130
@NonNull
103131
public static Disposable empty() {
104132
return fromRunnable(Functions.EMPTY_RUNNABLE);
105133
}
106134

107135
/**
108-
* Returns a disposed Disposable instance.
109-
* @return a disposed Disposable instance
136+
* Returns a shared, disposed {@code Disposable} instance.
137+
* @return a shared, disposed {@code Disposable} instance
110138
*/
111139
@NonNull
112140
public static Disposable disposed() {

src/main/java/io/reactivex/rxjava3/disposables/FutureDisposable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import java.util.concurrent.atomic.AtomicReference;
1717

1818
/**
19-
* A Disposable container that cancels a Future instance.
19+
* A Disposable container that cancels a {@link Future} instance.
2020
*/
2121
final class FutureDisposable extends AtomicReference<Future<?>> implements Disposable {
2222

src/main/java/io/reactivex/rxjava3/disposables/RunnableDisposable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import io.reactivex.rxjava3.annotations.NonNull;
1616

1717
/**
18-
* A disposable container that manages a Runnable instance.
18+
* A disposable container that manages a {@link Runnable} instance.
1919
*/
2020
final class RunnableDisposable extends ReferenceDisposable<Runnable> {
2121

src/test/java/io/reactivex/rxjava3/disposables/DisposablesTest.java

Lines changed: 95 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.reactivestreams.Subscription;
2626

2727
import io.reactivex.rxjava3.core.RxJavaTest;
28+
import io.reactivex.rxjava3.exceptions.TestException;
2829
import io.reactivex.rxjava3.functions.Action;
2930
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
3031
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
@@ -34,11 +35,19 @@ public class DisposablesTest extends RxJavaTest {
3435

3536
@Test
3637
public void unsubscribeOnlyOnce() {
37-
Runnable dispose = mock(Runnable.class);
38-
Disposable subscription = Disposables.fromRunnable(dispose);
39-
subscription.dispose();
40-
subscription.dispose();
41-
verify(dispose, times(1)).run();
38+
Runnable run = mock(Runnable.class);
39+
40+
Disposable d = Disposables.fromRunnable(run);
41+
42+
assertTrue(d.toString(), d.toString().contains("RunnableDisposable(disposed=false, "));
43+
44+
d.dispose();
45+
assertTrue(d.toString(), d.toString().contains("RunnableDisposable(disposed=true, "));
46+
47+
d.dispose();
48+
assertTrue(d.toString(), d.toString().contains("RunnableDisposable(disposed=true, "));
49+
50+
verify(run, times(1)).run();
4251
}
4352

4453
@Test
@@ -61,22 +70,20 @@ public void utilityClass() {
6170
}
6271

6372
@Test
64-
public void fromAction() {
65-
class AtomicAction extends AtomicBoolean implements Action {
73+
public void fromAction() throws Throwable {
74+
Action action = mock(Action.class);
6675

67-
private static final long serialVersionUID = -1517510584253657229L;
76+
Disposable d = Disposables.fromAction(action);
6877

69-
@Override
70-
public void run() throws Exception {
71-
set(true);
72-
}
73-
}
78+
assertTrue(d.toString(), d.toString().contains("ActionDisposable(disposed=false, "));
7479

75-
AtomicAction aa = new AtomicAction();
80+
d.dispose();
81+
assertTrue(d.toString(), d.toString().contains("ActionDisposable(disposed=true, "));
7682

77-
Disposables.fromAction(aa).dispose();
83+
d.dispose();
84+
assertTrue(d.toString(), d.toString().contains("ActionDisposable(disposed=true, "));
7885

79-
assertTrue(aa.get());
86+
verify(action, times(1)).run();
8087
}
8188

8289
@Test
@@ -174,4 +181,76 @@ public void setOnceTwice() {
174181
RxJavaPlugins.reset();
175182
}
176183
}
184+
185+
@Test
186+
public void fromAutoCloseable() {
187+
AtomicInteger counter = new AtomicInteger();
188+
189+
AutoCloseable ac = () -> counter.getAndIncrement();
190+
191+
Disposable d = Disposables.fromAutoCloseable(ac);
192+
193+
assertFalse(d.isDisposed());
194+
assertEquals(0, counter.get());
195+
assertTrue(d.toString(), d.toString().contains("AutoCloseableDisposable(disposed=false, "));
196+
197+
d.dispose();
198+
199+
assertTrue(d.isDisposed());
200+
assertEquals(1, counter.get());
201+
assertTrue(d.toString(), d.toString().contains("AutoCloseableDisposable(disposed=true, "));
202+
203+
d.dispose();
204+
205+
assertTrue(d.isDisposed());
206+
assertEquals(1, counter.get());
207+
assertTrue(d.toString(), d.toString().contains("AutoCloseableDisposable(disposed=true, "));
208+
}
209+
210+
@Test
211+
public void fromAutoCloseableThrows() throws Throwable {
212+
TestHelper.withErrorTracking(errors -> {
213+
AutoCloseable ac = () -> { throw new TestException(); };
214+
215+
Disposable d = Disposables.fromAutoCloseable(ac);
216+
217+
assertFalse(d.isDisposed());
218+
219+
assertTrue(errors.isEmpty());
220+
221+
d.dispose();
222+
223+
assertTrue(d.isDisposed());
224+
assertEquals(1, errors.size());
225+
226+
d.dispose();
227+
228+
assertTrue(d.isDisposed());
229+
assertEquals(1, errors.size());
230+
231+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
232+
});
233+
}
234+
235+
@Test
236+
public void toAutoCloseable() throws Exception {
237+
AtomicInteger counter = new AtomicInteger();
238+
239+
Disposable d = Disposables.fromAction(() -> counter.getAndIncrement());
240+
241+
AutoCloseable ac = Disposables.toAutoCloseable(d);
242+
243+
assertFalse(d.isDisposed());
244+
assertEquals(0, counter.get());
245+
246+
ac.close();
247+
248+
assertTrue(d.isDisposed());
249+
assertEquals(1, counter.get());
250+
251+
ac.close();
252+
253+
assertTrue(d.isDisposed());
254+
assertEquals(1, counter.get());
255+
}
177256
}

0 commit comments

Comments
 (0)