1515 */
1616package rx .operators ;
1717
18- import java .util .LinkedHashSet ;
19- import java .util .Set ;
2018import java .util .concurrent .ConcurrentLinkedQueue ;
2119import java .util .concurrent .atomic .AtomicBoolean ;
2220import java .util .concurrent .atomic .AtomicReference ;
@@ -59,14 +57,8 @@ static final class State<T> {
5957 final NotificationLite <T > nl = NotificationLite .instance ();
6058 /** The first observer or the one which buffers until the first arrives. */
6159 final AtomicReference <Observer <? super T >> observerRef = new AtomicReference <Observer <? super T >>(new BufferedObserver <T >());
62- /** How many subscribers . */
60+ /** Allow a single subscriber only . */
6361 final AtomicBoolean first = new AtomicBoolean ();
64- /** The rest of the subscribers without buffering. Guarded by this. */
65- final Set <Subscriber <? super T >> subscribers = new LinkedHashSet <Subscriber <? super T >>();
66- /** Guarded by this. */
67- boolean done ;
68- /** Guarded by this. */
69- Throwable exception ;
7062 }
7163
7264 static final class OnSubscribeAction <T > implements OnSubscribe <T > {
@@ -95,33 +87,7 @@ public void call() {
9587 }
9688 }));
9789 } else {
98- Throwable e = null ;
99- boolean done ;
100- synchronized (state ) {
101- done = state .done ;
102- if (!done ) {
103- state .subscribers .add (s );
104- } else {
105- e = state .exception ;
106- }
107- }
108- if (done ) {
109- if (e != null ) {
110- s .onError (e );
111- } else {
112- s .onCompleted ();
113- }
114- return ;
115- }
116- s .add (Subscriptions .create (new Action0 () {
117-
118- @ Override
119- public void call () {
120- synchronized (state ) {
121- state .subscribers .remove (s );
122- }
123- }
124- }));
90+ s .onError (new IllegalStateException ("Only one subscriber allowed!" ));
12591 }
12692 }
12793
@@ -136,64 +102,17 @@ private BufferUntilSubscriber(State<T> state) {
136102 @ Override
137103 public void onCompleted () {
138104 state .observerRef .get ().onCompleted ();
139- // notify the rest
140- Subscriber <?>[] list ;
141- synchronized (state ) {
142- if (!state .done ) {
143- return ;
144- }
145- state .done = true ;
146- if (state .subscribers .isEmpty ()) {
147- return ;
148- }
149- list = state .subscribers .toArray (new Subscriber <?>[state .subscribers .size ()]);
150- state .subscribers .clear ();
151- }
152- for (Subscriber <?> s : list ) {
153- s .onCompleted ();
154- }
155105 }
156106
157107 @ Override
158108 public void onError (Throwable e ) {
159109 state .observerRef .get ().onError (e );
160- // notify the rest
161- Subscriber <?>[] list ;
162- synchronized (state ) {
163- if (!state .done ) {
164- return ;
165- }
166- state .done = true ;
167- state .exception = e ;
168- if (state .subscribers .isEmpty ()) {
169- return ;
170- }
171- list = state .subscribers .toArray (new Subscriber <?>[state .subscribers .size ()]);
172- state .subscribers .clear ();
173- }
174- for (Subscriber <?> s : list ) {
175- s .onError (e );
176- }
177110 }
178111
179112 @ Override
180113 @ SuppressWarnings ({ "unchecked" , "rawtypes" })
181114 public void onNext (T t ) {
182115 state .observerRef .get ().onNext (t );
183- // notify the rest
184- Subscriber [] list ;
185- synchronized (state ) {
186- if (state .done ) {
187- return ;
188- }
189- if (state .subscribers .isEmpty ()) {
190- return ;
191- }
192- list = state .subscribers .toArray (new Subscriber [state .subscribers .size ()]);
193- }
194- for (Subscriber s : list ) {
195- s .onNext (t );
196- }
197116 }
198117
199118 /**
@@ -242,6 +161,7 @@ private void drainIfNeededAndSwitchToActual() {
242161 nl .accept (this , o );
243162 }
244163 // now we can safely change over to the actual and get rid of the pass-thru
164+ // but only if not unsubscribed
245165 observerRef .compareAndSet (this , actual );
246166 }
247167
0 commit comments