@@ -3808,62 +3808,6 @@ trait Observable[+T]
38083808 asJavaObservable.subscribe(onNext, onError, onComplete)
38093809 }
38103810
3811- /**
3812- * Pivots a sequence of `(K1, Observable[(K2, Observable[U])])`s emitted by an `Observable` so as to swap the group
3813- * and and the set on which their items are grouped.
3814- * <p>
3815- * <img width="640" height="580" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.png">
3816- *
3817- * For example an `Observable` such as `this = Observable[(String, Observable[(Boolean, Observable[Integer])])`:
3818- * <ul>
3819- * <li>o1.odd: 1, 3, 5, 7, 9 on Thread 1</li>
3820- * <li>o1.even: 2, 4, 6, 8, 10 on Thread 1</li>
3821- * <li>o2.odd: 11, 13, 15, 17, 19 on Thread 2</li>
3822- * <li>o2.even: 12, 14, 16, 18, 20 on Thread 2</li>
3823- * </ul>
3824- * is pivoted to become `this = Observable[(Boolean, Observable[(String, Observable[Integer])])`:
3825- *
3826- * <ul>
3827- * <li>odd.o1: 1, 3, 5, 7, 9 on Thread 1</li>
3828- * <li>odd.o2: 11, 13, 15, 17, 19 on Thread 2</li>
3829- * <li>even.o1: 2, 4, 6, 8, 10 on Thread 1</li>
3830- * <li>even.o2: 12, 14, 16, 18, 20 on Thread 2</li>
3831- * </ul>
3832- * <p>
3833- * <img width="640" height="1140" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.ex.png">
3834- * <p>
3835- * <em>Note:</em> A `(K, Observable[_])` will cache the items it is to emit until such time as it
3836- * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
3837- * `(K, Observable[_])`s that do not concern you. Instead, you can signal to them that they may
3838- * discard their buffers by applying an operator like `take(0)` to them.
3839- *
3840- * @return an `Observable`containing a stream of nested `(K1, Observable[(K2, Observable[U])])`s with swapped
3841- * inner-outer keys.
3842- */
3843- def pivot [U , K1 , K2 ](implicit evidence : Observable [T ] <:< Observable [(K1 , Observable [(K2 , Observable [U ])])]): Observable [(K2 , Observable [(K1 , Observable [U ])])] = {
3844- import rx .observables .{GroupedObservable => JGroupedObservable }
3845- val f1 = new Func1 [(K1 , Observable [(K2 , Observable [U ])]), JGroupedObservable [K1 , JGroupedObservable [K2 , U ]]]() {
3846- override def call (t1 : (K1 , Observable [(K2 , Observable [U ])])): JGroupedObservable [K1 , JGroupedObservable [K2 , U ]] = {
3847- val jo = t1._2.asJavaObservable.asInstanceOf [rx.Observable [(K2 , Observable [U ])]].map[JGroupedObservable [K2 , U ]](new Func1 [(K2 , Observable [U ]), JGroupedObservable [K2 , U ]]() {
3848- override def call (t2 : (K2 , Observable [U ])): JGroupedObservable [K2 , U ] = {
3849- JGroupedObservable .from(t2._1, t2._2.asJavaObservable.asInstanceOf [rx.Observable [U ]])
3850- }
3851- })
3852- JGroupedObservable .from(t1._1, jo)
3853- }
3854- }
3855- val o1 : Observable [(K1 , Observable [(K2 , Observable [U ])])] = this
3856- val o2 = toScalaObservable[JGroupedObservable [K2 , JGroupedObservable [K1 , U ]]](rx.Observable .pivot(o1.asJavaObservable.map(f1)))
3857- o2.map {
3858- (jgo1 : JGroupedObservable [K2 , JGroupedObservable [K1 , U ]]) => {
3859- val jo = jgo1.map[(K1 , Observable [U ])](new Func1 [JGroupedObservable [K1 , U ], (K1 , Observable [U ])]() {
3860- override def call (jgo2 : JGroupedObservable [K1 , U ]): (K1 , Observable [U ]) = (jgo2.getKey, toScalaObservable[U ](jgo2))
3861- })
3862- (jgo1.getKey, toScalaObservable[(K1 , Observable [U ])](jo))
3863- }
3864- }
3865- }
3866-
38673811 /**
38683812 * Returns an Observable that counts the total number of items emitted by the source Observable and emits this count as a 64-bit Long.
38693813 *
0 commit comments