Skip to content

Commit f81c0ac

Browse files
committed
Reimplementation of Concat, improved handling of Observable<Observable<T>>.
The old version required all of the Observable<T>s to be generated and buffered before the concat could begin. If the outer Observable was asynchronous, items could be dropped (test added). The new version passes the test, and does the best job I could (after examining several possible strategies) of achieving clear and consistent semantics in accordance with the principle of least surprise.
1 parent f4968d6 commit f81c0ac

File tree

2 files changed

+124
-89
lines changed

2 files changed

+124
-89
lines changed

rxjava-core/src/main/java/rx/operators/OperationConcat.java

Lines changed: 122 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
import static org.junit.Assert.*;
1919
import static org.mockito.Mockito.*;
2020

21-
import java.lang.reflect.Array;
21+
import java.util.Arrays;
2222
import java.util.ArrayList;
2323
import java.util.List;
2424
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.atomic.AtomicBoolean;
2526

2627
import org.junit.Assert;
2728
import org.junit.Before;
@@ -30,105 +31,106 @@
3031
import rx.Observable;
3132
import rx.Observer;
3233
import rx.Subscription;
34+
import rx.subscriptions.BooleanSubscription;
3335
import rx.util.AtomicObservableSubscription;
34-
import rx.util.functions.Action1;
36+
import rx.util.Exceptions;
3537
import rx.util.functions.Func1;
3638

