1+ /**
2+ * Copyright 2016 Netflix, Inc.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+ * compliance with the License. You may obtain a copy of the License at
6+ *
7+ * http://www.apache.org/licenses/LICENSE-2.0
8+ *
9+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
10+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+ * the License for the specific language governing permissions and limitations under the License.
12+ */
13+
14+ package io .reactivex .internal .operators .flowable ;
15+
16+ import io .reactivex .Flowable ;
17+ import io .reactivex .functions .Action ;
18+ import io .reactivex .internal .subscriptions .BooleanSubscription ;
19+ import io .reactivex .subscribers .DefaultSubscriber ;
20+ import io .reactivex .subscribers .TestSubscriber ;
21+ import org .junit .Test ;
22+ import org .reactivestreams .Publisher ;
23+ import org .reactivestreams .Subscriber ;
24+
25+ import java .util .concurrent .atomic .AtomicInteger ;
26+
27+ import static io .reactivex .BackpressureOverflowStrategy .DROP_LATEST ;
28+ import static io .reactivex .BackpressureOverflowStrategy .DROP_OLDEST ;
29+ import static io .reactivex .internal .functions .Functions .EMPTY_ACTION ;
30+ import static org .junit .Assert .assertEquals ;
31+
32+ public class FlowableOnBackpressureBufferStrategyTest {
33+
34+ @ Test (timeout = 2000 )
35+ public void backpressureWithBufferDropOldest () throws InterruptedException {
36+ int bufferSize = 3 ;
37+ final AtomicInteger droppedCount = new AtomicInteger (0 );
38+ Action incrementOnDrop = new Action () {
39+ @ Override
40+ public void run () throws Exception {
41+ droppedCount .incrementAndGet ();
42+ }
43+ };
44+ TestSubscriber <Long > ts = createTestSubscriber ();
45+ Flowable .fromPublisher (send500ValuesAndComplete .onBackpressureBuffer (bufferSize , incrementOnDrop , DROP_OLDEST ))
46+ .subscribe (ts );
47+ // we request 10 but only 3 should come from the buffer
48+ ts .request (10 );
49+ ts .awaitTerminalEvent ();
50+ assertEquals (bufferSize , ts .values ().size ());
51+ ts .assertNoErrors ();
52+ assertEquals (497 , ts .values ().get (0 ).intValue ());
53+ assertEquals (498 , ts .values ().get (1 ).intValue ());
54+ assertEquals (499 , ts .values ().get (2 ).intValue ());
55+ assertEquals (droppedCount .get (), 500 - bufferSize );
56+ }
57+
58+ private TestSubscriber <Long > createTestSubscriber () {
59+ return new TestSubscriber <Long >(new DefaultSubscriber <Long >() {
60+
61+ @ Override
62+ protected void onStart () {
63+ }
64+
65+ @ Override
66+ public void onComplete () {
67+ }
68+
69+ @ Override
70+ public void onError (Throwable e ) {
71+ }
72+
73+ @ Override
74+ public void onNext (Long t ) {
75+ }
76+
77+ }, 0L );
78+ }
79+
80+ @ Test (timeout = 2000 )
81+ public void backpressureWithBufferDropLatest () throws InterruptedException {
82+ int bufferSize = 3 ;
83+ final AtomicInteger droppedCount = new AtomicInteger (0 );
84+ Action incrementOnDrop = new Action () {
85+ @ Override
86+ public void run () throws Exception {
87+ droppedCount .incrementAndGet ();
88+ }
89+ };
90+ TestSubscriber <Long > ts = createTestSubscriber ();
91+ Flowable .fromPublisher (send500ValuesAndComplete .onBackpressureBuffer (bufferSize , incrementOnDrop , DROP_LATEST ))
92+ .subscribe (ts );
93+ // we request 10 but only 3 should come from the buffer
94+ ts .request (10 );
95+ ts .awaitTerminalEvent ();
96+ assertEquals (bufferSize , ts .values ().size ());
97+ ts .assertNoErrors ();
98+ assertEquals (0 , ts .values ().get (0 ).intValue ());
99+ assertEquals (1 , ts .values ().get (1 ).intValue ());
100+ assertEquals (499 , ts .values ().get (2 ).intValue ());
101+ assertEquals (droppedCount .get (), 500 - bufferSize );
102+ }
103+
104+ private static final Flowable <Long > send500ValuesAndComplete = Flowable .unsafeCreate (new Publisher <Long >() {
105+ @ Override
106+ public void subscribe (Subscriber <? super Long > s ) {
107+ BooleanSubscription bs = new BooleanSubscription ();
108+ s .onSubscribe (bs );
109+ long i = 0 ;
110+ while (!bs .isCancelled () && i < 500 ) {
111+ s .onNext (i ++);
112+ }
113+ if (!bs .isCancelled ()){
114+ s .onComplete ();
115+ }
116+ }
117+ });
118+
119+
120+ @ Test (expected = IllegalArgumentException .class )
121+ public void backpressureBufferNegativeCapacity () throws InterruptedException {
122+ Flowable .empty ().onBackpressureBuffer (-1 , EMPTY_ACTION , DROP_OLDEST );
123+ }
124+
125+ @ Test (expected = IllegalArgumentException .class )
126+ public void backpressureBufferZeroCapacity () throws InterruptedException {
127+ Flowable .empty ().onBackpressureBuffer (0 , EMPTY_ACTION , DROP_OLDEST );
128+ }
129+
130+ }
0 commit comments