1616package rx .internal .operators ;
1717
1818import java .util .concurrent .ConcurrentLinkedQueue ;
19- import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
2019import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
2120
2221import rx .Observer ;
@@ -62,25 +61,22 @@ public static <T> BufferUntilSubscriber<T> create() {
6261
6362 /** The common state. */
6463 static final class State <T > {
65- /** The first observer or the one which buffers until the first arrives. */
66- volatile Observer <? super T > observerRef = new BufferedObserver <T >();
67- /** Allow a single subscriber only. */
68- volatile int first ;
64+ volatile Observer <? super T > observerRef = null ;
6965 /** Field updater for observerRef. */
7066 @ SuppressWarnings ("rawtypes" )
7167 static final AtomicReferenceFieldUpdater <State , Observer > OBSERVER_UPDATER
7268 = AtomicReferenceFieldUpdater .newUpdater (State .class , Observer .class , "observerRef" );
73- /** Field updater for first. */
74- @ SuppressWarnings ("rawtypes" )
75- static final AtomicIntegerFieldUpdater <State > FIRST_UPDATER
76- = AtomicIntegerFieldUpdater .newUpdater (State .class , "first" );
77-
78- boolean casFirst (int expected , int next ) {
79- return FIRST_UPDATER .compareAndSet (this , expected , next );
80- }
81- void setObserverRef (Observer <? super T > o ) {
82- observerRef = o ;
69+
70+ boolean casObserverRef (Observer <? super T > expected , Observer <? super T > next ) {
71+ return OBSERVER_UPDATER .compareAndSet (this , expected , next );
8372 }
73+
74+ Object guard = new Object ();
75+ /* protected by guard */
76+ boolean emitting = false ;
77+
78+ final ConcurrentLinkedQueue <Object > buffer = new ConcurrentLinkedQueue <Object >();
79+ final NotificationLite <T > nl = NotificationLite .instance ();
8480 }
8581
8682 static final class OnSubscribeAction <T > implements OnSubscribe <T > {
@@ -92,122 +88,100 @@ public OnSubscribeAction(State<T> state) {
9288
9389 @ Override
9490 public void call (final Subscriber <? super T > s ) {
95- if (state .casFirst (0 , 1 )) {
96- final NotificationLite <T > nl = NotificationLite .instance ();
97- // drain queued notifications before subscription
98- // we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
99- BufferedObserver <? super T > buffered = (BufferedObserver <? super T >)state .observerRef ;
100- Object o ;
101- while ((o = buffered .buffer .poll ()) != null ) {
102- nl .accept (s , o );
103- }
104- // register real observer for pass-thru ... and drain any further events received on first notification
105- state .setObserverRef (new PassThruObserver <T >(s , buffered .buffer , state ));
91+ if (state .casObserverRef (null , s )) {
10692 s .add (Subscriptions .create (new Action0 () {
10793 @ Override
10894 public void call () {
109- state .setObserverRef ( Subscribers .empty () );
95+ state .observerRef = Subscribers .empty ();
11096 }
11197 }));
98+ boolean win = false ;
99+ synchronized (state .guard ) {
100+ if (!state .emitting ) {
101+ state .emitting = true ;
102+ win = true ;
103+ }
104+ }
105+ if (win ) {
106+ final NotificationLite <T > nl = NotificationLite .instance ();
107+ while (true ) {
108+ Object o ;
109+ while ((o = state .buffer .poll ()) != null ) {
110+ nl .accept (state .observerRef , o );
111+ }
112+ synchronized (state .guard ) {
113+ if (state .buffer .isEmpty ()) {
114+ // Although the buffer is empty, there is still a chance
115+ // that further events may be put into the `buffer`.
116+ // `emit(Object v)` should handle it.
117+ state .emitting = false ;
118+ break ;
119+ }
120+ }
121+ }
122+ }
112123 } else {
113124 s .onError (new IllegalStateException ("Only one subscriber allowed!" ));
114125 }
115126 }
116127
117128 }
118129 final State <T > state ;
119-
130+
131+ private boolean forward = false ;
132+
120133 private BufferUntilSubscriber (State <T > state ) {
121134 super (new OnSubscribeAction <T >(state ));
122135 this .state = state ;
123136 }
124137
125- @ Override
126- public void onCompleted () {
127- state .observerRef .onCompleted ();
128- }
129-
130- @ Override
131- public void onError (Throwable e ) {
132- state .observerRef .onError (e );
138+ private void emit (Object v ) {
139+ synchronized (state .guard ) {
140+ state .buffer .add (v );
141+ if (state .observerRef != null && !state .emitting ) {
142+ // Have an observer and nobody is emitting,
143+ // should drain the `buffer`
144+ forward = true ;
145+ state .emitting = true ;
146+ }
147+ }
148+ if (forward ) {
149+ Object o ;
150+ while ((o = state .buffer .poll ()) != null ) {
151+ state .nl .accept (state .observerRef , o );
152+ }
153+ // Because `emit(Object v)` will be called in sequence,
154+ // no event will be put into `buffer` after we drain it.
155+ }
133156 }
134157
135158 @ Override
136- public void onNext (T t ) {
137- state .observerRef .onNext (t );
138- }
139-
140- /**
141- * This is a temporary observer between buffering and the actual that gets into the line of notifications
142- * from the producer and will drain the queue of any items received during the race of the initial drain and
143- * switching this.
144- *
145- * It will then immediately swap itself out for the actual (after a single notification), but since this is
146- * now being done on the same producer thread no further buffering will occur.
147- */
148- private static final class PassThruObserver <T > extends Subscriber <T > {
149-
150- private final Observer <? super T > actual ;
151- // this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
152- private final ConcurrentLinkedQueue <Object > buffer ;
153- private final State <T > state ;
154-
155- PassThruObserver (Observer <? super T > actual , ConcurrentLinkedQueue <Object > buffer ,
156- State <T > state ) {
157- this .actual = actual ;
158- this .buffer = buffer ;
159- this .state = state ;
160- }
161-
162- @ Override
163- public void onCompleted () {
164- drainIfNeededAndSwitchToActual ();
165- actual .onCompleted ();
159+ public void onCompleted () {
160+ if (forward ) {
161+ state .observerRef .onCompleted ();
166162 }
167-
168- @ Override
169- public void onError (Throwable e ) {
170- drainIfNeededAndSwitchToActual ();
171- actual .onError (e );
163+ else {
164+ emit (state .nl .completed ());
172165 }
166+ }
173167
174- @ Override
175- public void onNext ( T t ) {
176- drainIfNeededAndSwitchToActual ();
177- actual . onNext ( t );
168+ @ Override
169+ public void onError ( Throwable e ) {
170+ if ( forward ) {
171+ state . observerRef . onError ( e );
178172 }
179-
180- private void drainIfNeededAndSwitchToActual () {
181- final NotificationLite <T > nl = NotificationLite .instance ();
182- Object o ;
183- while ((o = buffer .poll ()) != null ) {
184- nl .accept (this , o );
185- }
186- // now we can safely change over to the actual and get rid of the pass-thru
187- // but only if not unsubscribed
188- state .setObserverRef (actual );
173+ else {
174+ emit (state .nl .error (e ));
189175 }
190-
191176 }
192177
193- private static final class BufferedObserver <T > extends Subscriber <T > {
194- private final ConcurrentLinkedQueue <Object > buffer = new ConcurrentLinkedQueue <Object >();
195- private static final NotificationLite <Object > nl = NotificationLite .instance ();
196-
197- @ Override
198- public void onCompleted () {
199- buffer .add (nl .completed ());
200- }
201-
202- @ Override
203- public void onError (Throwable e ) {
204- buffer .add (nl .error (e ));
178+ @ Override
179+ public void onNext (T t ) {
180+ if (forward ) {
181+ state .observerRef .onNext (t );
205182 }
206-
207- @ Override
208- public void onNext (T t ) {
209- buffer .add (nl .next (t ));
183+ else {
184+ emit (state .nl .next (t ));
210185 }
211-
212186 }
213187}
0 commit comments