1616package rx .observers ;
1717
1818import rx .Observer ;
19- import rx .exceptions .Exceptions ;
19+ import rx .exceptions .*;
20+ import rx .internal .operators .NotificationLite ;
2021
2122/**
2223 * Enforces single-threaded, serialized, ordered execution of {@link #onNext}, {@link #onCompleted}, and
3536public class SerializedObserver <T > implements Observer <T > {
3637 private final Observer <? super T > actual ;
3738
38- private boolean emitting = false ;
39- private boolean terminated = false ;
39+ private boolean emitting ;
40+ /** Set to true if a terminal event was received. */
41+ private volatile boolean terminated ;
42+ /** If not null, it indicates more work. */
4043 private FastList queue ;
44+ private final NotificationLite <T > nl = NotificationLite .instance ();
4145
42- private static final int MAX_DRAIN_ITERATION = Integer .MAX_VALUE ;
43- private static final Object NULL_SENTINEL = new Object ();
44- private static final Object COMPLETE_SENTINEL = new Object ();
46+ /** Number of iterations without additional safepoint poll in the drain loop. */
47+ private static final int MAX_DRAIN_ITERATION = 1024 ;
4548
4649 static final class FastList {
4750 Object [] array ;
@@ -64,150 +67,119 @@ public void add(Object o) {
6467 }
6568 }
6669
67- private static final class ErrorSentinel {
68- final Throwable e ;
69-
70- ErrorSentinel (Throwable e ) {
71- this .e = e ;
72- }
73- }
74-
7570 public SerializedObserver (Observer <? super T > s ) {
7671 this .actual = s ;
7772 }
7873
7974 @ Override
80- public void onCompleted () {
81- FastList list ;
75+ public void onNext (T t ) {
76+ if (terminated ) {
77+ return ;
78+ }
8279 synchronized (this ) {
8380 if (terminated ) {
8481 return ;
8582 }
86- terminated = true ;
8783 if (emitting ) {
88- if (queue == null ) {
89- queue = new FastList ();
84+ FastList list = queue ;
85+ if (list == null ) {
86+ list = new FastList ();
87+ queue = list ;
9088 }
91- queue .add (COMPLETE_SENTINEL );
89+ list .add (nl . next ( t ) );
9290 return ;
9391 }
9492 emitting = true ;
95- list = queue ;
96- queue = null ;
9793 }
98- drainQueue (list );
99- actual .onCompleted ();
94+ try {
95+ actual .onNext (t );
96+ } catch (Throwable e ) {
97+ terminated = true ;
98+ Exceptions .throwIfFatal (e );
99+ actual .onError (OnErrorThrowable .addValueAsLastCause (e , t ));
100+ return ;
101+ }
102+ for (;;) {
103+ for (int i = 0 ; i < MAX_DRAIN_ITERATION ; i ++) {
104+ FastList list ;
105+ synchronized (this ) {
106+ list = queue ;
107+ if (list == null ) {
108+ emitting = false ;
109+ return ;
110+ }
111+ queue = null ;
112+ }
113+ for (Object o : list .array ) {
114+ if (o == null ) {
115+ break ;
116+ }
117+ try {
118+ if (nl .accept (actual , o )) {
119+ terminated = true ;
120+ return ;
121+ }
122+ } catch (Throwable e ) {
123+ terminated = true ;
124+ Exceptions .throwIfFatal (e );
125+ actual .onError (OnErrorThrowable .addValueAsLastCause (e , t ));
126+ return ;
127+ }
128+ }
129+ }
130+ }
100131 }
101-
132+
102133 @ Override
103134 public void onError (final Throwable e ) {
104135 Exceptions .throwIfFatal (e );
105- FastList list ;
136+ if (terminated ) {
137+ return ;
138+ }
106139 synchronized (this ) {
107140 if (terminated ) {
108141 return ;
109142 }
143+ terminated = true ;
110144 if (emitting ) {
111- if (queue == null ) {
112- queue = new FastList ();
145+ /*
146+ * FIXME: generally, errors jump the queue but this wasn't true
147+ * for SerializedObserver and may break existing expectations.
148+ */
149+ FastList list = queue ;
150+ if (list == null ) {
151+ list = new FastList ();
152+ queue = list ;
113153 }
114- queue .add (new ErrorSentinel (e ));
154+ list .add (nl . error (e ));
115155 return ;
116156 }
117157 emitting = true ;
118- list = queue ;
119- queue = null ;
120158 }
121- drainQueue (list );
122159 actual .onError (e );
123- synchronized (this ) {
124- emitting = false ;
125- }
126160 }
127161
128162 @ Override
129- public void onNext (T t ) {
130- FastList list ;
131-
163+ public void onCompleted () {
164+ if (terminated ) {
165+ return ;
166+ }
132167 synchronized (this ) {
133168 if (terminated ) {
134169 return ;
135170 }
171+ terminated = true ;
136172 if (emitting ) {
137- if (queue == null ) {
138- queue = new FastList ();
173+ FastList list = queue ;
174+ if (list == null ) {
175+ list = new FastList ();
176+ queue = list ;
139177 }
140- queue .add (t != null ? t : NULL_SENTINEL );
141- // another thread is emitting so we add to the queue and return
178+ list .add (nl .completed ());
142179 return ;
143180 }
144- // we can emit
145181 emitting = true ;
146- // reference to the list to drain before emitting our value
147- list = queue ;
148- queue = null ;
149- }
150-
151- // we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above
152- boolean skipFinal = false ;
153- try {
154- int iter = MAX_DRAIN_ITERATION ;
155- do {
156- drainQueue (list );
157- if (iter == MAX_DRAIN_ITERATION ) {
158- // after the first draining we emit our own value
159- actual .onNext (t );
160- }
161- --iter ;
162- if (iter > 0 ) {
163- synchronized (this ) {
164- list = queue ;
165- queue = null ;
166- if (list == null ) {
167- emitting = false ;
168- skipFinal = true ;
169- return ;
170- }
171- }
172- }
173- } while (iter > 0 );
174- } finally {
175- if (!skipFinal ) {
176- synchronized (this ) {
177- if (terminated ) {
178- list = queue ;
179- queue = null ;
180- } else {
181- emitting = false ;
182- list = null ;
183- }
184- }
185- }
186- }
187-
188- // this will only drain if terminated (done here outside of synchronized block)
189- drainQueue (list );
190- }
191-
192- void drainQueue (FastList list ) {
193- if (list == null || list .size == 0 ) {
194- return ;
195- }
196- for (Object v : list .array ) {
197- if (v == null ) {
198- break ;
199- }
200- if (v == NULL_SENTINEL ) {
201- actual .onNext (null );
202- } else if (v == COMPLETE_SENTINEL ) {
203- actual .onCompleted ();
204- } else if (v .getClass () == ErrorSentinel .class ) {
205- actual .onError (((ErrorSentinel ) v ).e );
206- } else {
207- @ SuppressWarnings ("unchecked" )
208- T t = (T )v ;
209- actual .onNext (t );
210- }
211182 }
183+ actual .onCompleted ();
212184 }
213185}
0 commit comments