1717
1818import java .util .Queue ;
1919import java .util .concurrent .ConcurrentLinkedQueue ;
20- import java .util .concurrent .atomic .AtomicBoolean ;
2120import java .util .concurrent .atomic .AtomicLong ;
2221
2322import rx .Observable .Operator ;
2423import rx .Producer ;
2524import rx .Subscriber ;
26- import rx .exceptions .MissingBackpressureException ;
27- import rx .functions .Action0 ;
2825
2926public class OperatorOnBackpressureBuffer <T > implements Operator <T , T > {
3027
3128 private final NotificationLite <T > on = NotificationLite .instance ();
3229
33- private final Long capacity ;
34- private final Action0 onOverflow ;
35-
36- public OperatorOnBackpressureBuffer () {
37- this .capacity = null ;
38- this .onOverflow = null ;
39- }
40-
41- public OperatorOnBackpressureBuffer (long capacity ) {
42- this (capacity , null );
43- }
44-
45- public OperatorOnBackpressureBuffer (long capacity , Action0 onOverflow ) {
46- if (capacity <= 0 ) {
47- throw new IllegalArgumentException ("Buffer capacity must be > 0" );
48- }
49- this .capacity = capacity ;
50- this .onOverflow = onOverflow ;
51- }
52-
5330 @ Override
5431 public Subscriber <? super T > call (final Subscriber <? super T > child ) {
5532 // TODO get a different queue implementation
33+ // TODO start with size hint
5634 final ConcurrentLinkedQueue <Object > queue = new ConcurrentLinkedQueue <Object >();
57- final AtomicLong capacity = (this .capacity == null ) ? null : new AtomicLong (this .capacity );
5835 final AtomicLong wip = new AtomicLong ();
5936 final AtomicLong requested = new AtomicLong ();
6037
@@ -63,17 +40,14 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
6340 @ Override
6441 public void request (long n ) {
6542 if (requested .getAndAdd (n ) == 0 ) {
66- pollQueue (wip , requested , capacity , queue , child );
43+ pollQueue (wip , requested , queue , child );
6744 }
6845 }
6946
7047 });
7148 // don't pass through subscriber as we are async and doing queue draining
7249 // a parent being unsubscribed should not affect the children
7350 Subscriber <T > parent = new Subscriber <T >() {
74-
75- private AtomicBoolean saturated = new AtomicBoolean (false );
76-
7751 @ Override
7852 public void onStart () {
7953 request (Long .MAX_VALUE );
@@ -82,47 +56,21 @@ public void onStart() {
8256 @ Override
8357 public void onCompleted () {
8458 queue .offer (on .completed ());
85- pollQueue (wip , requested , capacity , queue , child );
59+ pollQueue (wip , requested , queue , child );
8660 }
8761
8862 @ Override
8963 public void onError (Throwable e ) {
9064 queue .offer (on .error (e ));
91- pollQueue (wip , requested , capacity , queue , child );
65+ pollQueue (wip , requested , queue , child );
9266 }
9367
9468 @ Override
9569 public void onNext (T t ) {
96- if (!ensureCapacity ()) {
97- return ;
98- }
9970 queue .offer (on .next (t ));
100- pollQueue (wip , requested , capacity , queue , child );
71+ pollQueue (wip , requested , queue , child );
10172 }
10273
103- private boolean ensureCapacity () {
104- if (capacity == null ) {
105- return true ;
106- }
107-
108- long currCapacity ;
109- do {
110- currCapacity = capacity .get ();
111- if (currCapacity <= 0 ) {
112- if (saturated .compareAndSet (false , true )) {
113- // ensure single completion contract
114- child .onError (new MissingBackpressureException ("Overflowed buffer of " + OperatorOnBackpressureBuffer .this .capacity ));
115- unsubscribe ();
116- if (onOverflow != null ) {
117- onOverflow .call ();
118- }
119- }
120- return false ;
121- }
122- // ensure no other thread stole our slot, or retry
123- } while (!capacity .compareAndSet (currCapacity , currCapacity - 1 ));
124- return true ;
125- }
12674 };
12775
12876 // if child unsubscribes it should unsubscribe the parent, but not the other way around
@@ -131,7 +79,7 @@ private boolean ensureCapacity() {
13179 return parent ;
13280 }
13381
134- private void pollQueue (AtomicLong wip , AtomicLong requested , AtomicLong capacity , Queue <Object > queue , Subscriber <? super T > child ) {
82+ private void pollQueue (AtomicLong wip , AtomicLong requested , Queue <Object > queue , Subscriber <? super T > child ) {
13583 // TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue?
13684 if (requested .get () > 0 ) {
13785 // only one draining at a time
@@ -148,9 +96,6 @@ private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity
14896 requested .incrementAndGet ();
14997 return ;
15098 }
151- if (capacity != null ) { // it's bounded
152- capacity .incrementAndGet ();
153- }
15499 on .accept (child , o );
155100 } else {
156101 // we hit the end ... so increment back to 0 again
0 commit comments