1919import java .util .concurrent .atomic .AtomicBoolean ;
2020import java .util .concurrent .atomic .AtomicLong ;
2121
22+ import rx .BackpressureOverflow ;
2223import rx .Observable .Operator ;
2324import rx .Producer ;
2425import rx .Subscriber ;
2728import rx .functions .Action0 ;
2829import rx .internal .util .BackpressureDrainManager ;
2930
31+ import static rx .BackpressureOverflow .*;
32+
3033public class OperatorOnBackpressureBuffer <T > implements Operator <T , T > {
3134
3235 private final Long capacity ;
3336 private final Action0 onOverflow ;
37+ private final BackpressureOverflow .Strategy overflowStrategy ;
3438
3539 private static class Holder {
3640 static final OperatorOnBackpressureBuffer <?> INSTANCE = new OperatorOnBackpressureBuffer <Object >();
3741 }
38-
42+
3943 @ SuppressWarnings ("unchecked" )
4044 public static <T > OperatorOnBackpressureBuffer <T > instance () {
4145 return (OperatorOnBackpressureBuffer <T >) Holder .INSTANCE ;
@@ -44,33 +48,65 @@ public static <T> OperatorOnBackpressureBuffer<T> instance() {
4448 OperatorOnBackpressureBuffer () {
4549 this .capacity = null ;
4650 this .onOverflow = null ;
51+ this .overflowStrategy = ON_OVERFLOW_DEFAULT ;
4752 }
4853
54+ /**
55+ * Construct a new instance that will handle overflows with {@code ON_OVERFLOW_DEFAULT}, providing the
56+ * following behavior config:
57+ *
58+ * @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
59+ */
4960 public OperatorOnBackpressureBuffer (long capacity ) {
50- this (capacity , null );
61+ this (capacity , null , ON_OVERFLOW_DEFAULT );
5162 }
5263
64+ /**
65+ * Construct a new instance that will handle overflows with {@code ON_OVERFLOW_DEFAULT}, providing the
66+ * following behavior config:
67+ *
68+ * @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
69+ * @param onOverflow the {@code Action0} to execute when the buffer overflows, may be null.
70+ */
5371 public OperatorOnBackpressureBuffer (long capacity , Action0 onOverflow ) {
72+ this (capacity , onOverflow , ON_OVERFLOW_DEFAULT );
73+ }
74+
75+ /**
76+ * Construct a new instance feeding the following behavior config:
77+ *
78+ * @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
79+ * @param onOverflow the {@code Action0} to execute when the buffer overflows, may be null.
80+ * @param overflowStrategy the {@code BackpressureOverflow.Strategy} to handle overflows, it must not be null.
81+ */
82+ public OperatorOnBackpressureBuffer (long capacity , Action0 onOverflow ,
83+ BackpressureOverflow .Strategy overflowStrategy ) {
5484 if (capacity <= 0 ) {
5585 throw new IllegalArgumentException ("Buffer capacity must be > 0" );
5686 }
87+ if (overflowStrategy == null ) {
88+ throw new NullPointerException ("The BackpressureOverflow strategy must not be null" );
89+ }
5790 this .capacity = capacity ;
5891 this .onOverflow = onOverflow ;
92+ this .overflowStrategy = overflowStrategy ;
5993 }
6094
6195 @ Override
6296 public Subscriber <? super T > call (final Subscriber <? super T > child ) {
6397
6498 // don't pass through subscriber as we are async and doing queue draining
6599 // a parent being unsubscribed should not affect the children
66- BufferSubscriber <T > parent = new BufferSubscriber <T >(child , capacity , onOverflow );
100+ BufferSubscriber <T > parent = new BufferSubscriber <T >(child , capacity , onOverflow ,
101+ overflowStrategy );
67102
68103 // if child unsubscribes it should unsubscribe the parent, but not the other way around
69104 child .add (parent );
70105 child .setProducer (parent .manager ());
71106
72107 return parent ;
73108 }
109+
74110 private static final class BufferSubscriber <T > extends Subscriber <T > implements BackpressureDrainManager .BackpressureQueueCallback {
75111 // TODO get a different queue implementation
76112 private final ConcurrentLinkedQueue <Object > queue = new ConcurrentLinkedQueue <Object >();
@@ -81,14 +117,18 @@ private static final class BufferSubscriber<T> extends Subscriber<T> implements
81117 private final BackpressureDrainManager manager ;
82118 private final NotificationLite <T > on = NotificationLite .instance ();
83119 private final Action0 onOverflow ;
120+ private final BackpressureOverflow .Strategy overflowStrategy ;
84121
85- public BufferSubscriber (final Subscriber <? super T > child , Long capacity , Action0 onOverflow ) {
122+ public BufferSubscriber (final Subscriber <? super T > child , Long capacity , Action0 onOverflow ,
123+ BackpressureOverflow .Strategy overflowStrategy ) {
86124 this .child = child ;
87125 this .baseCapacity = capacity ;
88126 this .capacity = capacity != null ? new AtomicLong (capacity ) : null ;
89127 this .onOverflow = onOverflow ;
90128 this .manager = new BackpressureDrainManager (this );
129+ this .overflowStrategy = overflowStrategy ;
91130 }
131+
92132 @ Override
93133 public void onStart () {
94134 request (Long .MAX_VALUE );
@@ -141,7 +181,7 @@ public Object poll() {
141181 }
142182 return value ;
143183 }
144-
184+
145185 private boolean assertCapacity () {
146186 if (capacity == null ) {
147187 return true ;
@@ -151,24 +191,30 @@ private boolean assertCapacity() {
151191 do {
152192 currCapacity = capacity .get ();
153193 if (currCapacity <= 0 ) {
154- if (saturated .compareAndSet (false , true )) {
155- unsubscribe ();
156- child .onError (new MissingBackpressureException (
157- "Overflowed buffer of "
158- + baseCapacity ));
159- if (onOverflow != null ) {
160- try {
161- onOverflow .call ();
162- } catch (Throwable e ) {
163- Exceptions .throwIfFatal (e );
164- manager .terminateAndDrain (e );
165- // this line not strictly necessary but nice for clarity
166- // and in case of future changes to code after this catch block
167- return false ;
168- }
194+ boolean hasCapacity = false ;
195+ try {
196+ // ok if we're allowed to drop, and there is indeed an item to discard
197+ hasCapacity = overflowStrategy .mayAttemptDrop () && poll () != null ;
198+ } catch (MissingBackpressureException e ) {
199+ if (saturated .compareAndSet (false , true )) {
200+ unsubscribe ();
201+ child .onError (e );
169202 }
170203 }
171- return false ;
204+ if (onOverflow != null ) {
205+ try {
206+ onOverflow .call ();
207+ } catch (Throwable e ) {
208+ Exceptions .throwIfFatal (e );
209+ manager .terminateAndDrain (e );
210+ // this line not strictly necessary but nice for clarity
211+ // and in case of future changes to code after this catch block
212+ return false ;
213+ }
214+ }
215+ if (!hasCapacity ) {
216+ return false ;
217+ }
172218 }
173219 // ensure no other thread stole our slot, or retry
174220 } while (!capacity .compareAndSet (currCapacity , currCapacity - 1 ));
0 commit comments