1616package rx .internal .operators ;
1717
1818import java .util .concurrent .ConcurrentLinkedQueue ;
19- import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
2019import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
2120
2221import rx .Observer ;
2322import rx .Subscriber ;
2423import rx .functions .Action0 ;
24+ import rx .observers .EmptyObserver ;
2525import rx .observers .Subscribers ;
2626import rx .subjects .Subject ;
2727import rx .subscriptions .Subscriptions ;
5151 */
5252public class BufferUntilSubscriber <T > extends Subject <T , T > {
5353
54+ @ SuppressWarnings ("rawtypes" )
55+ private final static Observer EMPTY_OBSERVER = new EmptyObserver ();
56+
5457 /**
5558 * @warn create() undescribed
5659 * @return
@@ -62,25 +65,22 @@ public static <T> BufferUntilSubscriber<T> create() {
6265
6366 /** The common state. */
6467 static final class State <T > {
65- /** The first observer or the one which buffers until the first arrives. */
66- volatile Observer <? super T > observerRef = new BufferedObserver <T >();
67- /** Allow a single subscriber only. */
68- volatile int first ;
68+ volatile Observer <? super T > observerRef = null ;
6969 /** Field updater for observerRef. */
7070 @ SuppressWarnings ("rawtypes" )
7171 static final AtomicReferenceFieldUpdater <State , Observer > OBSERVER_UPDATER
7272 = AtomicReferenceFieldUpdater .newUpdater (State .class , Observer .class , "observerRef" );
73- /** Field updater for first. */
74- @ SuppressWarnings ("rawtypes" )
75- static final AtomicIntegerFieldUpdater <State > FIRST_UPDATER
76- = AtomicIntegerFieldUpdater .newUpdater (State .class , "first" );
77-
78- boolean casFirst (int expected , int next ) {
79- return FIRST_UPDATER .compareAndSet (this , expected , next );
80- }
81- void setObserverRef (Observer <? super T > o ) {
82- observerRef = o ;
73+
74+ boolean casObserverRef (Observer <? super T > expected , Observer <? super T > next ) {
75+ return OBSERVER_UPDATER .compareAndSet (this , expected , next );
8376 }
77+
78+ Object guard = new Object ();
79+ /* protected by guard */
80+ boolean emitting = false ;
81+
82+ final ConcurrentLinkedQueue <Object > buffer = new ConcurrentLinkedQueue <Object >();
83+ final NotificationLite <T > nl = NotificationLite .instance ();
8484 }
8585
8686 static final class OnSubscribeAction <T > implements OnSubscribe <T > {
@@ -92,122 +92,100 @@ public OnSubscribeAction(State<T> state) {
9292
9393 @ Override
9494 public void call (final Subscriber <? super T > s ) {
95- if (state .casFirst (0 , 1 )) {
96- final NotificationLite <T > nl = NotificationLite .instance ();
97- // drain queued notifications before subscription
98- // we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
99- BufferedObserver <? super T > buffered = (BufferedObserver <? super T >)state .observerRef ;
100- Object o ;
101- while ((o = buffered .buffer .poll ()) != null ) {
102- nl .accept (s , o );
103- }
104- // register real observer for pass-thru ... and drain any further events received on first notification
105- state .setObserverRef (new PassThruObserver <T >(s , buffered .buffer , state ));
95+ if (state .casObserverRef (null , s )) {
10696 s .add (Subscriptions .create (new Action0 () {
10797 @ Override
10898 public void call () {
109- state .setObserverRef ( Subscribers . empty ()) ;
99+ state .observerRef = EMPTY_OBSERVER ;
110100 }
111101 }));
102+ boolean win = false ;
103+ synchronized (state .guard ) {
104+ if (!state .emitting ) {
105+ state .emitting = true ;
106+ win = true ;
107+ }
108+ }
109+ if (win ) {
110+ final NotificationLite <T > nl = NotificationLite .instance ();
111+ while (true ) {
112+ Object o ;
113+ while ((o = state .buffer .poll ()) != null ) {
114+ nl .accept (state .observerRef , o );
115+ }
116+ synchronized (state .guard ) {
117+ if (state .buffer .isEmpty ()) {
118+ // Although the buffer is empty, there is still a chance
119+ // that further events may be put into the `buffer`.
120+ // `emit(Object v)` should handle it.
121+ state .emitting = false ;
122+ break ;
123+ }
124+ }
125+ }
126+ }
112127 } else {
113128 s .onError (new IllegalStateException ("Only one subscriber allowed!" ));
114129 }
115130 }
116131
117132 }
118133 final State <T > state ;
119-
134+
135+ private boolean forward = false ;
136+
120137 private BufferUntilSubscriber (State <T > state ) {
121138 super (new OnSubscribeAction <T >(state ));
122139 this .state = state ;
123140 }
124141
125- @ Override
126- public void onCompleted () {
127- state .observerRef .onCompleted ();
128- }
129-
130- @ Override
131- public void onError (Throwable e ) {
132- state .observerRef .onError (e );
142+ private void emit (Object v ) {
143+ synchronized (state .guard ) {
144+ state .buffer .add (v );
145+ if (state .observerRef != null && !state .emitting ) {
146+ // Have an observer and nobody is emitting,
147+ // should drain the `buffer`
148+ forward = true ;
149+ state .emitting = true ;
150+ }
151+ }
152+ if (forward ) {
153+ Object o ;
154+ while ((o = state .buffer .poll ()) != null ) {
155+ state .nl .accept (state .observerRef , o );
156+ }
157+ // Because `emit(Object v)` will be called in sequence,
158+ // no event will be put into `buffer` after we drain it.
159+ }
133160 }
134161
135162 @ Override
136- public void onNext (T t ) {
137- state .observerRef .onNext (t );
138- }
139-
140- /**
141- * This is a temporary observer between buffering and the actual that gets into the line of notifications
142- * from the producer and will drain the queue of any items received during the race of the initial drain and
143- * switching this.
144- *
145- * It will then immediately swap itself out for the actual (after a single notification), but since this is
146- * now being done on the same producer thread no further buffering will occur.
147- */
148- private static final class PassThruObserver <T > extends Subscriber <T > {
149-
150- private final Observer <? super T > actual ;
151- // this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
152- private final ConcurrentLinkedQueue <Object > buffer ;
153- private final State <T > state ;
154-
155- PassThruObserver (Observer <? super T > actual , ConcurrentLinkedQueue <Object > buffer ,
156- State <T > state ) {
157- this .actual = actual ;
158- this .buffer = buffer ;
159- this .state = state ;
163+ public void onCompleted () {
164+ if (forward ) {
165+ state .observerRef .onCompleted ();
160166 }
161-
162- @ Override
163- public void onCompleted () {
164- drainIfNeededAndSwitchToActual ();
165- actual .onCompleted ();
167+ else {
168+ emit (state .nl .completed ());
166169 }
170+ }
167171
168- @ Override
169- public void onError (Throwable e ) {
170- drainIfNeededAndSwitchToActual ();
171- actual .onError (e );
172+ @ Override
173+ public void onError (Throwable e ) {
174+ if ( forward ) {
175+ state . observerRef .onError (e );
172176 }
173-
174- @ Override
175- public void onNext (T t ) {
176- drainIfNeededAndSwitchToActual ();
177- actual .onNext (t );
177+ else {
178+ emit (state .nl .error (e ));
178179 }
179-
180- private void drainIfNeededAndSwitchToActual () {
181- final NotificationLite <T > nl = NotificationLite .instance ();
182- Object o ;
183- while ((o = buffer .poll ()) != null ) {
184- nl .accept (this , o );
185- }
186- // now we can safely change over to the actual and get rid of the pass-thru
187- // but only if not unsubscribed
188- state .setObserverRef (actual );
189- }
190-
191180 }
192181
193- private static final class BufferedObserver <T > extends Subscriber <T > {
194- private final ConcurrentLinkedQueue <Object > buffer = new ConcurrentLinkedQueue <Object >();
195- private static final NotificationLite <Object > nl = NotificationLite .instance ();
196-
197- @ Override
198- public void onCompleted () {
199- buffer .add (nl .completed ());
200- }
201-
202- @ Override
203- public void onError (Throwable e ) {
204- buffer .add (nl .error (e ));
182+ @ Override
183+ public void onNext (T t ) {
184+ if (forward ) {
185+ state .observerRef .onNext (t );
205186 }
206-
207- @ Override
208- public void onNext (T t ) {
209- buffer .add (nl .next (t ));
187+ else {
188+ emit (state .nl .next (t ));
210189 }
211-
212190 }
213191}
0 commit comments