File tree Expand file tree Collapse file tree 1 file changed +19
-13
lines changed
rxjava-core/src/main/java/rx/observers Expand file tree Collapse file tree 1 file changed +19
-13
lines changed Original file line number Diff line number Diff line change @@ -128,6 +128,7 @@ public void onNext(T t) {
128128 }
129129
130130 // we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above
131+ boolean skipFinal = false ;
131132 try {
132133 int iter = MAX_DRAIN_ITERATION ;
133134 do {
@@ -141,27 +142,32 @@ public void onNext(T t) {
141142 synchronized (this ) {
142143 list = queue ;
143144 queue = null ;
144- }
145- if (list == null ) {
146- break ;
145+ if (list == null ) {
146+ emitting = false ;
147+ skipFinal = true ;
148+ return ;
149+ }
147150 }
148151 }
149152 } while (iter > 0 );
150153 } finally {
151- synchronized (this ) {
152- if (terminated ) {
153- list = queue ;
154- queue = null ;
155- } else {
156- emitting = false ;
157- list = null ;
154+ if (!skipFinal ) {
155+ synchronized (this ) {
156+ if (terminated ) {
157+ list = queue ;
158+ queue = null ;
159+ } else {
160+ emitting = false ;
161+ list = null ;
162+ }
158163 }
159164 }
160- // this will only drain if terminated (done here outside of synchronized block)
161- drainQueue (list );
162165 }
166+
167+ // this will only drain if terminated (done here outside of synchronized block)
168+ drainQueue (list );
163169 }
164-
170+
165171 void drainQueue (FastList list ) {
166172 if (list == null || list .size == 0 ) {
167173 return ;
You can’t perform that action at this time.
0 commit comments