Skip to content

Commit 5d7995c

Browse files
committed
utility function for creating observables for closeable resources
1 parent 428c0b5 commit 5d7995c

File tree

2 files changed

+230
-45
lines changed

2 files changed

+230
-45
lines changed

rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java

Lines changed: 101 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@
1515
*/
1616
package rx.observables;
1717

18+
import rx.Observable;
19+
import rx.Observable.OnSubscribe;
20+
import rx.Observable.Operator;
21+
import rx.Subscriber;
22+
import rx.Subscription;
23+
import rx.functions.Func0;
24+
import rx.functions.Func1;
25+
import rx.functions.Func2;
26+
27+
import java.io.Closeable;
1828
import java.io.IOException;
1929
import java.io.InputStream;
2030
import java.io.Reader;
@@ -27,15 +37,9 @@
2737
import java.nio.charset.CoderResult;
2838
import java.nio.charset.CodingErrorAction;
2939
import java.util.Arrays;
40+
import java.util.concurrent.atomic.AtomicBoolean;
3041
import java.util.regex.Pattern;
3142

32-
import rx.Observable;
33-
import rx.Observable.OnSubscribe;
34-
import rx.Observable.Operator;
35-
import rx.Subscriber;
36-
import rx.functions.Func1;
37-
import rx.functions.Func2;
38-
3943
public class StringObservable {
4044
/**
4145
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
@@ -50,6 +54,73 @@ public class StringObservable {
5054
public static Observable<byte[]> from(final InputStream i) {
5155
return from(i, 8 * 1024);
5256
}
57+
58+
private static class CloseableResource<S extends AutoCloseable> implements Subscription {
59+
private final AtomicBoolean unsubscribed = new AtomicBoolean();
60+
private S closable;
61+
62+
public CloseableResource(S closeable) {
63+
this.closable = closeable;
64+
}
65+
66+
@Override
67+
public void unsubscribe() {
68+
if (unsubscribed.compareAndSet(false, true)) {
69+
try {
70+
closable.close();
71+
} catch (Exception e) {
72+
throw new RuntimeException(e);
73+
}
74+
}
75+
}
76+
77+
@Override
78+
public boolean isUnsubscribed() {
79+
return unsubscribed.get();
80+
}
81+
}
82+
83+
/**
84+
* Func0 that allows throwing an {@link IOException}s commonly thrown during IO operations.
85+
* @see StringObservable#from(UnsafeFunc0, UnsafeFunc1)
86+
*
87+
* @param <R>
88+
*/
89+
public static interface UnsafeFunc0<R> {
90+
public R call() throws Throwable;
91+
}
92+
93+
/**
94+
* Helps in creating an Observable that automatically calls {@link Closeable#close()} on completion, error or unsubscribe.
95+
*
96+
* <pre>
97+
* StringObservable.using(() -> new FileReader(file), (reader) -> StringObservable.from(reader))
98+
* </pre>
99+
*
100+
* @param resourceFactory
101+
* Generates a new {@link Closeable} resource for each new subscription to the returned Observable
102+
* @param observableFactory
103+
* Converts the {@link Closeable} resource into a {@link Observable} with {@link #from(InputStream)} or {@link #from(Reader)}
104+
* @return
105+
*/
106+
public static <R, S extends AutoCloseable> Observable<R> using(final UnsafeFunc0<S> resourceFactory,
107+
final Func1<S, Observable<R>> observableFactory) {
108+
return Observable.using(new Func0<CloseableResource<S>>() {
109+
@Override
110+
public CloseableResource<S> call() {
111+
try {
112+
return new CloseableResource<S>(resourceFactory.call());
113+
} catch (Throwable e) {
114+
throw new RuntimeException(e);
115+
}
116+
}
117+
}, new Func1<CloseableResource<S>, Observable<R>>() {
118+
@Override
119+
public Observable<R> call(CloseableResource<S> t1) {
120+
return observableFactory.call(t1.closable);
121+
}
122+
});
123+
}
53124

54125
/**
55126
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
@@ -320,10 +391,24 @@ public byte[] call(String str) {
320391
* @return the Observable returing all strings concatenated as a single string
321392
*/
322393
public static Observable<String> stringConcat(Observable<String> src) {
323-
return src.reduce(new Func2<String, String, String>() {
394+
return toString(src.reduce(new StringBuilder(), new Func2<StringBuilder, String, StringBuilder>() {
324395
@Override
325-
public String call(String a, String b) {
326-
return a + b;
396+
public StringBuilder call(StringBuilder a, String b) {
397+
return a.append(b);
398+
}
399+
}));
400+
}
401+
402+
/**
403+
* Maps {@link Observable}&lt;{@link Object}&gt; to {@link Observable}&lt;{@link String}&gt; by using {@link String#valueOf(Object)}
404+
* @param src
405+
* @return
406+
*/
407+
public static Observable<String> toString(Observable<?> src) {
408+
return src.map(new Func1<Object, String>() {
409+
@Override
410+
public String call(Object obj) {
411+
return String.valueOf(obj);
327412
}
328413
});
329414
}
@@ -429,11 +514,11 @@ private void output(String part) {
429514
* @return an Observable which emits a single String value having the concatenated
430515
* values of the source observable with the separator between elements
431516
*/
432-
public static <T> Observable<String> join(final Observable<T> source, final CharSequence separator) {
433-
return source.lift(new Operator<String, T>() {
517+
public static Observable<String> join(final Observable<String> source, final CharSequence separator) {
518+
return source.lift(new Operator<String, String>() {
434519
@Override
435-
public Subscriber<T> call(final Subscriber<? super String> o) {
436-
return new Subscriber<T>(o) {
520+
public Subscriber<String> call(final Subscriber<? super String> o) {
521+
return new Subscriber<String>(o) {
437522
boolean mayAddSeparator;
438523
StringBuilder b = new StringBuilder();
439524

@@ -455,12 +540,12 @@ public void onError(Throwable e) {
455540
}
456541

457542
@Override
458-
public void onNext(Object t) {
543+
public void onNext(String t) {
459544
if (mayAddSeparator) {
460545
b.append(separator);
461546
}
462547
mayAddSeparator = true;
463-
b.append(String.valueOf(t));
548+
b.append(t);
464549
}
465550
};
466551
}

0 commit comments

Comments
 (0)