Skip to content

Commit a8c1e3c

Browse files
committed
OperatorConcat
1 parent 4e77f8a commit a8c1e3c

File tree

4 files changed

+206
-203
lines changed

4 files changed

+206
-203
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import rx.operators.OnSubscribeRange;
5151
import rx.operators.OperationBuffer;
5252
import rx.operators.OperationCombineLatest;
53-
import rx.operators.OperationConcat;
5453
import rx.operators.OperationDebounce;
5554
import rx.operators.OperationDefaultIfEmpty;
5655
import rx.operators.OperationDefer;
@@ -95,6 +94,7 @@
9594
import rx.operators.OperatorAsObservable;
9695
import rx.operators.OperatorCache;
9796
import rx.operators.OperatorCast;
97+
import rx.operators.OperatorConcat;
9898
import rx.operators.OperatorDoOnEach;
9999
import rx.operators.OperatorElementAt;
100100
import rx.operators.OperatorFilter;
@@ -740,7 +740,7 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combin
740740
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.concat.aspx">MSDN: Observable.Concat</a>
741741
*/
742742
public final static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables) {
743-
return create(OperationConcat.concat(observables));
743+
return observables.lift(new OperatorConcat<T>());
744744
}
745745

746746
/**
@@ -761,7 +761,7 @@ public final static <T> Observable<T> concat(Observable<? extends Observable<? e
761761
@SuppressWarnings("unchecked")
762762
// suppress because the types are checked by the method signature before using a vararg
763763
public final static <T> Observable<T> concat(Observable<? extends T> t1, Observable<? extends T> t2) {
764-
return create(OperationConcat.concat(t1, t2));
764+
return concat(from(t1, t2));
765765
}
766766

767767
/**
@@ -784,7 +784,7 @@ public final static <T> Observable<T> concat(Observable<? extends T> t1, Observa
784784
@SuppressWarnings("unchecked")
785785
// suppress because the types are checked by the method signature before using a vararg
786786
public final static <T> Observable<T> concat(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3) {
787-
return create(OperationConcat.concat(t1, t2, t3));
787+
return concat(from(t1, t2, t3));
788788
}
789789

790790
/**
@@ -809,7 +809,7 @@ public final static <T> Observable<T> concat(Observable<? extends T> t1, Observa
809809
@SuppressWarnings("unchecked")
810810
// suppress because the types are checked by the method signature before using a vararg
811811
public final static <T> Observable<T> concat(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4) {
812-
return create(OperationConcat.concat(t1, t2, t3, t4));
812+
return concat(from(t1, t2, t3, t4));
813813
}
814814

815815
/**
@@ -836,7 +836,7 @@ public final static <T> Observable<T> concat(Observable<? extends T> t1, Observa
836836
@SuppressWarnings("unchecked")
837837
// suppress because the types are checked by the method signature before using a vararg
838838
public final static <T> Observable<T> concat(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5) {
839-
return create(OperationConcat.concat(t1, t2, t3, t4, t5));
839+
return concat(from(t1, t2, t3, t4, t5));
840840
}
841841

842842
/**
@@ -865,7 +865,7 @@ public final static <T> Observable<T> concat(Observable<? extends T> t1, Observa
865865
@SuppressWarnings("unchecked")
866866
// suppress because the types are checked by the method signature before using a vararg
867867
public final static <T> Observable<T> concat(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6) {
868-
return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6));
868+
return concat(from(t1, t2, t3, t4, t5, t6));
869869
}
870870

871871
/**
@@ -896,7 +896,7 @@ public final static <T> Observable<T> concat(Observable<? extends T> t1, Observa
896896
@SuppressWarnings("unchecked")
897897
// suppress because the types are checked by the method signature before using a vararg
898898
public final static <T> Observable<T> concat(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7) {
899-
return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7));
899+
return concat(from(t1, t2, t3, t4, t5, t6, t7));
900900
}
901901

902902
/**
@@ -929,7 +929,7 @@ public final static <T> Observable<T> concat(Observable<? extends T> t1, Observa
929929
@SuppressWarnings("unchecked")
930930
// suppress because the types are checked by the method signature before using a vararg
931931
public final static <T> Observable<T> concat(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8) {
932-
return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7, t8));
932+
return concat(from(t1, t2, t3, t4, t5, t6, t7, t8));
933933
}
934934

935935
/**
@@ -964,7 +964,7 @@ public final static <T> Observable<T> concat(Observable<? extends T> t1, Observa
964964
@SuppressWarnings("unchecked")
965965
// suppress because the types are checked by the method signature before using a vararg
966966
public final static <T> Observable<T> concat(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8, Observable<? extends T> t9) {
967-
return create(OperationConcat.concat(t1, t2, t3, t4, t5, t6, t7, t8, t9));
967+
return concat(from(t1, t2, t3, t4, t5, t6, t7, t8, t9));
968968
}
969969

970970
/**

rxjava-core/src/main/java/rx/operators/OperationConcat.java

Lines changed: 0 additions & 175 deletions
This file was deleted.
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
import rx.Observable;
21+
import rx.Observable.Operator;
22+
import rx.Subscriber;
23+
import rx.functions.Action0;
24+
import rx.observers.SerializedSubscriber;
25+
import rx.subscriptions.SerialSubscription;
26+
import rx.subscriptions.Subscriptions;
27+
28+
/**
29+
* Returns an Observable that emits the items emitted by two or more Observables, one after the
30+
* other.
31+
* <p>
32+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/concat.png">
33+
*/
34+
public final class OperatorConcat<T> implements Operator<T, Observable<? extends T>> {
35+
final NotificationLite<Observable<? extends T>> nl = NotificationLite.instance();
36+
@Override
37+
public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) {
38+
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child);
39+
final SerialSubscription current = new SerialSubscription();
40+
child.add(current);
41+
return new ConcatSubscriber(s, current);
42+
}
43+
44+
final class ConcatSubscriber extends Subscriber<Observable<? extends T>> {
45+
46+
private final Subscriber<T> s;
47+
private final SerialSubscription current;
48+
final ConcurrentLinkedQueue<Object> queue;
49+
final AtomicInteger wip;
50+
51+
public ConcatSubscriber(Subscriber<T> s, SerialSubscription current) {
52+
super(s);
53+
this.s = s;
54+
this.current = current;
55+
this.queue = new ConcurrentLinkedQueue<Object>();
56+
this.wip = new AtomicInteger();
57+
add(Subscriptions.create(new Action0() {
58+
@Override
59+
public void call() {
60+
queue.clear();
61+
}
62+
}));
63+
}
64+
65+
@Override
66+
public void onNext(Observable<? extends T> t) {
67+
queue.add(nl.next(t));
68+
if (wip.getAndIncrement() == 0) {
69+
subscribeNext();
70+
}
71+
}
72+
73+
@Override
74+
public void onError(Throwable e) {
75+
s.onError(e);
76+
unsubscribe();
77+
}
78+
79+
@Override
80+
public void onCompleted() {
81+
queue.add(nl.completed());
82+
if (wip.getAndIncrement() == 0) {
83+
subscribeNext();
84+
}
85+
}
86+
87+
void subscribeNext() {
88+
Object o = queue.poll();
89+
if (nl.isCompleted(o)) {
90+
s.onCompleted();
91+
} else
92+
if (o != null) {
93+
Observable<? extends T> obs = nl.getValue(o);
94+
Subscriber<T> sourceSub = new Subscriber<T>() {
95+
96+
@Override
97+
public void onNext(T t) {
98+
s.onNext(t);
99+
}
100+
101+
@Override
102+
public void onError(Throwable e) {
103+
ConcatSubscriber.this.onError(e);
104+
}
105+
106+
@Override
107+
public void onCompleted() {
108+
if (wip.decrementAndGet() > 0) {
109+
subscribeNext();
110+
}
111+
}
112+
113+
};
114+
current.set(sourceSub);
115+
obs.unsafeSubscribe(sourceSub);
116+
}
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)