1515 */
1616package rx .internal .operators ;
1717
18+ import java .util .concurrent .atomic .AtomicReference ;
19+
1820import rx .Observable ;
1921import rx .Observable .Operator ;
2022import rx .Scheduler ;
2123import rx .Subscriber ;
2224import rx .functions .Func1 ;
23- import rx .observables . GroupedObservable ;
25+ import rx .subjects . Subject ;
2426
2527/**
2628 * Identifies unit of work that can be executed in parallel on a given Scheduler.
@@ -38,34 +40,86 @@ public OperatorParallel(Func1<Observable<T>, Observable<R>> f, Scheduler schedul
3840 }
3941
4042 @ Override
41- public Subscriber <? super T > call (Subscriber <? super R > op ) {
43+ public Subscriber <? super T > call (final Subscriber <? super R > child ) {
44+
45+ @ SuppressWarnings ("unchecked" )
46+ final UnicastPassThruSubject <T >[] subjects = new UnicastPassThruSubject [degreeOfParallelism ];
47+ @ SuppressWarnings ("unchecked" )
48+ final Observable <R >[] os = new Observable [degreeOfParallelism ];
49+ for (int i = 0 ; i < subjects .length ; i ++) {
50+ subjects [i ] = UnicastPassThruSubject .<T > create ();
51+ os [i ] = f .call (subjects [i ].observeOn (scheduler ));
52+ }
53+
54+ // subscribe BEFORE receiving data so everything is hooked up
55+ Observable .merge (os ).unsafeSubscribe (child );
56+
57+ return new Subscriber <T >(child ) {
58+
59+ int index = 0 ; // trust that we receive data synchronously
60+
61+ @ Override
62+ public void onCompleted () {
63+ for (UnicastPassThruSubject <T > s : subjects ) {
64+ s .onCompleted ();
65+ }
66+ }
67+
68+ @ Override
69+ public void onError (Throwable e ) {
70+ // bypass the subjects and immediately terminate
71+ child .onError (e );
72+ }
73+
74+ @ Override
75+ public void onNext (T t ) {
76+ // round-robin subjects
77+ subjects [index ++].onNext (t );
78+ if (index >= degreeOfParallelism ) {
79+ index = 0 ;
80+ }
81+ }
82+
83+ };
84+
85+ }
86+
87+ private static class UnicastPassThruSubject <T > extends Subject <T , T > {
88+
89+ private static <T > UnicastPassThruSubject <T > create () {
90+ final AtomicReference <Subscriber <? super T >> subscriber = new AtomicReference <Subscriber <? super T >>();
91+ return new UnicastPassThruSubject <T >(subscriber , new OnSubscribe <T >() {
92+
93+ @ Override
94+ public void call (Subscriber <? super T > s ) {
95+ subscriber .set (s );
96+ }
97+
98+ });
4299
43- Func1 <Subscriber <? super GroupedObservable <Long , T >>, Subscriber <? super T >> groupBy =
44- new OperatorGroupBy <Long , T >(new Func1 <T , Long >() {
100+ }
45101
46- long i = 0 ;
102+ private final AtomicReference < Subscriber <? super T >> subscriber ;
47103
48- @ Override
49- public Long call ( T t ) {
50- return i ++ % degreeOfParallelism ;
51- }
104+ protected UnicastPassThruSubject ( AtomicReference < Subscriber <? super T >> subscriber , OnSubscribe < T > onSubscribe ) {
105+ super ( onSubscribe );
106+ this . subscriber = subscriber ;
107+ }
52108
53- });
109+ @ Override
110+ public void onCompleted () {
111+ subscriber .get ().onCompleted ();
112+ }
54113
55- Func1 <Subscriber <? super Observable <R >>, Subscriber <? super GroupedObservable <Long , T >>> map =
56- new OperatorMap <GroupedObservable <Long , T >, Observable <R >>(
57- new Func1 <GroupedObservable <Long , T >, Observable <R >>() {
114+ @ Override
115+ public void onError (Throwable e ) {
116+ subscriber .get ().onError (e );
117+ }
58118
59- @ Override
60- public Observable <R > call (GroupedObservable <Long , T > g ) {
61- // Must use observeOn not subscribeOn because we have a single source behind groupBy.
62- // The origin is already subscribed to, we are moving each group on to a new thread
63- // but the origin itself can only be on a single thread.
64- return f .call (g .observeOn (scheduler ));
65- }
66- });
119+ @ Override
120+ public void onNext (T t ) {
121+ subscriber .get ().onNext (t );
122+ }
67123
68- // bind together Observers
69- return groupBy .call (map .call (new OperatorMerge <R >().call (op )));
70124 }
71125}
0 commit comments