Skip to content

Commit 6c51edb

Browse files
author
Christopher Grimm
committed
ported groupByUntil function.
1 parent 72f043e commit 6c51edb

File tree

1 file changed

+26
-0
lines changed

1 file changed

+26
-0
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,6 +1325,32 @@ trait Observable[+T]
13251325
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
13261326
}
13271327

1328+
/**
1329+
* Groups the items emitted by this Observable according to a specified discriminator function and terminates these groups
1330+
* according to a function.
1331+
*
1332+
* @param f
1333+
* a function that extracts the key from an item
1334+
* @param closings
1335+
* the function that accepts a created grouping and emits an observable which upon emitting a Closing,
1336+
* closes the group.
1337+
* @tparam K
1338+
* the type of the keys returned by the discriminator function.
1339+
* @tparam Closing
1340+
* the type of the element emitted from the closings observable.
1341+
* @return an Observable that emits `(key, observable)`pairs, where `observable`
1342+
* contains all items for which `f` returned `key` before `closings` emits a value.
1343+
*/
1344+
def groupByUntil[K, Closing](f: T => K, closings: Observable[T]=>Observable[Closing]): Observable[(K, Observable[T])] = {
1345+
val closing = (o: rx.observables.GroupedObservable[K,T]) => closings(toScalaObservable[T](o)).asJavaObservable
1346+
val fclosing = new Func1[rx.observables.GroupedObservable[K, T], rx.Observable[_ <: Closing]] {
1347+
def call(o: rx.observables.GroupedObservable[K, T]) = closing(o)
1348+
}.asInstanceOf[Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[Closing]]]
1349+
val o1 = asJavaObservable.groupByUntil[K, Closing](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
1350+
val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o))
1351+
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
1352+
}
1353+
13281354
/**
13291355
* Given an Observable that emits Observables, creates a single Observable that
13301356
* emits the items emitted by the most recently published of those Observables.

0 commit comments

Comments
 (0)