Skip to content

Commit 3c156d4

Browse files
Merge branch 'update-string' of github.com:abersnaze/RxJava into string-observable
2 parents 93541f8 + 80aa7c6 commit 3c156d4

File tree

2 files changed

+167
-83
lines changed

2 files changed

+167
-83
lines changed

rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java

Lines changed: 127 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,6 +15,9 @@
1515
*/
1616
package rx.observables;
1717

18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
import java.io.Reader;
1821
import java.nio.ByteBuffer;
1922
import java.nio.CharBuffer;
2023
import java.nio.charset.CharacterCodingException;
@@ -27,15 +30,72 @@
2730
import java.util.regex.Pattern;
2831

2932
import rx.Observable;
30-
import rx.Observable.OnSubscribeFunc;
31-
import rx.Observer;
32-
import rx.Subscription;
33+
import rx.Observable.OnSubscribe;
34+
import rx.Subscriber;
35+
import rx.operators.Operator;
3336
import rx.util.functions.Func1;
3437
import rx.util.functions.Func2;
3538

3639
public class StringObservable {
40+
public static Observable<byte[]> from(final InputStream i) {
41+
return from(i, 8 * 1024);
42+
}
43+
44+
public static Observable<byte[]> from(final InputStream i, final int size) {
45+
return Observable.create(new OnSubscribe<byte[]>() {
46+
@Override
47+
public void call(Subscriber<? super byte[]> o) {
48+
byte[] buffer = new byte[size];
49+
try {
50+
if (o.isUnsubscribed())
51+
return;
52+
int n = 0;
53+
n = i.read(buffer);
54+
while (n != -1 && !o.isUnsubscribed()) {
55+
o.onNext(Arrays.copyOf(buffer, n));
56+
n = i.read(buffer);
57+
}
58+
} catch (IOException e) {
59+
o.onError(e);
60+
}
61+
if (o.isUnsubscribed())
62+
return;
63+
o.onCompleted();
64+
}
65+
});
66+
}
67+
68+
public static Observable<String> from(final Reader i) {
69+
return from(i, 8 * 1024);
70+
}
71+
72+
public static Observable<String> from(final Reader i, final int size) {
73+
return Observable.create(new OnSubscribe<String>() {
74+
@Override
75+
public void call(Subscriber<? super String> o) {
76+
char[] buffer = new char[size];
77+
try {
78+
if (o.isUnsubscribed())
79+
return;
80+
int n = 0;
81+
n = i.read(buffer);
82+
while (n != -1 && !o.isUnsubscribed()) {
83+
o.onNext(new String(buffer));
84+
n = i.read(buffer);
85+
}
86+
} catch (IOException e) {
87+
o.onError(e);
88+
}
89+
if (o.isUnsubscribed())
90+
return;
91+
o.onCompleted();
92+
}
93+
});
94+
}
95+
3796
/**
38-
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
97+
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
98+
* and where handles when a multibyte character spans two chunks.
3999
*
40100
* @param src
41101
* @param charsetName
@@ -46,7 +106,8 @@ public static Observable<String> decode(Observable<byte[]> src, String charsetNa
46106
}
47107

48108
/**
49-
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
109+
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
110+
* and where handles when a multibyte character spans two chunks.
50111
*
51112
* @param src
52113
* @param charset
@@ -57,30 +118,31 @@ public static Observable<String> decode(Observable<byte[]> src, Charset charset)
57118
}
58119

59120
/**
60-
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
121+
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
122+
* and where handles when a multibyte character spans two chunks.
61123
* This method allows for more control over how malformed and unmappable characters are handled.
62124
*
63125
* @param src
64126
* @param charsetDecoder
65127
* @return
66128
*/
67129
public static Observable<String> decode(final Observable<byte[]> src, final CharsetDecoder charsetDecoder) {
68-
return Observable.create(new OnSubscribeFunc<String>() {
130+
return src.lift(new Operator<String, byte[]>() {
69131
@Override
70-
public Subscription onSubscribe(final Observer<? super String> observer) {
71-
return src.subscribe(new Observer<byte[]>() {
132+
public Subscriber<? super byte[]> call(final Subscriber<? super String> o) {
133+
return new Subscriber<byte[]>(o) {
72134
private ByteBuffer leftOver = null;
73135

74136
@Override
75137
public void onCompleted() {
76138
if (process(null, leftOver, true))
77-
observer.onCompleted();
139+
o.onCompleted();
78140
}
79141

80142
@Override
81143
public void onError(Throwable e) {
82144
if (process(null, leftOver, true))
83-
observer.onError(e);
145+
o.onError(e);
84146
}
85147

86148
@Override
@@ -120,7 +182,7 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
120182
cr.throwException();
121183
}
122184
catch (CharacterCodingException e) {
123-
observer.onError(e);
185+
o.onError(e);
124186
return false;
125187
}
126188
}
@@ -134,11 +196,11 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
134196

135197
String string = cb.toString();
136198
if (!string.isEmpty())
137-
observer.onNext(string);
199+
o.onNext(string);
138200

139201
return true;
140202
}
141-
});
203+
};
142204
}
143205
});
144206
}
@@ -190,13 +252,14 @@ public byte[] call(String str) {
190252
}
191253

192254
/**
193-
* Gather up all of the strings in to one string to be able to use it as one message. Don't use this on infinite streams.
255+
* Gather up all of the strings in to one string to be able to use it as one message. Don't use
256+
* this on infinite streams.
194257
*
195258
* @param src
196259
* @return
197260
*/
198261
public static Observable<String> stringConcat(Observable<String> src) {
199-
return src.aggregate(new Func2<String, String, String>() {
262+
return src.reduce(new Func2<String, String, String>() {
200263
@Override
201264
public String call(String a, String b) {
202265
return a + b;
@@ -218,22 +281,25 @@ public String call(String a, String b) {
218281
*/
219282
public static Observable<String> split(final Observable<String> src, String regex) {
220283
final Pattern pattern = Pattern.compile(regex);
221-
return Observable.create(new OnSubscribeFunc<String>() {
284+
285+
return src.lift(new Operator<String, String>() {
222286
@Override
223-
public Subscription onSubscribe(final Observer<? super String> observer) {
224-
return src.subscribe(new Observer<String>() {
287+
public Subscriber<? super String> call(final Subscriber<? super String> o) {
288+
return new Subscriber<String>(o) {
225289
private String leftOver = null;
226290

227291
@Override
228292
public void onCompleted() {
229293
output(leftOver);
230-
observer.onCompleted();
294+
if (!o.isUnsubscribed())
295+
o.onCompleted();
231296
}
232297

233298
@Override
234299
public void onError(Throwable e) {
235300
output(leftOver);
236-
observer.onError(e);
301+
if (!o.isUnsubscribed())
302+
o.onError(e);
237303
}
238304

239305
@Override
@@ -250,24 +316,29 @@ public void onNext(String segment) {
250316
}
251317

252318
private int emptyPartCount = 0;
319+
253320
/**
254321
* when limit == 0 trailing empty parts are not emitted.
322+
*
255323
* @param part
256324
*/
257325
private void output(String part) {
258326
if (part.isEmpty()) {
259327
emptyPartCount++;
260328
}
261329
else {
262-
for(; emptyPartCount>0; emptyPartCount--)
263-
observer.onNext("");
264-
observer.onNext(part);
330+
for (; emptyPartCount > 0; emptyPartCount--)
331+
if (!o.isUnsubscribed())
332+
o.onNext("");
333+
if (!o.isUnsubscribed())
334+
o.onNext(part);
265335
}
266336
}
267-
});
337+
};
268338
}
269339
});
270340
}
341+
271342
/**
272343
* Concatenates the sequence of values by adding a separator
273344
* between them and emitting the result once the source completes.
@@ -276,49 +347,55 @@ private void output(String part) {
276347
* {@link java.lang.String#valueOf(java.lang.Object)} calls.
277348
* <p>
278349
* For example:
350+
*
279351
* <pre>
280-
* Observable&lt;Object> source = Observable.from("a", 1, "c");
281-
* Observable&lt;String> result = join(source, ", ");
352+
* Observable&lt;Object&gt; source = Observable.from(&quot;a&quot;, 1, &quot;c&quot;);
353+
* Observable&lt;String&gt; result = join(source, &quot;, &quot;);
282354
* </pre>
283355
*
284356
* will yield a single element equal to "a, 1, c".
285357
*
286-
* @param source the source sequence of CharSequence values
287-
* @param separator the separator to a
358+
* @param source
359+
* the source sequence of CharSequence values
360+
* @param separator
361+
* the separator to a
288362
* @return an Observable which emits a single String value having the concatenated
289363
* values of the source observable with the separator between elements
290364
*/
291365
public static <T> Observable<String> join(final Observable<T> source, final CharSequence separator) {
292-
return Observable.create(new OnSubscribeFunc<String>() {
293-
366+
return source.lift(new Operator<String, T>() {
294367
@Override
295-
public Subscription onSubscribe(final Observer<? super String> t1) {
296-
return source.subscribe(new Observer<T>() {
368+
public Subscriber<T> call(final Subscriber<? super String> o) {
369+
return new Subscriber<T>(o) {
297370
boolean mayAddSeparator;
298371
StringBuilder b = new StringBuilder();
372+
299373
@Override
300-
public void onNext(T args) {
301-
if (mayAddSeparator) {
302-
b.append(separator);
303-
}
304-
mayAddSeparator = true;
305-
b.append(String.valueOf(args));
374+
public void onCompleted() {
375+
String str = b.toString();
376+
b = null;
377+
if (!o.isUnsubscribed())
378+
o.onNext(str);
379+
if (!o.isUnsubscribed())
380+
o.onCompleted();
306381
}
307382

308383
@Override
309384
public void onError(Throwable e) {
310385
b = null;
311-
t1.onError(e);
386+
if (!o.isUnsubscribed())
387+
o.onError(e);
312388
}
313389

314390
@Override
315-
public void onCompleted() {
316-
String str = b.toString();
317-
b = null;
318-
t1.onNext(str);
319-
t1.onCompleted();
391+
public void onNext(Object t) {
392+
if (mayAddSeparator) {
393+
b.append(separator);
394+
}
395+
mayAddSeparator = true;
396+
b.append(String.valueOf(t));
320397
}
321-
});
398+
};
322399
}
323400
});
324401
}

0 commit comments

Comments
 (0)