@@ -194,7 +194,7 @@ public void onError(Throwable e) {
194194 // If we already have items queued when a request comes in we vend those and decrement the outstanding request count
195195
196196 void requestFromGroupedObservable (long n , GroupState <K , T > group ) {
197- group .requested . getAndAdd ( n );
197+ BackpressureUtils . getAndAddRequest ( group .requested , n );
198198 if (group .count .getAndIncrement () == 0 ) {
199199 pollQueue (group );
200200 }
@@ -330,13 +330,19 @@ private void cleanupGroup(Object key) {
330330 private void emitItem (GroupState <K , T > groupState , Object item ) {
331331 Queue <Object > q = groupState .buffer ;
332332 AtomicLong keyRequested = groupState .requested ;
333+ //don't need to check for requested being Long.MAX_VALUE because this
334+ //field is capped at MAX_QUEUE_SIZE
333335 REQUESTED .decrementAndGet (this );
334336 // short circuit buffering
335337 if (keyRequested != null && keyRequested .get () > 0 && (q == null || q .isEmpty ())) {
336338 @ SuppressWarnings ("unchecked" )
337339 Observer <Object > obs = (Observer <Object >)groupState .getObserver ();
338340 nl .accept (obs , item );
339- keyRequested .decrementAndGet ();
341+ if (keyRequested .get () != Long .MAX_VALUE ) {
342+ // best endeavours check (no CAS loop here) because we mainly care about
343+ // the initial request being Long.MAX_VALUE and that value being conserved.
344+ keyRequested .decrementAndGet ();
345+ }
340346 } else {
341347 q .add (item );
342348 BUFFERED_COUNT .incrementAndGet (this );
@@ -381,7 +387,11 @@ private void drainIfPossible(GroupState<K, T> groupState) {
381387 @ SuppressWarnings ("unchecked" )
382388 Observer <Object > obs = (Observer <Object >)groupState .getObserver ();
383389 nl .accept (obs , t );
384- groupState .requested .decrementAndGet ();
390+ if (groupState .requested .get ()!=Long .MAX_VALUE ) {
391+ // best endeavours check (no CAS loop here) because we mainly care about
392+ // the initial request being Long.MAX_VALUE and that value being conserved.
393+ groupState .requested .decrementAndGet ();
394+ }
385395 BUFFERED_COUNT .decrementAndGet (this );
386396
387397 // if we have used up all the events we requested from upstream then figure out what to ask for this time based on the empty space in the buffer
0 commit comments