Skip to content

Commit 3efaa64

Browse files
committed
Adding utility functions for observables of strings useful for processing non blocking IO.
1 parent 13cd6a9 commit 3efaa64

File tree

4 files changed

+537
-0
lines changed

4 files changed

+537
-0
lines changed
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package rx.observables;
2+
3+
import java.nio.ByteBuffer;
4+
import java.nio.CharBuffer;
5+
import java.nio.charset.CharacterCodingException;
6+
import java.nio.charset.Charset;
7+
import java.nio.charset.CharsetDecoder;
8+
import java.nio.charset.CharsetEncoder;
9+
import java.nio.charset.CoderResult;
10+
import java.nio.charset.CodingErrorAction;
11+
import java.util.Arrays;
12+
import java.util.regex.Pattern;
13+
14+
import rx.Observable;
15+
import rx.Observer;
16+
import rx.Subscription;
17+
import rx.Observable.OnSubscribeFunc;
18+
import rx.util.functions.Func1;
19+
import rx.util.functions.Func2;
20+
21+
public class StringObservable {
22+
/**
23+
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
24+
*
25+
* @param src
26+
* @param charsetName
27+
* @return
28+
*/
29+
public static Observable<String> decode(Observable<byte[]> src, String charsetName) {
30+
return decode(src, Charset.forName(charsetName));
31+
}
32+
33+
/**
34+
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
35+
*
36+
* @param src
37+
* @param charset
38+
* @return
39+
*/
40+
public static Observable<String> decode(Observable<byte[]> src, Charset charset) {
41+
return decode(src, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE));
42+
}
43+
44+
/**
45+
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
46+
* This method allows for more control over how malformed and unmappable characters are handled.
47+
*
48+
* @param src
49+
* @param charsetDecoder
50+
* @return
51+
*/
52+
public static Observable<String> decode(final Observable<byte[]> src, final CharsetDecoder charsetDecoder) {
53+
return Observable.create(new OnSubscribeFunc<String>() {
54+
@Override
55+
public Subscription onSubscribe(final Observer<? super String> observer) {
56+
return src.subscribe(new Observer<byte[]>() {
57+
private ByteBuffer leftOver = null;
58+
59+
@Override
60+
public void onCompleted() {
61+
if (process(null, leftOver, true))
62+
observer.onCompleted();
63+
}
64+
65+
@Override
66+
public void onError(Throwable e) {
67+
if (process(null, leftOver, true))
68+
observer.onError(e);
69+
}
70+
71+
@Override
72+
public void onNext(byte[] bytes) {
73+
process(bytes, leftOver, false);
74+
}
75+
76+
public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
77+
ByteBuffer bb;
78+
if (last != null) {
79+
if (next != null) {
80+
// merge leftover in front of the next bytes
81+
bb = ByteBuffer.allocate(last.remaining() + next.length);
82+
bb.put(last);
83+
bb.put(next);
84+
bb.flip();
85+
}
86+
else { // next == null
87+
bb = last;
88+
}
89+
}
90+
else { // last == null
91+
if (next != null) {
92+
bb = ByteBuffer.wrap(next);
93+
}
94+
else { // next == null
95+
return true;
96+
}
97+
}
98+
99+
CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * charsetDecoder.averageCharsPerByte()));
100+
CoderResult cr = charsetDecoder.decode(bb, cb, endOfInput);
101+
cb.flip();
102+
103+
if (cr.isError()) {
104+
try {
105+
cr.throwException();
106+
}
107+
catch (CharacterCodingException e) {
108+
observer.onError(e);
109+
return false;
110+
}
111+
}
112+
113+
if (bb.remaining() > 0) {
114+
leftOver = bb;
115+
}
116+
else {
117+
leftOver = null;
118+
}
119+
120+
String string = cb.toString();
121+
if (!string.isEmpty())
122+
observer.onNext(string);
123+
124+
return true;
125+
}
126+
});
127+
}
128+
});
129+
}
130+
131+
/**
132+
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
133+
*
134+
* @param src
135+
* @param charsetName
136+
* @return
137+
*/
138+
public static Observable<byte[]> encode(Observable<String> src, String charsetName) {
139+
return encode(src, Charset.forName(charsetName));
140+
}
141+
142+
/**
143+
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
144+
*
145+
* @param src
146+
* @param charset
147+
* @return
148+
*/
149+
public static Observable<byte[]> encode(Observable<String> src, Charset charset) {
150+
return encode(src, charset.newEncoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE));
151+
}
152+
153+
/**
154+
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
155+
* This method allows for more control over how malformed and unmappable characters are handled.
156+
*
157+
* @param src
158+
* @param charsetEncoder
159+
* @return
160+
*/
161+
public static Observable<byte[]> encode(Observable<String> src, final CharsetEncoder charsetEncoder) {
162+
return src.map(new Func1<String, byte[]>() {
163+
@Override
164+
public byte[] call(String str) {
165+
CharBuffer cb = CharBuffer.wrap(str);
166+
ByteBuffer bb;
167+
try {
168+
bb = charsetEncoder.encode(cb);
169+
} catch (CharacterCodingException e) {
170+
throw new RuntimeException(e);
171+
}
172+
return Arrays.copyOfRange(bb.array(), bb.position(), bb.limit());
173+
}
174+
});
175+
}
176+
177+
/**
178+
* Gather up all of the strings in to one string to be able to use it as one message. Don't use this on infinite streams.
179+
*
180+
* @param src
181+
* @return
182+
*/
183+
public static Observable<String> stringConcat(Observable<String> src) {
184+
return src.aggregate(new Func2<String, String, String>() {
185+
public String call(String a, String b) {
186+
return a + b;
187+
}
188+
});
189+
}
190+
191+
/**
192+
* Rechunks the strings based on a regex pattern and works on infinite stream.
193+
*
194+
* resplit(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
195+
* resplit(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
196+
*
197+
* See {@link Pattern}
198+
*
199+
* @param src
200+
* @param regex
201+
* @return
202+
*/
203+
public static Observable<String> split(final Observable<String> src, String regex) {
204+
final Pattern pattern = Pattern.compile(regex);
205+
return Observable.create(new OnSubscribeFunc<String>() {
206+
@Override
207+
public Subscription onSubscribe(final Observer<? super String> observer) {
208+
return src.subscribe(new Observer<String>() {
209+
private String leftOver = null;
210+
211+
@Override
212+
public void onCompleted() {
213+
output(leftOver);
214+
observer.onCompleted();
215+
}
216+
217+
@Override
218+
public void onError(Throwable e) {
219+
output(leftOver);
220+
observer.onError(e);
221+
}
222+
223+
@Override
224+
public void onNext(String segment) {
225+
String[] parts = pattern.split(segment, -1);
226+
227+
if (leftOver != null)
228+
parts[0] = leftOver + parts[0];
229+
for (int i = 0; i < parts.length - 1; i++) {
230+
String part = parts[i];
231+
output(part);
232+
}
233+
leftOver = parts[parts.length - 1];
234+
}
235+
236+
private int emptyPartCount = 0;
237+
/**
238+
* when limit == 0 trailing empty parts are not emitted.
239+
* @param part
240+
*/
241+
private void output(String part) {
242+
if (part.isEmpty()) {
243+
emptyPartCount++;
244+
}
245+
else {
246+
for(; emptyPartCount>0; emptyPartCount--)
247+
observer.onNext("");
248+
observer.onNext(part);
249+
}
250+
}
251+
});
252+
}
253+
});
254+
}
255+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package rx.util;
2+
3+
import rx.Notification;
4+
import rx.Observable;
5+
import rx.util.functions.Func1;
6+
import rx.util.functions.Func2;
7+
8+
public class AssertObservable {
9+
/**
10+
* Asserts that two Observables are equal. If they are not, an {@link AssertionError} is thrown
11+
* with the given message. If <code>expecteds</code> and <code>actuals</code> are
12+
* <code>null</code>, they are considered equal.
13+
*
14+
* @param expected
15+
* Observable with expected values.
16+
* @param actual
17+
* Observable with actual values
18+
*/
19+
public static <T> void assertObservableEqualsBlocking(Observable<T> expected, Observable<T> actual) {
20+
assertObservableEqualsBlocking(null, expected, actual);
21+
}
22+
23+
/**
24+
* Asserts that two Observables are equal. If they are not, an {@link AssertionError} is thrown
25+
* with the given message. If <code>expected</code> and <code>actual</code> are
26+
* <code>null</code>, they are considered equal.
27+
*
28+
* @param message
29+
* the identifying message for the {@link AssertionError} (<code>null</code> okay)
30+
* @param expected
31+
* Observable with expected values.
32+
* @param actual
33+
* Observable with actual values
34+
*/
35+
public static <T> void assertObservableEqualsBlocking(String message, Observable<T> expected, Observable<T> actual) {
36+
assertObservableEquals(expected, actual).toBlockingObservable().lastOrDefault(null);
37+
}
38+
39+
/**
40+
* Asserts that two {@link Observable}s are equal and returns an empty {@link Observable}. If
41+
* they are not, an {@link Observable} is returned that calls onError with an
42+
* {@link AssertionError} when subscribed to. If <code>expected</code> and <code>actual</code>
43+
* are <code>null</code>, they are considered equal.
44+
*
45+
* @param message
46+
* the identifying message for the {@link AssertionError} (<code>null</code> okay)
47+
* @param expected
48+
* Observable with expected values.
49+
* @param actual
50+
* Observable with actual values
51+
*/
52+
public static <T> Observable<Void> assertObservableEquals(Observable<T> expected, Observable<T> actual) {
53+
return assertObservableEquals(null, expected, actual);
54+
}
55+
56+
/**
57+
* Asserts that two {@link Observable}s are equal and returns an empty {@link Observable}. If
58+
* they are not, an {@link Observable} is returned that calls onError with an
59+
* {@link AssertionError} when subscribed to with the given message. If <code>expected</code>
60+
* and <code>actual</code> are <code>null</code>, they are considered equal.
61+
*
62+
* @param message
63+
* the identifying message for the {@link AssertionError} (<code>null</code> okay)
64+
* @param expected
65+
* Observable with expected values.
66+
* @param actual
67+
* Observable with actual values
68+
*/
69+
public static <T> Observable<Void> assertObservableEquals(final String message, Observable<T> expected, Observable<T> actual) {
70+
if (actual == null && expected != null) {
71+
return Observable.error(new AssertionError((message != null ? message + ": " : "") + "Actual was null and expected was not"));
72+
}
73+
if (actual != null && expected == null) {
74+
return Observable.error(new AssertionError((message != null ? message + ": " : "") + "Expected was null and actual was not"));
75+
}
76+
if (actual == null && expected == null) {
77+
return Observable.empty();
78+
}
79+
80+
Func2<? super Notification<T>, ? super Notification<T>, Notification<String>> zipFunction = new Func2<Notification<T>, Notification<T>, Notification<String>>() {
81+
@Override
82+
public Notification<String> call(Notification<T> expectedNotfication, Notification<T> actualNotification) {
83+
if (expectedNotfication.equals(actualNotification)) {
84+
StringBuilder message = new StringBuilder();
85+
message.append(expectedNotfication.getKind());
86+
if (expectedNotfication.hasValue())
87+
message.append(" ").append(expectedNotfication.getValue());
88+
if (expectedNotfication.hasThrowable())
89+
message.append(" ").append(expectedNotfication.getThrowable());
90+
return new Notification<String>("equals " + message.toString());
91+
}
92+
else {
93+
StringBuilder error = new StringBuilder();
94+
error.append("expected:<").append(expectedNotfication.getKind());
95+
if (expectedNotfication.hasValue())
96+
error.append(" ").append(expectedNotfication.getValue());
97+
if (expectedNotfication.hasThrowable())
98+
error.append(" ").append(expectedNotfication.getThrowable());
99+
error.append("> but was:<").append(actualNotification.getKind());
100+
if (actualNotification.hasValue())
101+
error.append(" ").append(actualNotification.getValue());
102+
if (actualNotification.hasThrowable())
103+
error.append(" ").append(actualNotification.getThrowable());
104+
error.append(">");
105+
106+
return new Notification<String>(new AssertionError(error.toString()));
107+
}
108+
}
109+
};
110+
111+
Func2<Notification<String>, Notification<String>, Notification<String>> accumulator = new Func2<Notification<String>, Notification<String>, Notification<String>>() {
112+
@Override
113+
public Notification<String> call(Notification<String> a, Notification<String> b) {
114+
String message = a.isOnError() ? a.getThrowable().getMessage() : a.getValue();
115+
boolean fail = a.isOnError();
116+
117+
message += "\n\t" + (b.isOnError() ? b.getThrowable().getMessage() : b.getValue());
118+
fail |= b.isOnError();
119+
120+
if (fail)
121+
return new Notification<String>(new AssertionError(message));
122+
else
123+
return new Notification<String>(message);
124+
}
125+
};
126+
127+
Observable<Void> outcomeObservable = Observable.zip(expected.materialize(), actual.materialize(), zipFunction).aggregate(accumulator).map(new Func1<Notification<String>, Notification<Void>>() {
128+
@Override
129+
public Notification<Void> call(Notification<String> outcome) {
130+
if (outcome.isOnError()) {
131+
String fullMessage = (message != null ? message + ": " : "") + "Observables are different\n\t" + outcome.getThrowable().getMessage();
132+
return new Notification<Void>(new AssertionError(fullMessage));
133+
}
134+
return new Notification<Void>();
135+
}
136+
}).dematerialize();
137+
return outcomeObservable;
138+
}
139+
}

0 commit comments

Comments
 (0)