@@ -26,9 +26,8 @@ public void onCompleted() {
2626 }
2727 newState = current .complete ();
2828 } while (!state .compareAndSet (current , newState ));
29- if (newState .count == 0 ) {
30- s .onCompleted ();
31- }
29+ System .out .println ("********** onCompleted" );
30+ terminateIfNecessary (newState );
3231 }
3332
3433 @ Override
@@ -43,9 +42,8 @@ public void onError(Throwable e) {
4342 }
4443 newState = current .error (e );
4544 } while (!state .compareAndSet (current , newState ));
46- if (newState .count == 0 ) {
47- s .onError (e );
48- }
45+ System .out .println ("********** onError" );
46+ terminateIfNecessary (newState );
4947 }
5048
5149 @ SuppressWarnings ("unchecked" )
@@ -59,125 +57,147 @@ public void onNext(T t) {
5957 // already received terminal state
6058 return ;
6159 }
62- newState = current .increment (t );
60+ newState = current .offerItem (t );
6361 } while (!state .compareAndSet (current , newState ));
6462
65- if (newState .count == 1 ) {
63+ if (newState .shouldProcess () ) {
6664 // this thread wins and will emit then drain queue if it concurrently gets added to
67- try {
68- s .onNext (t );
69- } finally {
70- // decrement after finishing
71- do {
72- current = state .get ();
73- newState = current .decrement ();
74- } while (!state .compareAndSet (current , newState ));
75- }
65+ s .onNext (t );
7666
7767 // drain queue if exists
7868 // we do "if" instead of "while" so we don't starve one thread
79- if (newState .queue .length > 0 ) {
80- Object [] items = newState .queue ;
81- for (int i = 0 ; i < items .length ; i ++) {
82- s .onNext ((T ) items [i ]);
83- }
84- // clear state of queue
85- do {
86- current = state .get ();
87- newState = current .drain (items .length );
88- } while (!state .compareAndSet (current , newState ));
89- terminateIfNecessary (newState );
90- } else {
91- terminateIfNecessary (newState );
69+ Object [] items = newState .queue ;
70+ for (int i = 0 ; i < items .length ; i ++) {
71+ s .onNext ((T ) items [i ]);
9272 }
9373
74+ // finish processing to let this thread move on
75+ do {
76+ current = state .get ();
77+ newState = current .finishProcessing (items .length + 1 ); // the + 1 is for the first onNext of itself
78+ } while (!state .compareAndSet (current , newState ));
79+ System .out .println ("********** finishProcessing" );
80+ terminateIfNecessary (newState );
9481 }
9582 }
9683
97- private void terminateIfNecessary (State state ) {
98- if (state .isTerminated ()) {
99- if (state .onComplete ) {
100- s .onCompleted ();
101- } else {
102- s .onError (state .onError );
84+ @ SuppressWarnings ("unchecked" )
85+ private void terminateIfNecessary (State current ) {
86+ if (current .isTerminated ()) {
87+ State newState = null ;
88+ do {
89+ current = state .get ();
90+ newState = current .startTermination ();
91+ } while (!state .compareAndSet (current , newState ));
92+ if (newState .shouldProcess ()) {
93+ // drain any items left
94+ for (int i = 0 ; i < newState .queue .length ; i ++) {
95+ s .onNext ((T ) newState .queue [i ]);
96+ }
97+
98+ // now terminate
99+ if (newState .onComplete ) {
100+ s .onCompleted ();
101+ } else {
102+ s .onError (newState .onError );
103+ }
103104 }
104105 }
105106 }
106107
107108 public static class State {
108- final int count ;
109+ final boolean shouldProcess ;
110+ final boolean isSomeoneProcessing ;
111+ final int queueSize ;
109112 final Object [] queue ;
110113 final boolean onComplete ;
111114 final Throwable onError ;
112115
113116 private final static Object [] EMPTY = new Object [0 ];
114117
115- private final static State NON_TERMINATED_EMPTY = new State (0 , false , null , EMPTY );
116- private final static State NON_TERMINATED_SINGLE = new State (1 , false , null , EMPTY );
118+ private final static State NON_TERMINATED_EMPTY = new State (false , false , 0 , false , null , EMPTY );
119+ private final static State NON_TERMINATED_PROCESS_SELF = new State (true , true , 1 , false , null , EMPTY );
117120
118- public State (int count , boolean onComplete , Throwable onError , Object [] queue ) {
119- this .count = count ;
121+ public State (boolean shouldProcess , boolean isSomeoneProcessing , int queueSize , boolean onComplete , Throwable onError , Object [] queue ) {
122+ this .shouldProcess = shouldProcess ;
123+ this .isSomeoneProcessing = isSomeoneProcessing ;
124+ this .queueSize = queueSize ;
120125 this .queue = queue ;
121126 this .onComplete = onComplete ;
122127 this .onError = onError ;
123128 }
124129
125130 public static State createNew () {
126- return new State (0 , false , null , EMPTY );
131+ return new State (false , false , 0 , false , null , EMPTY );
132+ }
133+
134+ public boolean shouldProcess () {
135+ return shouldProcess ;
127136 }
128137
129138 public boolean isTerminated () {
130139 return onComplete || onError != null ;
131140 }
132141
133142 public State complete () {
134- return new State (count , true , onError , queue );
143+ return new State (false , isSomeoneProcessing , queueSize , true , onError , queue );
135144 }
136145
137146 public State error (Throwable e ) {
138- return new State (count , onComplete , e , queue );
147+ // immediately empty the queue and emit error as soon as possible
148+ return new State (false , isSomeoneProcessing , queueSize , onComplete , e , EMPTY );
149+ }
150+
151+ public State startTermination () {
152+ if (isSomeoneProcessing ) {
153+ System .out .println ("start terminate and DO NOT process => queue size: " + (queueSize + 1 ));
154+ return new State (false , isSomeoneProcessing , queueSize , onComplete , onError , queue );
155+ } else {
156+ System .out .println ("start terminate and process => queue size: " + (queueSize + 1 ));
157+ return new State (true , isSomeoneProcessing , queueSize , onComplete , onError , queue );
158+ }
139159 }
140160
141161 AtomicInteger max = new AtomicInteger ();
142162
143- public State increment (Object item ) {
144- if (count == 0 ) {
163+ public State offerItem (Object item ) {
164+ if (queueSize == 0 ) {
145165 // no concurrent requests so don't queue, we'll process immediately
146166 if (isTerminated ()) {
147167 // return count of 0 meaning don't emit as we are terminated
148- return new State (0 , onComplete , onError , EMPTY );
168+ return new State (false , false , 0 , onComplete , onError , EMPTY );
149169 } else {
150- return NON_TERMINATED_SINGLE ;
170+ return NON_TERMINATED_PROCESS_SELF ;
151171 }
152172 } else {
153- // concurrent requests so need to queue
173+ // there are items queued so we need to queue
154174 int idx = queue .length ;
155175 Object [] newQueue = new Object [idx + 1 ];
156176 System .arraycopy (queue , 0 , newQueue , 0 , idx );
157177 newQueue [idx ] = item ;
158178
159- if (max .get () < newQueue .length ) {
160- max .set (newQueue .length );
161- System .out .println ("max queue: " + newQueue .length );
179+ if (isSomeoneProcessing ) {
180+ // we just add to queue
181+ return new State (false , isSomeoneProcessing , queueSize + 1 , onComplete , onError , newQueue );
182+ } else {
183+ // we add to queue and claim work
184+ return new State (false , true , queueSize + 1 , onComplete , onError , newQueue );
162185 }
163-
164- return new State (count + 1 , onComplete , onError , newQueue );
165186 }
166187 }
167188
168- public State decrement () {
169- if (count > 1 || isTerminated ()) {
170- return new State (count - 1 , onComplete , onError , queue );
189+ public State finishProcessing (int numOnNextSent ) {
190+ int numOnNextFromQueue = numOnNextSent - 1 ; // we remove the "self" onNext as it doesn't affect the queue
191+ int size = queueSize - numOnNextFromQueue ;
192+ System .out .println ("finishProcessing => queue size: " + size + " after processing: " + numOnNextSent );
193+ if (size > 1 || isTerminated ()) {
194+ Object [] newQueue = new Object [queue .length - numOnNextFromQueue ];
195+ System .arraycopy (queue , numOnNextFromQueue , newQueue , 0 , newQueue .length );
196+ return new State (false , false , size , onComplete , onError , newQueue );
171197 } else {
172198 return NON_TERMINATED_EMPTY ;
173199 }
174200 }
175201
176- public State drain (int c ) {
177- Object [] newQueue = new Object [queue .length - c ];
178- System .arraycopy (queue , c , newQueue , 0 , newQueue .length );
179- return new State (count - c , onComplete , onError , newQueue );
180- }
181-
182202 }
183203}
0 commit comments