1717
1818import java .util .Queue ;
1919import java .util .concurrent .ConcurrentLinkedQueue ;
20+ import java .util .concurrent .atomic .AtomicBoolean ;
2021import java .util .concurrent .atomic .AtomicLong ;
2122
2223import rx .Observable .Operator ;
2324import rx .Producer ;
2425import rx .Subscriber ;
26+ import rx .exceptions .MissingBackpressureException ;
27+ import rx .functions .Action0 ;
2528
2629public class OperatorOnBackpressureBuffer <T > implements Operator <T , T > {
2730
2831 private final NotificationLite <T > on = NotificationLite .instance ();
2932
33+ private final Long capacity ;
34+ private final Action0 onOverflow ;
35+
36+ public OperatorOnBackpressureBuffer () {
37+ this .capacity = null ;
38+ this .onOverflow = null ;
39+ }
40+
41+ public OperatorOnBackpressureBuffer (long capacity ) {
42+ this (capacity , null );
43+ }
44+
45+ public OperatorOnBackpressureBuffer (long capacity , Action0 onOverflow ) {
46+ if (capacity <= 0 ) {
47+ throw new IllegalArgumentException ("Buffer capacity must be > 0" );
48+ }
49+ this .capacity = capacity ;
50+ this .onOverflow = onOverflow ;
51+ }
52+
3053 @ Override
3154 public Subscriber <? super T > call (final Subscriber <? super T > child ) {
3255 // TODO get a different queue implementation
33- // TODO start with size hint
3456 final ConcurrentLinkedQueue <Object > queue = new ConcurrentLinkedQueue <Object >();
57+ final AtomicLong capacity = (this .capacity == null ) ? null : new AtomicLong (this .capacity );
3558 final AtomicLong wip = new AtomicLong ();
3659 final AtomicLong requested = new AtomicLong ();
3760
@@ -40,14 +63,17 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
4063 @ Override
4164 public void request (long n ) {
4265 if (requested .getAndAdd (n ) == 0 ) {
43- pollQueue (wip , requested , queue , child );
66+ pollQueue (wip , requested , capacity , queue , child );
4467 }
4568 }
4669
4770 });
4871 // don't pass through subscriber as we are async and doing queue draining
4972 // a parent being unsubscribed should not affect the children
5073 Subscriber <T > parent = new Subscriber <T >() {
74+
75+ private AtomicBoolean saturated = new AtomicBoolean (false );
76+
5177 @ Override
5278 public void onStart () {
5379 request (Long .MAX_VALUE );
@@ -56,21 +82,47 @@ public void onStart() {
5682 @ Override
5783 public void onCompleted () {
5884 queue .offer (on .completed ());
59- pollQueue (wip , requested , queue , child );
85+ pollQueue (wip , requested , capacity , queue , child );
6086 }
6187
6288 @ Override
6389 public void onError (Throwable e ) {
6490 queue .offer (on .error (e ));
65- pollQueue (wip , requested , queue , child );
91+ pollQueue (wip , requested , capacity , queue , child );
6692 }
6793
6894 @ Override
6995 public void onNext (T t ) {
96+ if (!ensureCapacity ()) {
97+ return ;
98+ }
7099 queue .offer (on .next (t ));
71- pollQueue (wip , requested , queue , child );
100+ pollQueue (wip , requested , capacity , queue , child );
72101 }
73102
103+ private boolean ensureCapacity () {
104+ if (capacity == null ) {
105+ return true ;
106+ }
107+
108+ long currCapacity ;
109+ do {
110+ currCapacity = capacity .get ();
111+ if (currCapacity <= 0 ) {
112+ if (saturated .compareAndSet (false , true )) {
113+ // ensure single completion contract
114+ child .onError (new MissingBackpressureException ("Overflowed buffer of " + OperatorOnBackpressureBuffer .this .capacity ));
115+ unsubscribe ();
116+ if (onOverflow != null ) {
117+ onOverflow .call ();
118+ }
119+ }
120+ return false ;
121+ }
122+ // ensure no other thread stole our slot, or retry
123+ } while (!capacity .compareAndSet (currCapacity , currCapacity - 1 ));
124+ return true ;
125+ }
74126 };
75127
76128 // if child unsubscribes it should unsubscribe the parent, but not the other way around
@@ -79,7 +131,7 @@ public void onNext(T t) {
79131 return parent ;
80132 }
81133
82- private void pollQueue (AtomicLong wip , AtomicLong requested , Queue <Object > queue , Subscriber <? super T > child ) {
134+ private void pollQueue (AtomicLong wip , AtomicLong requested , AtomicLong capacity , Queue <Object > queue , Subscriber <? super T > child ) {
83135 // TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue?
84136 if (requested .get () > 0 ) {
85137 // only one draining at a time
@@ -96,6 +148,9 @@ private void pollQueue(AtomicLong wip, AtomicLong requested, Queue<Object> queue
96148 requested .incrementAndGet ();
97149 return ;
98150 }
151+ if (capacity != null ) { // it's bounded
152+ capacity .incrementAndGet ();
153+ }
99154 on .accept (child , o );
100155 } else {
101156 // we hit the end ... so increment back to 0 again
0 commit comments