15
15
*/
16
16
package rx .internal .operators ;
17
17
18
+ import java .util .concurrent .atomic .AtomicReference ;
19
+
18
20
import rx .Observable ;
19
21
import rx .Observable .Operator ;
20
22
import rx .Scheduler ;
21
23
import rx .Subscriber ;
22
24
import rx .functions .Func1 ;
23
- import rx .observables . GroupedObservable ;
25
+ import rx .subjects . Subject ;
24
26
25
27
/**
26
28
* Identifies unit of work that can be executed in parallel on a given Scheduler.
@@ -38,34 +40,86 @@ public OperatorParallel(Func1<Observable<T>, Observable<R>> f, Scheduler schedul
38
40
}
39
41
40
42
@ Override
41
- public Subscriber <? super T > call (Subscriber <? super R > op ) {
43
+ public Subscriber <? super T > call (final Subscriber <? super R > child ) {
44
+
45
+ @ SuppressWarnings ("unchecked" )
46
+ final UnicastPassThruSubject <T >[] subjects = new UnicastPassThruSubject [degreeOfParallelism ];
47
+ @ SuppressWarnings ("unchecked" )
48
+ final Observable <R >[] os = new Observable [degreeOfParallelism ];
49
+ for (int i = 0 ; i < subjects .length ; i ++) {
50
+ subjects [i ] = UnicastPassThruSubject .<T > create ();
51
+ os [i ] = f .call (subjects [i ].observeOn (scheduler ));
52
+ }
53
+
54
+ // subscribe BEFORE receiving data so everything is hooked up
55
+ Observable .merge (os ).unsafeSubscribe (child );
56
+
57
+ return new Subscriber <T >(child ) {
58
+
59
+ int index = 0 ; // trust that we receive data synchronously
60
+
61
+ @ Override
62
+ public void onCompleted () {
63
+ for (UnicastPassThruSubject <T > s : subjects ) {
64
+ s .onCompleted ();
65
+ }
66
+ }
67
+
68
+ @ Override
69
+ public void onError (Throwable e ) {
70
+ // bypass the subjects and immediately terminate
71
+ child .onError (e );
72
+ }
73
+
74
+ @ Override
75
+ public void onNext (T t ) {
76
+ // round-robin subjects
77
+ subjects [index ++].onNext (t );
78
+ if (index >= degreeOfParallelism ) {
79
+ index = 0 ;
80
+ }
81
+ }
82
+
83
+ };
84
+
85
+ }
86
+
87
+ private static class UnicastPassThruSubject <T > extends Subject <T , T > {
88
+
89
+ private static <T > UnicastPassThruSubject <T > create () {
90
+ final AtomicReference <Subscriber <? super T >> subscriber = new AtomicReference <Subscriber <? super T >>();
91
+ return new UnicastPassThruSubject <T >(subscriber , new OnSubscribe <T >() {
92
+
93
+ @ Override
94
+ public void call (Subscriber <? super T > s ) {
95
+ subscriber .set (s );
96
+ }
97
+
98
+ });
42
99
43
- Func1 <Subscriber <? super GroupedObservable <Long , T >>, Subscriber <? super T >> groupBy =
44
- new OperatorGroupBy <Long , T >(new Func1 <T , Long >() {
100
+ }
45
101
46
- long i = 0 ;
102
+ private final AtomicReference < Subscriber <? super T >> subscriber ;
47
103
48
- @ Override
49
- public Long call ( T t ) {
50
- return i ++ % degreeOfParallelism ;
51
- }
104
+ protected UnicastPassThruSubject ( AtomicReference < Subscriber <? super T >> subscriber , OnSubscribe < T > onSubscribe ) {
105
+ super ( onSubscribe );
106
+ this . subscriber = subscriber ;
107
+ }
52
108
53
- });
109
+ @ Override
110
+ public void onCompleted () {
111
+ subscriber .get ().onCompleted ();
112
+ }
54
113
55
- Func1 <Subscriber <? super Observable <R >>, Subscriber <? super GroupedObservable <Long , T >>> map =
56
- new OperatorMap <GroupedObservable <Long , T >, Observable <R >>(
57
- new Func1 <GroupedObservable <Long , T >, Observable <R >>() {
114
+ @ Override
115
+ public void onError (Throwable e ) {
116
+ subscriber .get ().onError (e );
117
+ }
58
118
59
- @ Override
60
- public Observable <R > call (GroupedObservable <Long , T > g ) {
61
- // Must use observeOn not subscribeOn because we have a single source behind groupBy.
62
- // The origin is already subscribed to, we are moving each group on to a new thread
63
- // but the origin itself can only be on a single thread.
64
- return f .call (g .observeOn (scheduler ));
65
- }
66
- });
119
+ @ Override
120
+ public void onNext (T t ) {
121
+ subscriber .get ().onNext (t );
122
+ }
67
123
68
- // bind together Observers
69
- return groupBy .call (map .call (new OperatorMerge <R >().call (op )));
70
124
}
71
125
}
0 commit comments