1
1
/**
2
2
* Copyright 2013 Netflix, Inc.
3
- *
3
+ *
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
6
6
* You may obtain a copy of the License at
7
- *
8
- * http://www.apache.org/licenses/LICENSE-2.0
9
- *
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
10
* Unless required by applicable law or agreed to in writing, software
11
11
* distributed under the License is distributed on an "AS IS" BASIS,
12
12
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
15
*/
16
16
package rx .observables ;
17
17
18
+ import java .io .IOException ;
19
+ import java .io .InputStream ;
20
+ import java .io .Reader ;
18
21
import java .nio .ByteBuffer ;
19
22
import java .nio .CharBuffer ;
20
23
import java .nio .charset .CharacterCodingException ;
27
30
import java .util .regex .Pattern ;
28
31
29
32
import rx .Observable ;
30
- import rx .Observable .OnSubscribeFunc ;
31
- import rx .Observer ;
32
- import rx .Subscription ;
33
+ import rx .Observable .OnSubscribe ;
34
+ import rx .Subscriber ;
35
+ import rx .operators . Operator ;
33
36
import rx .util .functions .Func1 ;
34
37
import rx .util .functions .Func2 ;
35
38
36
39
public class StringObservable {
40
+ public static Observable <byte []> from (final InputStream i ) {
41
+ return from (i , 8 * 1024 );
42
+ }
43
+
44
+ public static Observable <byte []> from (final InputStream i , final int size ) {
45
+ return Observable .create (new OnSubscribe <byte []>() {
46
+ @ Override
47
+ public void call (Subscriber <? super byte []> o ) {
48
+ byte [] buffer = new byte [size ];
49
+ try {
50
+ if (o .isUnsubscribed ())
51
+ return ;
52
+ int n = 0 ;
53
+ n = i .read (buffer );
54
+ while (n != -1 && !o .isUnsubscribed ()) {
55
+ o .onNext (Arrays .copyOf (buffer , n ));
56
+ n = i .read (buffer );
57
+ }
58
+ } catch (IOException e ) {
59
+ o .onError (e );
60
+ }
61
+ if (o .isUnsubscribed ())
62
+ return ;
63
+ o .onCompleted ();
64
+ }
65
+ });
66
+ }
67
+
68
+ public static Observable <String > from (final Reader i ) {
69
+ return from (i , 8 * 1024 );
70
+ }
71
+
72
+ public static Observable <String > from (final Reader i , final int size ) {
73
+ return Observable .create (new OnSubscribe <String >() {
74
+ @ Override
75
+ public void call (Subscriber <? super String > o ) {
76
+ char [] buffer = new char [size ];
77
+ try {
78
+ if (o .isUnsubscribed ())
79
+ return ;
80
+ int n = 0 ;
81
+ n = i .read (buffer );
82
+ while (n != -1 && !o .isUnsubscribed ()) {
83
+ o .onNext (new String (buffer ));
84
+ n = i .read (buffer );
85
+ }
86
+ } catch (IOException e ) {
87
+ o .onError (e );
88
+ }
89
+ if (o .isUnsubscribed ())
90
+ return ;
91
+ o .onCompleted ();
92
+ }
93
+ });
94
+ }
95
+
37
96
/**
38
- * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
97
+ * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
98
+ * and where handles when a multibyte character spans two chunks.
39
99
*
40
100
* @param src
41
101
* @param charsetName
@@ -46,7 +106,8 @@ public static Observable<String> decode(Observable<byte[]> src, String charsetNa
46
106
}
47
107
48
108
/**
49
- * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
109
+ * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
110
+ * and where handles when a multibyte character spans two chunks.
50
111
*
51
112
* @param src
52
113
* @param charset
@@ -57,30 +118,31 @@ public static Observable<String> decode(Observable<byte[]> src, Charset charset)
57
118
}
58
119
59
120
/**
60
- * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
121
+ * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
122
+ * and where handles when a multibyte character spans two chunks.
61
123
* This method allows for more control over how malformed and unmappable characters are handled.
62
124
*
63
125
* @param src
64
126
* @param charsetDecoder
65
127
* @return
66
128
*/
67
129
public static Observable <String > decode (final Observable <byte []> src , final CharsetDecoder charsetDecoder ) {
68
- return Observable . create (new OnSubscribeFunc <String >() {
130
+ return src . lift (new Operator <String , byte [] >() {
69
131
@ Override
70
- public Subscription onSubscribe (final Observer <? super String > observer ) {
71
- return src . subscribe ( new Observer <byte []>() {
132
+ public Subscriber <? super byte []> call (final Subscriber <? super String > o ) {
133
+ return new Subscriber <byte []>(o ) {
72
134
private ByteBuffer leftOver = null ;
73
135
74
136
@ Override
75
137
public void onCompleted () {
76
138
if (process (null , leftOver , true ))
77
- observer .onCompleted ();
139
+ o .onCompleted ();
78
140
}
79
141
80
142
@ Override
81
143
public void onError (Throwable e ) {
82
144
if (process (null , leftOver , true ))
83
- observer .onError (e );
145
+ o .onError (e );
84
146
}
85
147
86
148
@ Override
@@ -120,7 +182,7 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
120
182
cr .throwException ();
121
183
}
122
184
catch (CharacterCodingException e ) {
123
- observer .onError (e );
185
+ o .onError (e );
124
186
return false ;
125
187
}
126
188
}
@@ -134,11 +196,11 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
134
196
135
197
String string = cb .toString ();
136
198
if (!string .isEmpty ())
137
- observer .onNext (string );
199
+ o .onNext (string );
138
200
139
201
return true ;
140
202
}
141
- }) ;
203
+ };
142
204
}
143
205
});
144
206
}
@@ -190,13 +252,14 @@ public byte[] call(String str) {
190
252
}
191
253
192
254
/**
193
- * Gather up all of the strings in to one string to be able to use it as one message. Don't use this on infinite streams.
255
+ * Gather up all of the strings in to one string to be able to use it as one message. Don't use
256
+ * this on infinite streams.
194
257
*
195
258
* @param src
196
259
* @return
197
260
*/
198
261
public static Observable <String > stringConcat (Observable <String > src ) {
199
- return src .aggregate (new Func2 <String , String , String >() {
262
+ return src .reduce (new Func2 <String , String , String >() {
200
263
@ Override
201
264
public String call (String a , String b ) {
202
265
return a + b ;
@@ -218,22 +281,25 @@ public String call(String a, String b) {
218
281
*/
219
282
public static Observable <String > split (final Observable <String > src , String regex ) {
220
283
final Pattern pattern = Pattern .compile (regex );
221
- return Observable .create (new OnSubscribeFunc <String >() {
284
+
285
+ return src .lift (new Operator <String , String >() {
222
286
@ Override
223
- public Subscription onSubscribe (final Observer <? super String > observer ) {
224
- return src . subscribe ( new Observer <String >() {
287
+ public Subscriber <? super String > call (final Subscriber <? super String > o ) {
288
+ return new Subscriber <String >(o ) {
225
289
private String leftOver = null ;
226
290
227
291
@ Override
228
292
public void onCompleted () {
229
293
output (leftOver );
230
- observer .onCompleted ();
294
+ if (!o .isUnsubscribed ())
295
+ o .onCompleted ();
231
296
}
232
297
233
298
@ Override
234
299
public void onError (Throwable e ) {
235
300
output (leftOver );
236
- observer .onError (e );
301
+ if (!o .isUnsubscribed ())
302
+ o .onError (e );
237
303
}
238
304
239
305
@ Override
@@ -250,24 +316,29 @@ public void onNext(String segment) {
250
316
}
251
317
252
318
private int emptyPartCount = 0 ;
319
+
253
320
/**
254
321
* when limit == 0 trailing empty parts are not emitted.
322
+ *
255
323
* @param part
256
324
*/
257
325
private void output (String part ) {
258
326
if (part .isEmpty ()) {
259
327
emptyPartCount ++;
260
328
}
261
329
else {
262
- for (; emptyPartCount >0 ; emptyPartCount --)
263
- observer .onNext ("" );
264
- observer .onNext (part );
330
+ for (; emptyPartCount > 0 ; emptyPartCount --)
331
+ if (!o .isUnsubscribed ())
332
+ o .onNext ("" );
333
+ if (!o .isUnsubscribed ())
334
+ o .onNext (part );
265
335
}
266
336
}
267
- }) ;
337
+ };
268
338
}
269
339
});
270
340
}
341
+
271
342
/**
272
343
* Concatenates the sequence of values by adding a separator
273
344
* between them and emitting the result once the source completes.
@@ -276,49 +347,55 @@ private void output(String part) {
276
347
* {@link java.lang.String#valueOf(java.lang.Object)} calls.
277
348
* <p>
278
349
* For example:
350
+ *
279
351
* <pre>
280
- * Observable<Object> source = Observable.from("a" , 1, "c" );
281
- * Observable<String> result = join(source, ", " );
352
+ * Observable<Object> source = Observable.from("a" , 1, "c" );
353
+ * Observable<String> result = join(source, ", " );
282
354
* </pre>
283
355
*
284
356
* will yield a single element equal to "a, 1, c".
285
357
*
286
- * @param source the source sequence of CharSequence values
287
- * @param separator the separator to a
358
+ * @param source
359
+ * the source sequence of CharSequence values
360
+ * @param separator
361
+ * the separator to a
288
362
* @return an Observable which emits a single String value having the concatenated
289
363
* values of the source observable with the separator between elements
290
364
*/
291
365
public static <T > Observable <String > join (final Observable <T > source , final CharSequence separator ) {
292
- return Observable .create (new OnSubscribeFunc <String >() {
293
-
366
+ return source .lift (new Operator <String , T >() {
294
367
@ Override
295
- public Subscription onSubscribe (final Observer <? super String > t1 ) {
296
- return source . subscribe ( new Observer <T >() {
368
+ public Subscriber < T > call (final Subscriber <? super String > o ) {
369
+ return new Subscriber <T >(o ) {
297
370
boolean mayAddSeparator ;
298
371
StringBuilder b = new StringBuilder ();
372
+
299
373
@ Override
300
- public void onNext (T args ) {
301
- if (mayAddSeparator ) {
302
- b .append (separator );
303
- }
304
- mayAddSeparator = true ;
305
- b .append (String .valueOf (args ));
374
+ public void onCompleted () {
375
+ String str = b .toString ();
376
+ b = null ;
377
+ if (!o .isUnsubscribed ())
378
+ o .onNext (str );
379
+ if (!o .isUnsubscribed ())
380
+ o .onCompleted ();
306
381
}
307
382
308
383
@ Override
309
384
public void onError (Throwable e ) {
310
385
b = null ;
311
- t1 .onError (e );
386
+ if (!o .isUnsubscribed ())
387
+ o .onError (e );
312
388
}
313
389
314
390
@ Override
315
- public void onCompleted () {
316
- String str = b .toString ();
317
- b = null ;
318
- t1 .onNext (str );
319
- t1 .onCompleted ();
391
+ public void onNext (Object t ) {
392
+ if (mayAddSeparator ) {
393
+ b .append (separator );
394
+ }
395
+ mayAddSeparator = true ;
396
+ b .append (String .valueOf (t ));
320
397
}
321
- }) ;
398
+ };
322
399
}
323
400
});
324
401
}
0 commit comments