@@ -1581,4 +1581,224 @@ public void call(GroupedObservable<Object, Integer> g) {
15811581 ts2 .assertNotCompleted ();
15821582 }
15831583
1584+ @ Test
1585+ public void testGroupedObservableCollection () {
1586+
1587+ final TestSubscriber <List <Integer >> inner1 = new TestSubscriber <List <Integer >>();
1588+ final TestSubscriber <List <Integer >> inner2 = new TestSubscriber <List <Integer >>();
1589+
1590+ TestSubscriber <List <Observable <List <Integer >>>> outer = new TestSubscriber <List <Observable <List <Integer >>>>(new Subscriber <List <Observable <List <Integer >>>>() {
1591+
1592+ @ Override
1593+ public void onCompleted () {
1594+ }
1595+
1596+ @ Override
1597+ public void onError (Throwable e ) {
1598+ }
1599+
1600+ @ Override
1601+ public void onNext (List <Observable <List <Integer >>> o ) {
1602+ o .get (0 ).subscribe (inner1 );
1603+ o .get (1 ).subscribe (inner2 );
1604+ }
1605+ });
1606+
1607+
1608+
1609+
1610+ Observable .range (0 , 10 )
1611+ .groupBy (new Func1 <Integer , Boolean >() {
1612+ @ Override
1613+ public Boolean call (Integer pair ) {
1614+ return pair % 2 == 1 ;
1615+ }
1616+ })
1617+ .map (new Func1 <GroupedObservable <Boolean , Integer >, Observable <List <Integer >>>() {
1618+ @ Override
1619+ public Observable <List <Integer >> call (GroupedObservable <Boolean , Integer > oddOrEven ) {
1620+ return oddOrEven .toList ();
1621+ }
1622+ })
1623+ .toList ()
1624+ .subscribe (outer );
1625+
1626+ inner1 .assertNoErrors ();
1627+ inner1 .assertCompleted ();
1628+ inner2 .assertNoErrors ();
1629+ inner2 .assertCompleted ();
1630+
1631+ inner1 .assertReceivedOnNext (Arrays .asList (Arrays .asList (0 ,2 ,4 ,6 ,8 )));
1632+ inner2 .assertReceivedOnNext (Arrays .asList (Arrays .asList (1 ,3 ,5 ,7 ,9 )));
1633+
1634+ outer .assertNoErrors ();
1635+ outer .assertCompleted ();
1636+ outer .assertValueCount (1 );
1637+
1638+ }
1639+
1640+ @ Test
1641+ public void testCollectedGroups () {
1642+
1643+ final TestSubscriber <List <Integer >> inner1 = new TestSubscriber <List <Integer >>();
1644+ final TestSubscriber <List <Integer >> inner2 = new TestSubscriber <List <Integer >>();
1645+
1646+ final List <TestSubscriber <List <Integer >>> inners = Arrays .asList (inner1 , inner2 );
1647+
1648+ TestSubscriber <Observable <List <Integer >>> outer = new TestSubscriber <Observable <List <Integer >>>(new Subscriber <Observable <List <Integer >>>() {
1649+ int toInner ;
1650+ @ Override
1651+ public void onCompleted () {
1652+ }
1653+
1654+ @ Override
1655+ public void onError (Throwable e ) {
1656+ }
1657+
1658+ @ Override
1659+ public void onNext (Observable <List <Integer >> o ) {
1660+ o .subscribe (inners .get (toInner ++));
1661+ }
1662+ });
1663+
1664+
1665+
1666+
1667+ Observable .range (0 , 10 )
1668+ .groupBy (new Func1 <Integer , Boolean >() {
1669+ @ Override
1670+ public Boolean call (Integer pair ) {
1671+ return pair % 2 == 1 ;
1672+ }
1673+ })
1674+ .map (new Func1 <GroupedObservable <Boolean ,Integer >, Observable <List <Integer >>>() {
1675+ @ Override
1676+ public Observable <List <Integer >> call (GroupedObservable <Boolean , Integer > booleanIntegerGroupedObservable ) {
1677+ return booleanIntegerGroupedObservable .toList ();
1678+ }
1679+ })
1680+ .subscribe (outer );
1681+
1682+ inner1 .assertNoErrors ();
1683+ inner1 .assertCompleted ();
1684+
1685+ inner1 .assertReceivedOnNext (Arrays .asList (Arrays .asList (0 ,2 ,4 ,6 ,8 )));
1686+ inner2 .assertReceivedOnNext (Arrays .asList (Arrays .asList (1 ,3 ,5 ,7 ,9 )));
1687+
1688+ outer .assertNoErrors ();
1689+ outer .assertCompleted ();
1690+ outer .assertValueCount (2 );
1691+
1692+ }
1693+
1694+ @ Test
1695+ public void testMappedCollectedGroups () {
1696+ // This is a little contrived.
1697+ final TestSubscriber <Integer > inner1 = new TestSubscriber <Integer >();
1698+ final TestSubscriber <Integer > inner2 = new TestSubscriber <Integer >();
1699+
1700+ TestSubscriber <Map <Integer , Observable <Integer >>> outer = new TestSubscriber <Map <Integer , Observable <Integer >>>(new Subscriber <Map <Integer , Observable <Integer >>>() {
1701+ @ Override
1702+ public void onCompleted () {
1703+
1704+ }
1705+
1706+ @ Override
1707+ public void onError (Throwable e ) {
1708+
1709+ }
1710+
1711+ @ Override
1712+ public void onNext (Map <Integer , Observable <Integer >> integerObservableMap ) {
1713+ integerObservableMap .get (0 ).subscribe (inner1 );
1714+ integerObservableMap .get (1 ).subscribe (inner2 );
1715+ }
1716+ });
1717+
1718+ Observable <Map <Integer , Observable <Integer >>> mapObservable = Observable .range (0 , 10 )
1719+ .groupBy (new Func1 <Integer , Integer >() {
1720+ @ Override
1721+ public Integer call (Integer pair ) {
1722+ return pair % 2 ;
1723+ }
1724+ })
1725+ .toMap (new Func1 <GroupedObservable <Integer , Integer >, Integer >() {
1726+ @ Override
1727+ public Integer call (GroupedObservable <Integer , Integer > group ) {
1728+ return group .getKey ();
1729+ }
1730+ },
1731+ new Func1 <GroupedObservable <Integer , Integer >, Observable <Integer >>() {
1732+ @ Override
1733+ public Observable <Integer > call (GroupedObservable <Integer , Integer > integerGroup ) {
1734+ return integerGroup .map (
1735+ new Func1 <Integer , Integer >() {
1736+ @ Override
1737+ public Integer call (Integer integer ) {
1738+ return integer * 10 ;
1739+ }
1740+ });
1741+ }
1742+ }
1743+ );
1744+
1745+ mapObservable .subscribe (outer );
1746+
1747+ inner1 .assertNoErrors ();
1748+ inner1 .assertCompleted ();
1749+
1750+ inner1 .assertReceivedOnNext (Arrays .asList (0 ,20 ,40 ,60 ,80 ));
1751+ inner2 .assertReceivedOnNext (Arrays .asList (10 ,30 ,50 ,70 ,90 ));
1752+
1753+ outer .assertNoErrors ();
1754+ outer .assertCompleted ();
1755+ outer .assertValueCount (1 );
1756+
1757+ }
1758+
1759+ @ Test
1760+ public void testSkippedGroup () {
1761+
1762+ final TestSubscriber <Integer > inner1 = new TestSubscriber <Integer >();
1763+
1764+ TestSubscriber <GroupedObservable <Integer , Integer >> outer = new TestSubscriber <GroupedObservable <Integer , Integer >>(new Subscriber <GroupedObservable <Integer , Integer >>() {
1765+
1766+ @ Override
1767+ public void onCompleted () {
1768+ }
1769+
1770+ @ Override
1771+ public void onError (Throwable e ) {
1772+ }
1773+
1774+ @ Override
1775+ public void onNext (GroupedObservable <Integer , Integer > o ) {
1776+ if (o .getKey () == 1 ) {
1777+ o .subscribe (inner1 );
1778+ }
1779+ }
1780+ });
1781+
1782+
1783+
1784+
1785+ Observable .range (0 , 10 )
1786+ .groupBy (new Func1 <Integer , Integer >() {
1787+ @ Override
1788+ public Integer call (Integer pair ) {
1789+ return pair % 2 ;
1790+ }
1791+ })
1792+ .subscribe (outer );
1793+
1794+ inner1 .assertNoErrors ();
1795+ inner1 .assertCompleted ();
1796+
1797+ inner1 .assertReceivedOnNext (Arrays .asList (1 ,3 ,5 ,7 ,9 ));
1798+
1799+ outer .assertNoErrors ();
1800+ outer .assertCompleted ();
1801+ outer .assertValueCount (2 );
1802+
1803+ }
15841804}
0 commit comments