File tree Expand file tree Collapse file tree 1 file changed +14
-15
lines changed
rxjava-core/src/main/java/rx/observers Expand file tree Collapse file tree 1 file changed +14
-15
lines changed Original file line number Diff line number Diff line change @@ -63,25 +63,24 @@ public void onNext(T t) {
6363 } while (!state .compareAndSet (current , newState ));
6464
6565 if (newState .shouldProcess ()) {
66- if (newState == State .PROCESS_SELF ) {
67- s .onNext (t );
68-
69- // finish processing to let this thread move on
70- do {
71- current = state .get ();
72- newState = current .finishProcessing (1 );
73- } while (!state .compareAndSet (current , newState ));
74- } else {
75- // drain queue
76- Object [] items = newState .queue ;
77- for (int i = 0 ; i < items .length ; i ++) {
78- s .onNext ((T ) items [i ]);
66+ int numItemsProcessed = 0 ;
67+ try {
68+ if (newState == State .PROCESS_SELF ) {
69+ s .onNext (t );
70+ numItemsProcessed ++;
71+ } else {
72+ // drain queue
73+ Object [] items = newState .queue ;
74+ for (int i = 0 ; i < items .length ; i ++) {
75+ s .onNext ((T ) items [i ]);
76+ numItemsProcessed ++;
77+ }
7978 }
80-
79+ } finally {
8180 // finish processing to let this thread move on
8281 do {
8382 current = state .get ();
84- newState = current .finishProcessing (items . length );
83+ newState = current .finishProcessing (numItemsProcessed );
8584 } while (!state .compareAndSet (current , newState ));
8685 }
8786 }
You can’t perform that action at this time.
0 commit comments