1717
1818import java .util .Queue ;
1919import java .util .concurrent .ConcurrentLinkedQueue ;
20+ import java .util .concurrent .atomic .AtomicBoolean ;
2021import java .util .concurrent .atomic .AtomicLong ;
2122
2223import rx .Observable .Operator ;
2324import rx .Producer ;
2425import rx .Subscriber ;
26+ import rx .exceptions .MissingBackpressureException ;
27+ import rx .functions .Action0 ;
2528
2629public class OperatorOnBackpressureBuffer <T > implements Operator <T , T > {
2730
2831 private final NotificationLite <T > on = NotificationLite .instance ();
2932
33+ private final Long capacity ;
34+ private final Action0 onOverflow ;
35+
36+ public OperatorOnBackpressureBuffer () {
37+ this .capacity = null ;
38+ this .onOverflow = null ;
39+ }
40+
41+ public OperatorOnBackpressureBuffer (long capacity ) {
42+ this (capacity , null );
43+ }
44+
45+ public OperatorOnBackpressureBuffer (long capacity , Action0 onOverflow ) {
46+ if (capacity <= 0 ) {
47+ throw new IllegalArgumentException ("Buffer capacity must be > 0" );
48+ }
49+ this .capacity = capacity ;
50+ this .onOverflow = onOverflow ;
51+ }
52+
3053 @ Override
3154 public Subscriber <? super T > call (final Subscriber <? super T > child ) {
3255 // TODO get a different queue implementation
33- // TODO start with size hint
3456 final ConcurrentLinkedQueue <Object > queue = new ConcurrentLinkedQueue <Object >();
57+ final AtomicLong capacity = (this .capacity == null ) ? null : new AtomicLong (this .capacity );
3558 final AtomicLong wip = new AtomicLong ();
3659 final AtomicLong requested = new AtomicLong ();
3760
@@ -40,37 +63,71 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
4063 @ Override
4164 public void request (long n ) {
4265 if (requested .getAndAdd (n ) == 0 ) {
43- pollQueue (wip , requested , queue , child );
66+ pollQueue (wip , requested , capacity , queue , child );
4467 }
4568 }
4669
4770 });
4871 // don't pass through subscriber as we are async and doing queue draining
4972 // a parent being unsubscribed should not affect the children
5073 Subscriber <T > parent = new Subscriber <T >() {
74+
75+ private AtomicBoolean saturated = new AtomicBoolean (false );
76+
5177 @ Override
5278 public void onStart () {
5379 request (Long .MAX_VALUE );
5480 }
5581
5682 @ Override
5783 public void onCompleted () {
58- queue .offer (on .completed ());
59- pollQueue (wip , requested , queue , child );
84+ if (!saturated .get ()) {
85+ queue .offer (on .completed ());
86+ pollQueue (wip , requested , capacity , queue , child );
87+ }
6088 }
6189
6290 @ Override
6391 public void onError (Throwable e ) {
64- queue .offer (on .error (e ));
65- pollQueue (wip , requested , queue , child );
92+ if (!saturated .get ()) {
93+ queue .offer (on .error (e ));
94+ pollQueue (wip , requested , capacity , queue , child );
95+ }
6696 }
6797
6898 @ Override
6999 public void onNext (T t ) {
100+ if (!assertCapacity ()) {
101+ return ;
102+ }
70103 queue .offer (on .next (t ));
71- pollQueue (wip , requested , queue , child );
104+ pollQueue (wip , requested , capacity , queue , child );
72105 }
73106
107+ private boolean assertCapacity () {
108+ if (capacity == null ) {
109+ return true ;
110+ }
111+
112+ long currCapacity ;
113+ do {
114+ currCapacity = capacity .get ();
115+ if (currCapacity <= 0 ) {
116+ if (saturated .compareAndSet (false , true )) {
117+ unsubscribe ();
118+ child .onError (new MissingBackpressureException (
119+ "Overflowed buffer of "
120+ + OperatorOnBackpressureBuffer .this .capacity ));
121+ if (onOverflow != null ) {
122+ onOverflow .call ();
123+ }
124+ }
125+ return false ;
126+ }
127+ // ensure no other thread stole our slot, or retry
128+ } while (!capacity .compareAndSet (currCapacity , currCapacity - 1 ));
129+ return true ;
130+ }
74131 };
75132
76133 // if child unsubscribes it should unsubscribe the parent, but not the other way around
@@ -79,7 +136,7 @@ public void onNext(T t) {
79136 return parent ;
80137 }
81138
82- private void pollQueue (AtomicLong wip , AtomicLong requested , Queue <Object > queue , Subscriber <? super T > child ) {
139+ private void pollQueue (AtomicLong wip , AtomicLong requested , AtomicLong capacity , Queue <Object > queue , Subscriber <? super T > child ) {
83140 // TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue?
84141 if (requested .get () > 0 ) {
85142 // only one draining at a time
@@ -96,6 +153,9 @@ private void pollQueue(AtomicLong wip, AtomicLong requested, Queue<Object> queue
96153 requested .incrementAndGet ();
97154 return ;
98155 }
156+ if (capacity != null ) { // it's bounded
157+ capacity .incrementAndGet ();
158+ }
99159 on .accept (child , o );
100160 } else {
101161 // we hit the end ... so increment back to 0 again
0 commit comments