3739
public final class OperationConcat {
3840

3941
/**
40-
* Combine the observable sequences from the list of Observables into one observable sequence without any transformation.
41-
*
42-
* @param sequences
43-
* An observable sequence of elements to project.
42+
* Combine the observable sequences from the list of Observables into one
43+
* observable sequence without any transformation. If either the outer
44+
* observable or an inner observable calls onError, we will call onError.
45+
*
46+
* <p/>
47+
*
48+
* The outer observable might run on a separate thread from (one of) the
49+
* inner observables; in this case care must be taken to avoid a deadlock.
50+
* The Concat operation may block the outer thread while servicing an inner
51+
* thread in order to ensure a well-defined ordering of elements; therefore
52+
* none of the inner threads must be implemented in a way that might wait on
53+
* the outer thread.
54+
*
55+
* @param sequences An observable sequence of elements to project.
4456
* @return An observable sequence whose elements are the result of combining the output from the list of Observables.
4557
*/
4658
public static <T> Func1<Observer<T>, Subscription> concat(final Observable<T>... sequences) {
47-
return new Func1<Observer<T>, Subscription>() {
48-
49-
@Override
50-
public Subscription call(Observer<T> observer) {
51-
return new Concat<T>(sequences).call(observer);
52-
}
53-
};
59+
return concat(Observable.toObservable(sequences));
5460
}
5561

5662
public static <T> Func1<Observer<T>, Subscription> concat(final List<Observable<T>> sequences) {
57-
@SuppressWarnings("unchecked")
58-
Observable<T>[] o = sequences.toArray((Observable<T>[]) Array.newInstance(Observable.class, sequences.size()));
59-
return concat(o);
63+
return concat(Observable.toObservable(sequences));
6064
}
6165

6266
public static <T> Func1<Observer<T>, Subscription> concat(final Observable<Observable<T>> sequences) {
63-
final List<Observable<T>> list = new ArrayList<Observable<T>>();
64-
sequences.toList().subscribe(new Action1<List<Observable<T>>>() {
65-
@Override
66-
public void call(List<Observable<T>> t1) {
67-
list.addAll(t1);
68-
}
69-
70-
});
71-
72-
return concat(list);
73-
}
74-
75-
private static class Concat<T> implements Func1<Observer<T>, Subscription> {
76-
private final Observable<T>[] sequences;
77-
private int num = 0;
78-
private int count = 0;
79-
private Subscription s;
80-
81-
Concat(final Observable<T>... sequences) {
82-
this.sequences = sequences;
83-
this.num = sequences.length - 1;
84-
}
85-
86-
private final AtomicObservableSubscription Subscription = new AtomicObservableSubscription();
87-
88-
private final Subscription actualSubscription = new Subscription() {
67+
return new Func1<Observer<T>, Subscription>() {
8968

9069
@Override
91-
public void unsubscribe() {
92-
if (null != s)
93-
s.unsubscribe();
70+
public Subscription call(Observer<T> observer) {
71+
return new ConcatSubscription<T>(sequences, observer);
9472
}
9573
};
74+
}
9675

97-
public Subscription call(Observer<T> observer) {
98-
s = sequences[count].subscribe(new ConcatObserver(observer));
99-
100-
return Subscription.wrap(actualSubscription);
101-
}
102-
103-
private class ConcatObserver implements Observer<T> {
104-
private final Observer<T> observer;
76+
private static class ConcatSubscription<T> extends BooleanSubscription {
77+
// Might be updated by an inner thread's onError during the outer
78+
// thread's onNext, then read in the outer thread's onComplete.
79+
final AtomicBoolean innerError = new AtomicBoolean(false);
10580

106-
ConcatObserver(Observer<T> observer) {
107-
this.observer = observer;
108-
}
109-
110-
@Override
111-
public void onCompleted() {
112-
if (num == count)
113-
observer.onCompleted();
114-
else {
115-
count++;
116-
s = sequences[count].subscribe(this);
81+
public ConcatSubscription(Observable<Observable<T>> sequences, final Observer<T> observer) {
82+
final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription();
83+
outerSubscription.wrap(sequences.subscribe(new Observer<Observable<T>>() {
84+
@Override
85+
public void onNext(Observable<T> nextSequence) {
86+
// We will not return from onNext until the inner observer completes.
87+
// NB: while we are in onNext, the well-behaved outer observable will not call onError or onCompleted.
88+
final CountDownLatch latch = new CountDownLatch(1);
89+
final AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription();
90+
innerSubscription.wrap(nextSequence.subscribe(new Observer<T>() {
91+
@Override
92+
public void onNext(T item) {
93+
// If the Concat's subscriber called unsubscribe() before the return of onNext, we must do so also.
94+
observer.onNext(item);
95+
if (isUnsubscribed()) {
96+
innerSubscription.unsubscribe();
97+
outerSubscription.unsubscribe();
98+
}
99+
}
100+
@Override
101+
public void onError(Exception e) {
102+
outerSubscription.unsubscribe();
103+
innerError.set(true);
104+
observer.onError(e);
105+
latch.countDown();
106+
}
107+
@Override
108+
public void onCompleted() {
109+
// Continue on to the next sequence
110+
latch.countDown();
111+
}
112+
}));
113+
try {
114+
latch.await();
115+
} catch (InterruptedException e) {
116+
Thread.currentThread().interrupt();
117+
throw Exceptions.propagate(e);
118+
}
117119
}
118-
}
119-
120-
@Override
121-
public void onError(Exception e) {
122-
observer.onError(e);
123-
124-
}
125-
126-
@Override
127-
public void onNext(T args) {
128-
observer.onNext(args);
129-
130-
}
131-
}
120+
@Override
121+
public void onError(Exception e) {
122+
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
123+
observer.onError(e);
124+
}
125+
@Override
126+
public void onCompleted() {
127+
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
128+
if (!innerError.get()) {
129+
observer.onCompleted();
130+
}
131+
}
132+
}));
133+
}
132134
}
133135

134136
public static class UnitTest {
@@ -193,8 +195,8 @@ public void testConcatWithList() {
193195
public void testConcatUnsubscribe() {
194196
final CountDownLatch callOnce = new CountDownLatch(1);
195197
final CountDownLatch okToContinue = new CountDownLatch(1);
196-
final TestObservable w1 = new TestObservable(null, null, "one", "two", "three");
197-
final TestObservable w2 = new TestObservable(callOnce, okToContinue, "four", "five", "six");
198+
final TestObservable<String> w1 = new TestObservable<String>(null, null, "one", "two", "three");
199+
final TestObservable<String> w2 = new TestObservable<String>(callOnce, okToContinue, "four", "five", "six");
198200

199201
@SuppressWarnings("unchecked")
200202
Observer<String> aObserver = mock(Observer.class);
@@ -256,7 +258,40 @@ public void unsubscribe() {
256258
Assert.assertEquals(expected.length, index);
257259
}
258260

259-
private static class TestObservable extends Observable<String> {
261+
@Test
262+
public void testBlockedObservableOfObservables() {
263+
final String[] o = { "1", "3", "5", "7" };
264+
final String[] e = { "2", "4", "6" };
265+
final Observable<String> odds = Observable.toObservable(o);
266+
final Observable<String> even = Observable.toObservable(e);
267+
final CountDownLatch callOnce = new CountDownLatch(1);
268+
final CountDownLatch okToContinue = new CountDownLatch(1);
269+
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(callOnce, okToContinue, odds, even);
270+
Func1<Observer<String>, Subscription> concatF = concat(observableOfObservables);
271+
Observable<String> concat = Observable.create(concatF);
272+
concat.subscribe(observer);
273+
try {
274+
//Block main thread to allow observables to serve up o1.
275+
callOnce.await();
276+
} catch (Exception ex) {
277+
ex.printStackTrace();
278+
fail(ex.getMessage());
279+
}
280+
// The concated observable should have served up all of the odds.
281+
Assert.assertEquals(o.length, index);
282+
try {
283+
// unblock observables so it can serve up o2 and complete
284+
okToContinue.countDown();
285+
observableOfObservables.t.join();
286+
} catch (Exception ex) {
287+
ex.printStackTrace();
288+
fail(ex.getMessage());
289+
}
290+
// The concatenated observable should now have served up all the evens.
291+
Assert.assertEquals(expected.length, index);
292+
}
293+
294+
private static class TestObservable<T> extends Observable<T> {
260295

261296
private final Subscription s = new Subscription() {
262297

@@ -266,28 +301,28 @@ public void unsubscribe() {
266301
}
267302

268303
};
269-
private final String[] values;
304+
private final List<T> values;
270305
private Thread t = null;
271306
private int count = 0;
272307
private boolean subscribed = true;
273308
private final CountDownLatch once;
274309
private final CountDownLatch okToContinue;
275310

276-
public TestObservable(CountDownLatch once, CountDownLatch okToContinue, String... values) {
277-
this.values = values;
311+
public TestObservable(CountDownLatch once, CountDownLatch okToContinue, T... values) {
312+
this.values = Arrays.asList(values);
278313
this.once = once;
279314
this.okToContinue = okToContinue;
280315
}
281316

282317
@Override
283-
public Subscription subscribe(final Observer<String> observer) {
318+
public Subscription subscribe(final Observer<T> observer) {
284319
t = new Thread(new Runnable() {
285320

286321
@Override
287322
public void run() {
288323
try {
289-
while (count < values.length && subscribed) {
290-
observer.onNext(values[count]);
324+
while (count < values.size() && subscribed) {
325+
observer.onNext(values.get(count));
291326
count++;
292327
//Unblock the main thread to call unsubscribe.
293328
if (null != once)

settings.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ rootProject.name='rxjava'
22
include 'rxjava-core', \
33
'language-adaptors:rxjava-groovy', \
44
'language-adaptors:rxjava-jruby', \
5-
'language-adaptors:rxjava-clojure', \
6-
'language-adaptors:rxjava-scala'
5+
'language-adaptors:rxjava-clojure'
6+

0 commit comments

Comments
 (0)