File tree Expand file tree Collapse file tree 1 file changed +30
-21
lines changed
src/main/java/rx/internal/operators Expand file tree Collapse file tree 1 file changed +30
-21
lines changed Original file line number Diff line number Diff line change @@ -218,15 +218,17 @@ private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? ext
218218 private void handleScalarSynchronousObservableWithoutRequestLimits (ScalarSynchronousObservable <? extends T > t ) {
219219 T value = t .get ();
220220 if (getEmitLock ()) {
221+ boolean moreToDrain ;
221222 try {
222223 actual .onNext (value );
223- return ;
224224 } finally {
225- if (releaseEmitLock ()) {
226- drainQueuesIfNeeded ();
227- }
228- request (1 );
225+ moreToDrain = releaseEmitLock ();
229226 }
227+ if (moreToDrain ) {
228+ drainQueuesIfNeeded ();
229+ }
230+ request (1 );
231+ return ;
230232 } else {
231233 initScalarValueQueueIfNeeded ();
232234 try {
@@ -241,22 +243,28 @@ private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchro
241243 private void handleScalarSynchronousObservableWithRequestLimits (ScalarSynchronousObservable <? extends T > t ) {
242244 if (getEmitLock ()) {
243245 boolean emitted = false ;
246+ boolean moreToDrain ;
247+ boolean isReturn = false ;
244248 try {
245249 long r = mergeProducer .requested ;
246250 if (r > 0 ) {
247251 emitted = true ;
248252 actual .onNext (t .get ());
249253 MergeProducer .REQUESTED .decrementAndGet (mergeProducer );
250254 // we handle this Observable without ever incrementing the wip or touching other machinery so just return here
251- return ;
255+ isReturn = true ;
252256 }
253257 } finally {
254- if (releaseEmitLock ()) {
255- drainQueuesIfNeeded ();
256- }
257- if (emitted ) {
258- request (1 );
259- }
258+ moreToDrain = releaseEmitLock ();
259+ }
260+ if (moreToDrain ) {
261+ drainQueuesIfNeeded ();
262+ }
263+ if (emitted ) {
264+ request (1 );
265+ }
266+ if (isReturn ) {
267+ return ;
260268 }
261269 }
262270
@@ -301,20 +309,21 @@ private boolean drainQueuesIfNeeded() {
301309 while (true ) {
302310 if (getEmitLock ()) {
303311 int emitted = 0 ;
312+ boolean moreToDrain ;
304313 try {
305314 emitted = drainScalarValueQueue ();
306315 drainChildrenQueues ();
307316 } finally {
308- boolean moreToDrain = releaseEmitLock ();
309- // request outside of lock
310- if (emitted > 0 ) {
311- request (emitted );
312- }
313- if (!moreToDrain ) {
314- return true ;
315- }
316- // otherwise we'll loop and get whatever was added
317+ moreToDrain = releaseEmitLock ();
318+ }
319+ // request outside of lock
320+ if (emitted > 0 ) {
321+ request (emitted );
322+ }
323+ if (!moreToDrain ) {
324+ return true ;
317325 }
326+ // otherwise we'll loop and get whatever was added
318327 } else {
319328 return false ;
320329 }
You can’t perform that action at this time.
0 commit comments