@@ -1581,4 +1581,225 @@ public void call(GroupedObservable<Object, Integer> g) {
15811581 ts2 .assertNotCompleted ();
15821582 }
15831583
1584+ @ Test
1585+ public void testGroupedObservableCollection () {
1586+
1587+ final TestSubscriber <List <Integer >> inner1 = new TestSubscriber <List <Integer >>();
1588+ final TestSubscriber <List <Integer >> inner2 = new TestSubscriber <List <Integer >>();
1589+
1590+ TestSubscriber <List <Observable <List <Integer >>>> outer = new TestSubscriber <List <Observable <List <Integer >>>>(new Subscriber <List <Observable <List <Integer >>>>() {
1591+
1592+ @ Override
1593+ public void onCompleted () {
1594+ }
1595+
1596+ @ Override
1597+ public void onError (Throwable e ) {
1598+ }
1599+
1600+ @ Override
1601+ public void onNext (List <Observable <List <Integer >>> o ) {
1602+ o .get (0 ).subscribe (inner1 );
1603+ o .get (1 ).subscribe (inner2 );
1604+ }
1605+ });
1606+
1607+
1608+
1609+
1610+ Observable .range (0 , 10 )
1611+ .groupBy (new Func1 <Integer , Boolean >() {
1612+ @ Override
1613+ public Boolean call (Integer pair ) {
1614+ return pair % 2 == 1 ;
1615+ }
1616+ })
1617+ .map (new Func1 <GroupedObservable <Boolean , Integer >, Observable <List <Integer >>>() {
1618+ @ Override
1619+ public Observable <List <Integer >> call (GroupedObservable <Boolean , Integer > oddOrEven ) {
1620+ return oddOrEven .toList ();
1621+ }
1622+ })
1623+ .toList ()
1624+ .subscribe (outer );
1625+
1626+ inner1 .assertNoErrors ();
1627+ inner1 .assertCompleted ();
1628+ inner2 .assertNoErrors ();
1629+ inner2 .assertCompleted ();
1630+
1631+ inner1 .assertReceivedOnNext (Arrays .asList (Arrays .asList (0 ,2 ,4 ,6 ,8 )));
1632+ inner2 .assertReceivedOnNext (Arrays .asList (Arrays .asList (1 ,3 ,5 ,7 ,9 )));
1633+
1634+ outer .assertNoErrors ();
1635+ outer .assertCompleted ();
1636+ outer .assertValueCount (1 );
1637+
1638+ }
1639+
1640+ @ Test
1641+ public void testCollectedGroups () {
1642+
1643+ final TestSubscriber <List <Integer >> inner1 = new TestSubscriber <List <Integer >>();
1644+ final TestSubscriber <List <Integer >> inner2 = new TestSubscriber <List <Integer >>();
1645+
1646+ final List <TestSubscriber <List <Integer >>> inners = Arrays .asList (inner1 , inner2 );
1647+
1648+ TestSubscriber <Observable <List <Integer >>> outer = new TestSubscriber <Observable <List <Integer >>>(new Subscriber <Observable <List <Integer >>>() {
1649+ int toInner ;
1650+ @ Override
1651+ public void onCompleted () {
1652+ }
1653+
1654+ @ Override
1655+ public void onError (Throwable e ) {
1656+ }
1657+
1658+ @ Override
1659+ public void onNext (Observable <List <Integer >> o ) {
1660+ o .subscribe (inners .get (toInner ++));
1661+ }
1662+ });
1663+
1664+
1665+
1666+
1667+ Observable .range (0 , 10 )
1668+ .groupBy (new Func1 <Integer , Boolean >() {
1669+ @ Override
1670+ public Boolean call (Integer pair ) {
1671+ return pair % 2 == 1 ;
1672+ }
1673+ })
1674+ .map (new Func1 <GroupedObservable <Boolean ,Integer >, Observable <List <Integer >>>() {
1675+ @ Override
1676+ public Observable <List <Integer >> call (GroupedObservable <Boolean , Integer > booleanIntegerGroupedObservable ) {
1677+ return booleanIntegerGroupedObservable .toList ();
1678+ }
1679+ })
1680+ .subscribe (outer );
1681+
1682+ inner1 .assertNoErrors ();
1683+ inner1 .assertCompleted ();
1684+
1685+ inner1 .assertReceivedOnNext (Arrays .asList (Arrays .asList (0 ,2 ,4 ,6 ,8 )));
1686+ inner2 .assertReceivedOnNext (Arrays .asList (Arrays .asList (1 ,3 ,5 ,7 ,9 )));
1687+
1688+ outer .assertNoErrors ();
1689+ outer .assertCompleted ();
1690+ outer .assertValueCount (2 );
1691+
1692+ }
1693+
1694+ @ Test
1695+ public void testMappedCollectedGroups () {
1696+ // This is a little contrived.
1697+ final TestSubscriber <Integer > inner1 = new TestSubscriber <Integer >();
1698+ final TestSubscriber <Integer > inner2 = new TestSubscriber <Integer >();
1699+
1700+ TestSubscriber <Map <Integer , Observable <Integer >>> outer = new TestSubscriber <Map <Integer , Observable <Integer >>>(new Subscriber <Map <Integer , Observable <Integer >>>() {
1701+ @ Override
1702+ public void onCompleted () {
1703+
1704+ }
1705+
1706+ @ Override
1707+ public void onError (Throwable e ) {
1708+
1709+ }
1710+
1711+ @ Override
1712+ public void onNext (Map <Integer , Observable <Integer >> integerObservableMap ) {
1713+ integerObservableMap .get (0 ).subscribe (inner1 );
1714+ integerObservableMap .get (1 ).subscribe (inner2 );
1715+ }
1716+ });
1717+
1718+ Observable <Map <Integer , Observable <Integer >>> mapObservable =
1719+ Observable .range (0 , 10 )
1720+ .groupBy (new Func1 <Integer , Integer >() {
1721+ @ Override
1722+ public Integer call (Integer pair ) {
1723+ return pair % 2 ;
1724+ }
1725+ })
1726+ .toMap (new Func1 <GroupedObservable <Integer , Integer >, Integer >() {
1727+ @ Override
1728+ public Integer call (GroupedObservable <Integer , Integer > group ) {
1729+ return group .getKey ();
1730+ }
1731+ },
1732+ new Func1 <GroupedObservable <Integer , Integer >, Observable <Integer >>() {
1733+ @ Override
1734+ public Observable <Integer > call (GroupedObservable <Integer , Integer > integerGroup ) {
1735+ return integerGroup .map (
1736+ new Func1 <Integer , Integer >() {
1737+ @ Override
1738+ public Integer call (Integer integer ) {
1739+ return integer * 10 ;
1740+ }
1741+ });
1742+ }
1743+ }
1744+ );
1745+
1746+ mapObservable .subscribe (outer );
1747+
1748+ inner1 .assertNoErrors ();
1749+ inner1 .assertCompleted ();
1750+
1751+ inner1 .assertReceivedOnNext (Arrays .asList (0 , 20 , 40 , 60 , 80 ));
1752+ inner2 .assertReceivedOnNext (Arrays .asList (10 , 30 , 50 , 70 , 90 ));
1753+
1754+ outer .assertNoErrors ();
1755+ outer .assertCompleted ();
1756+ outer .assertValueCount (1 );
1757+
1758+ }
1759+
1760+ @ Test
1761+ public void testSkippedGroup () {
1762+
1763+ final TestSubscriber <Integer > inner1 = new TestSubscriber <Integer >();
1764+
1765+ TestSubscriber <GroupedObservable <Integer , Integer >> outer = new TestSubscriber <GroupedObservable <Integer , Integer >>(new Subscriber <GroupedObservable <Integer , Integer >>() {
1766+
1767+ @ Override
1768+ public void onCompleted () {
1769+ }
1770+
1771+ @ Override
1772+ public void onError (Throwable e ) {
1773+ }
1774+
1775+ @ Override
1776+ public void onNext (GroupedObservable <Integer , Integer > o ) {
1777+ if (o .getKey () == 1 ) {
1778+ o .subscribe (inner1 );
1779+ }
1780+ }
1781+ });
1782+
1783+
1784+
1785+
1786+ Observable .range (0 , 10 )
1787+ .groupBy (new Func1 <Integer , Integer >() {
1788+ @ Override
1789+ public Integer call (Integer pair ) {
1790+ return pair % 2 ;
1791+ }
1792+ })
1793+ .subscribe (outer );
1794+
1795+ inner1 .assertNoErrors ();
1796+ inner1 .assertCompleted ();
1797+
1798+ inner1 .assertReceivedOnNext (Arrays .asList (1 ,3 ,5 ,7 ,9 ));
1799+
1800+ outer .assertNoErrors ();
1801+ outer .assertCompleted ();
1802+ outer .assertValueCount (2 );
1803+
1804+ }
15841805}
0 commit comments