1616package rx .internal .producers ;
1717
1818import org .junit .*;
19+ import static org .junit .Assert .*;
1920
2021import rx .*;
2122import rx .exceptions .TestException ;
@@ -38,6 +39,7 @@ public void negativeRequestThrows() {
3839 public void nullProducerAccepted () {
3940 ProducerObserverArbiter <Integer > pa = new ProducerObserverArbiter <Integer >(Subscribers .empty ());
4041 pa .setProducer (null );
42+ pa .request (5 );
4143 }
4244
4345 public void failedRequestUnlocksEmitting () {
@@ -160,4 +162,78 @@ public void onNext(Integer t) {
160162 }
161163 }
162164
165+ @ Test
166+ public void onNextRequests () {
167+ @ SuppressWarnings ("rawtypes" )
168+ final ProducerObserverArbiter [] o = { null };
169+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >() {
170+ @ Override
171+ public void onNext (Integer t ) {
172+ o [0 ].request (1 );
173+ }
174+ };
175+ ProducerObserverArbiter <Integer > poa = new ProducerObserverArbiter <Integer >(ts );
176+ poa .request (1 );
177+ o [0 ] = poa ;
178+ try {
179+ poa .onNext (1 );
180+ } catch (TestException ex ) {
181+ // expected
182+ }
183+ assertEquals (1 , poa .requested );
184+ }
185+
186+ @ Test
187+ public void requestIsCapped () {
188+ ProducerObserverArbiter <Integer > poa = new ProducerObserverArbiter <Integer >(new TestSubscriber <Integer >());
189+
190+ poa .request (Long .MAX_VALUE - 1 );
191+ poa .request (2 );
192+
193+ assertEquals (Long .MAX_VALUE , Long .MAX_VALUE );
194+ }
195+
196+ @ Test
197+ public void onNextChangesProducerNull () {
198+ @ SuppressWarnings ("rawtypes" )
199+ final ProducerObserverArbiter [] o = { null };
200+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >() {
201+ @ Override
202+ public void onNext (Integer t ) {
203+ o [0 ].setProducer (null );
204+ }
205+ };
206+ ProducerObserverArbiter <Integer > poa = new ProducerObserverArbiter <Integer >(ts );
207+ poa .request (1 );
208+ o [0 ] = poa ;
209+ try {
210+ poa .onNext (1 );
211+ } catch (TestException ex ) {
212+ // expected
213+ }
214+ assertNull (poa .currentProducer );
215+ }
216+
217+ @ Test
218+ public void onNextChangesProducerNotNull () {
219+ @ SuppressWarnings ("rawtypes" )
220+ final ProducerObserverArbiter [] o = { null };
221+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >() {
222+ @ SuppressWarnings ("unchecked" )
223+ @ Override
224+ public void onNext (Integer t ) {
225+ o [0 ].setProducer (new SingleProducer <Integer >(o [0 ].child , 2 ));
226+ }
227+ };
228+ ProducerObserverArbiter <Integer > poa = new ProducerObserverArbiter <Integer >(ts );
229+ poa .request (1 );
230+ o [0 ] = poa ;
231+ try {
232+ poa .onNext (1 );
233+ } catch (TestException ex ) {
234+ // expected
235+ }
236+ assertNotNull (poa .currentProducer );
237+ }
238+
163239}
0 commit comments