@@ -1368,4 +1368,134 @@ public String apply(Integer t) throws Exception {
13681368 public void badRequest () {
13691369 TestHelper .assertBadRequestReported (Flowable .range (1 , 5 ).publish ());
13701370 }
1371+
1372+ @ Test
1373+ @ SuppressWarnings ("unchecked" )
1374+ public void splitCombineSubscriberChangeAfterOnNext () {
1375+ Flowable <Integer > source = Flowable .range (0 , 20 )
1376+ .doOnSubscribe (new Consumer <Subscription >() {
1377+ @ Override
1378+ public void accept (Subscription v ) throws Exception {
1379+ System .out .println ("Subscribed" );
1380+ }
1381+ })
1382+ .publish (10 )
1383+ .refCount ()
1384+ ;
1385+
1386+ Flowable <Integer > evenNumbers = source .filter (new Predicate <Integer >() {
1387+ @ Override
1388+ public boolean test (Integer v ) throws Exception {
1389+ return v % 2 == 0 ;
1390+ }
1391+ });
1392+
1393+ Flowable <Integer > oddNumbers = source .filter (new Predicate <Integer >() {
1394+ @ Override
1395+ public boolean test (Integer v ) throws Exception {
1396+ return v % 2 != 0 ;
1397+ }
1398+ });
1399+
1400+ final Single <Integer > getNextOdd = oddNumbers .first (0 );
1401+
1402+ TestSubscriber <List <Integer >> ts = evenNumbers .concatMap (new Function <Integer , Publisher <List <Integer >>>() {
1403+ @ Override
1404+ public Publisher <List <Integer >> apply (Integer v ) throws Exception {
1405+ return Single .zip (
1406+ Single .just (v ), getNextOdd ,
1407+ new BiFunction <Integer , Integer , List <Integer >>() {
1408+ @ Override
1409+ public List <Integer > apply (Integer a , Integer b ) throws Exception {
1410+ return Arrays .asList ( a , b );
1411+ }
1412+ }
1413+ )
1414+ .toFlowable ();
1415+ }
1416+ })
1417+ .takeWhile (new Predicate <List <Integer >>() {
1418+ @ Override
1419+ public boolean test (List <Integer > v ) throws Exception {
1420+ return v .get (0 ) < 20 ;
1421+ }
1422+ })
1423+ .test ();
1424+
1425+ ts
1426+ .assertResult (
1427+ Arrays .asList (0 , 1 ),
1428+ Arrays .asList (2 , 3 ),
1429+ Arrays .asList (4 , 5 ),
1430+ Arrays .asList (6 , 7 ),
1431+ Arrays .asList (8 , 9 ),
1432+ Arrays .asList (10 , 11 ),
1433+ Arrays .asList (12 , 13 ),
1434+ Arrays .asList (14 , 15 ),
1435+ Arrays .asList (16 , 17 ),
1436+ Arrays .asList (18 , 19 )
1437+ );
1438+ }
1439+
1440+ @ Test
1441+ @ SuppressWarnings ("unchecked" )
1442+ public void splitCombineSubscriberChangeAfterOnNextFused () {
1443+ Flowable <Integer > source = Flowable .range (0 , 20 )
1444+ .publish (10 )
1445+ .refCount ()
1446+ ;
1447+
1448+ Flowable <Integer > evenNumbers = source .filter (new Predicate <Integer >() {
1449+ @ Override
1450+ public boolean test (Integer v ) throws Exception {
1451+ return v % 2 == 0 ;
1452+ }
1453+ });
1454+
1455+ Flowable <Integer > oddNumbers = source .filter (new Predicate <Integer >() {
1456+ @ Override
1457+ public boolean test (Integer v ) throws Exception {
1458+ return v % 2 != 0 ;
1459+ }
1460+ });
1461+
1462+ final Single <Integer > getNextOdd = oddNumbers .first (0 );
1463+
1464+ TestSubscriber <List <Integer >> ts = evenNumbers .concatMap (new Function <Integer , Publisher <List <Integer >>>() {
1465+ @ Override
1466+ public Publisher <List <Integer >> apply (Integer v ) throws Exception {
1467+ return Single .zip (
1468+ Single .just (v ), getNextOdd ,
1469+ new BiFunction <Integer , Integer , List <Integer >>() {
1470+ @ Override
1471+ public List <Integer > apply (Integer a , Integer b ) throws Exception {
1472+ return Arrays .asList ( a , b );
1473+ }
1474+ }
1475+ )
1476+ .toFlowable ();
1477+ }
1478+ })
1479+ .takeWhile (new Predicate <List <Integer >>() {
1480+ @ Override
1481+ public boolean test (List <Integer > v ) throws Exception {
1482+ return v .get (0 ) < 20 ;
1483+ }
1484+ })
1485+ .test ();
1486+
1487+ ts
1488+ .assertResult (
1489+ Arrays .asList (0 , 1 ),
1490+ Arrays .asList (2 , 3 ),
1491+ Arrays .asList (4 , 5 ),
1492+ Arrays .asList (6 , 7 ),
1493+ Arrays .asList (8 , 9 ),
1494+ Arrays .asList (10 , 11 ),
1495+ Arrays .asList (12 , 13 ),
1496+ Arrays .asList (14 , 15 ),
1497+ Arrays .asList (16 , 17 ),
1498+ Arrays .asList (18 , 19 )
1499+ );
1500+ }
13711501}
0 commit comments