11package rx .observers ;
22
3+ import java .util .Collections ;
4+ import java .util .HashSet ;
5+ import java .util .Set ;
36import java .util .concurrent .atomic .AtomicInteger ;
47import java .util .concurrent .atomic .AtomicReference ;
58
@@ -14,6 +17,11 @@ public SerializedObserver(Observer<T> s) {
1417 this .s = s ;
1518 }
1619
20+ final AtomicInteger received = new AtomicInteger ();
21+ final AtomicInteger counter = new AtomicInteger ();
22+ final AtomicInteger offered = new AtomicInteger ();
23+ static AtomicInteger decremented = new AtomicInteger ();
24+
1725 @ Override
1826 public void onCompleted () {
1927 State current = null ;
@@ -26,7 +34,6 @@ public void onCompleted() {
2634 }
2735 newState = current .complete ();
2836 } while (!state .compareAndSet (current , newState ));
29- System .out .println ("********** onCompleted" );
3037 terminateIfNecessary (newState );
3138 }
3239
@@ -42,43 +49,54 @@ public void onError(Throwable e) {
4249 }
4350 newState = current .error (e );
4451 } while (!state .compareAndSet (current , newState ));
45- System .out .println ("********** onError" );
4652 terminateIfNecessary (newState );
4753 }
4854
55+ AtomicInteger conc = new AtomicInteger ();
56+ AtomicInteger lost = new AtomicInteger ();
57+ Set <Object > items = Collections .synchronizedSet (new HashSet <Object >());
58+
4959 @ SuppressWarnings ("unchecked" )
5060 @ Override
5161 public void onNext (T t ) {
5262 State current = null ;
5363 State newState = null ;
64+
65+ int contention = 0 ;
66+ State orig = null ;
5467 do {
5568 current = state .get ();
69+ if (orig == null ) {
70+ orig = current ;
71+ }
5672 if (current .isTerminated ()) {
5773 // already received terminal state
5874 return ;
5975 }
6076 newState = current .offerItem (t );
77+ contention ++;
6178 } while (!state .compareAndSet (current , newState ));
6279
80+ do {
81+ current = state .get ();
82+ newState = current .startProcessing ();
83+ } while (!state .compareAndSet (current , newState ));
6384 if (newState .shouldProcess ()) {
64- // this thread wins and will emit then drain queue if it concurrently gets added to
65- s .onNext (t );
66-
67- // drain queue if exists
68- // we do "if" instead of "while" so we don't starve one thread
85+ // drain queue
6986 Object [] items = newState .queue ;
7087 for (int i = 0 ; i < items .length ; i ++) {
7188 s .onNext ((T ) items [i ]);
89+ counter .incrementAndGet ();
7290 }
7391
7492 // finish processing to let this thread move on
7593 do {
7694 current = state .get ();
77- newState = current .finishProcessing (items .length + 1 ); // the + 1 is for the first onNext of itself
95+ newState = current .finishProcessing (items .length );
7896 } while (!state .compareAndSet (current , newState ));
79- System .out .println ("********** finishProcessing" );
80- terminateIfNecessary (newState );
97+
8198 }
99+ terminateIfNecessary (newState );
82100 }
83101
84102 @ SuppressWarnings ("unchecked" )
@@ -89,6 +107,7 @@ private void terminateIfNecessary(State current) {
89107 current = state .get ();
90108 newState = current .startTermination ();
91109 } while (!state .compareAndSet (current , newState ));
110+
92111 if (newState .shouldProcess ()) {
93112 // drain any items left
94113 for (int i = 0 ; i < newState .queue .length ; i ++) {
@@ -114,9 +133,9 @@ public static class State {
114133 final Throwable onError ;
115134
116135 private final static Object [] EMPTY = new Object [0 ];
136+ private final static Object [] PROCESS_SELF = new Object [1 ];
117137
118138 private final static State NON_TERMINATED_EMPTY = new State (false , false , 0 , false , null , EMPTY );
119- private final static State NON_TERMINATED_PROCESS_SELF = new State (true , true , 1 , false , null , EMPTY );
120139
121140 public State (boolean shouldProcess , boolean isSomeoneProcessing , int queueSize , boolean onComplete , Throwable onError , Object [] queue ) {
122141 this .shouldProcess = shouldProcess ;
@@ -145,59 +164,59 @@ public State complete() {
145164
146165 public State error (Throwable e ) {
147166 // immediately empty the queue and emit error as soon as possible
148- return new State (false , isSomeoneProcessing , queueSize , onComplete , e , EMPTY );
167+ return new State (false , isSomeoneProcessing , 0 , onComplete , e , EMPTY );
149168 }
150169
151170 public State startTermination () {
152171 if (isSomeoneProcessing ) {
153- System .out .println ("start terminate and DO NOT process => queue size: " + (queueSize + 1 ));
154172 return new State (false , isSomeoneProcessing , queueSize , onComplete , onError , queue );
155173 } else {
156- System .out .println ("start terminate and process => queue size: " + (queueSize + 1 ));
157- return new State (true , isSomeoneProcessing , queueSize , onComplete , onError , queue );
174+ return new State (true , true , queueSize , onComplete , onError , queue );
158175 }
159176 }
160177
161- AtomicInteger max = new AtomicInteger ();
162-
163178 public State offerItem (Object item ) {
164- if (queueSize == 0 ) {
165- // no concurrent requests so don't queue, we'll process immediately
166- if (isTerminated ()) {
167- // return count of 0 meaning don't emit as we are terminated
168- return new State (false , false , 0 , onComplete , onError , EMPTY );
169- } else {
170- return NON_TERMINATED_PROCESS_SELF ;
171- }
179+ if (isTerminated ()) {
180+ // return count of 0 meaning don't emit as we are terminated
181+ return new State (false , isSomeoneProcessing , 0 , onComplete , onError , EMPTY );
172182 } else {
173- // there are items queued so we need to queue
174183 int idx = queue .length ;
175184 Object [] newQueue = new Object [idx + 1 ];
176185 System .arraycopy (queue , 0 , newQueue , 0 , idx );
177186 newQueue [idx ] = item ;
178187
179- if (isSomeoneProcessing ) {
180- // we just add to queue
181- return new State (false , isSomeoneProcessing , queueSize + 1 , onComplete , onError , newQueue );
182- } else {
183- // we add to queue and claim work
184- return new State (false , true , queueSize + 1 , onComplete , onError , newQueue );
185- }
188+ // we just add to queue
189+ return new State (false , isSomeoneProcessing , queueSize + 1 , onComplete , onError , newQueue );
190+ }
191+ }
192+
193+ public State startProcessing () {
194+ if (isSomeoneProcessing ) {
195+ return new State (false , true , queueSize , onComplete , onError , queue );
196+ } else {
197+ return new State (true , true , queueSize , onComplete , onError , queue );
186198 }
187199 }
188200
189201 public State finishProcessing (int numOnNextSent ) {
190- int numOnNextFromQueue = numOnNextSent - 1 ; // we remove the "self" onNext as it doesn't affect the queue
191- int size = queueSize - numOnNextFromQueue ;
192- System .out .println ("finishProcessing => queue size: " + size + " after processing: " + numOnNextSent );
193- if (size > 1 || isTerminated ()) {
194- Object [] newQueue = new Object [queue .length - numOnNextFromQueue ];
195- System .arraycopy (queue , numOnNextFromQueue , newQueue , 0 , newQueue .length );
202+ int size = queueSize - numOnNextSent ;
203+ if (size > 0 || isTerminated ()) {
204+ // if size == 0 but we are terminated then it's an empty queue
205+ Object [] newQueue = EMPTY ;
206+ if (size > 0 ) {
207+ newQueue = new Object [queue .length - numOnNextSent ];
208+ System .arraycopy (queue , numOnNextSent , newQueue , 0 , newQueue .length );
209+ }
196210 return new State (false , false , size , onComplete , onError , newQueue );
197211 } else {
198212 return NON_TERMINATED_EMPTY ;
199213 }
200214 }
201215
216+ @ Override
217+ public String toString () {
218+ return "State => shouldProcess: " + shouldProcess + " processing: " + isSomeoneProcessing + " queueSize: " + queueSize + " queue: " + queue .length + " terminated: " + isTerminated ();
219+ }
220+
202221 }
203222}
0 commit comments