Skip to content

Commit 83aa857

Browse files
author
Christopher Grimm
committed
modified groupByUntil include the key of the group in the closings function.
1 parent 6c51edb commit 83aa857

File tree

4 files changed

+24
-15
lines changed

4 files changed

+24
-15
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,17 @@ class RxScalaDemo extends JUnitSuite {
242242
waitFor(firstMedalOfEachCountry)
243243
}
244244

245+
@Test def groupByUntilExample() {
246+
val numbers = Observable.interval(250 millis) take 14
247+
val grouped = numbers.groupByUntil[Long, Long](
248+
{case x => x % 2},
249+
{case (key, obs) => obs filter {case x => x == 7}}
250+
)
251+
val sequenced = (grouped map {case (key, obs) => obs.toSeq}).flatten
252+
sequenced subscribe {x => println(s"Emitted group: $x")}
253+
}
254+
255+
245256
@Test def olympicsExample() {
246257
val medals = Olympics.mountainBikeMedals.publish
247258
medals.subscribe(println(_))
@@ -449,4 +460,4 @@ class RxScalaDemo extends JUnitSuite {
449460
obs.toBlockingObservable.toIterable.last
450461
}
451462

452-
}
463+
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,20 +1332,18 @@ trait Observable[+T]
13321332
* @param f
13331333
* a function that extracts the key from an item
13341334
* @param closings
1335-
* the function that accepts a created grouping and emits an observable which upon emitting a Closing,
1336-
* closes the group.
1335+
* the function that accepts the key of a given group and an observable representing that group, and returns
1336+
* an observable that emits a single Closing when the group should be closed.
13371337
* @tparam K
13381338
* the type of the keys returned by the discriminator function.
13391339
* @tparam Closing
13401340
* the type of the element emitted from the closings observable.
1341-
* @return an Observable that emits `(key, observable)`pairs, where `observable`
1341+
* @return an Observable that emits `(key, observable)` pairs, where `observable`
13421342
* contains all items for which `f` returned `key` before `closings` emits a value.
13431343
*/
1344-
def groupByUntil[K, Closing](f: T => K, closings: Observable[T]=>Observable[Closing]): Observable[(K, Observable[T])] = {
1345-
val closing = (o: rx.observables.GroupedObservable[K,T]) => closings(toScalaObservable[T](o)).asJavaObservable
1346-
val fclosing = new Func1[rx.observables.GroupedObservable[K, T], rx.Observable[_ <: Closing]] {
1347-
def call(o: rx.observables.GroupedObservable[K, T]) = closing(o)
1348-
}.asInstanceOf[Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[Closing]]]
1344+
def groupByUntil[K, Closing](f: T => K, closings: (K, Observable[T])=>Observable[Closing]): Observable[(K, Observable[T])] = {
1345+
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Closing]] =
1346+
(jGrObs: rx.observables.GroupedObservable[K, _ <: T]) => closings(jGrObs.getKey, toScalaObservable[T](jGrObs)).asJavaObservable
13491347
val o1 = asJavaObservable.groupByUntil[K, Closing](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
13501348
val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o))
13511349
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7552,7 +7552,7 @@ public <U> Observable<T> skipUntil(Observable<U> other) {
75527552
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupByUntil()</a>
75537553
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211932.aspx">MSDN: Observable.GroupByUntil</a>
75547554
*/
7555-
public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super GroupedObservable<TKey, T>, ? extends Observable<TDuration>> durationSelector) {
7555+
public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super GroupedObservable<TKey, T>, ? extends Observable<? extends TDuration>> durationSelector) {
75567556
return groupByUntil(keySelector, Functions.<T>identity(), durationSelector);
75577557
}
75587558

@@ -7573,7 +7573,7 @@ public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Fun
75737573
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupByUntil()</a>
75747574
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229433.aspx">MSDN: Observable.GroupByUntil</a>
75757575
*/
7576-
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<TDuration>> durationSelector) {
7576+
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<? extends TDuration>> durationSelector) {
75777577
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
75787578
}
75797579

rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ public class OperationGroupByUntil<TSource, TKey, TResult, TDuration> implements
4242
final Observable<TSource> source;
4343
final Func1<? super TSource, ? extends TKey> keySelector;
4444
final Func1<? super TSource, ? extends TResult> valueSelector;
45-
final Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector;
45+
final Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<? extends TDuration>> durationSelector;
4646
public OperationGroupByUntil(Observable<TSource> source,
4747
Func1<? super TSource, ? extends TKey> keySelector,
4848
Func1<? super TSource, ? extends TResult> valueSelector,
49-
Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector) {
49+
Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<? extends TDuration>> durationSelector) {
5050
this.source = source;
5151
this.keySelector = keySelector;
5252
this.valueSelector = valueSelector;
@@ -107,7 +107,7 @@ public void onNext(TSource args) {
107107
}
108108

109109
if (newGroup) {
110-
Observable<TDuration> duration;
110+
Observable<? extends TDuration> duration;
111111
try {
112112
duration = durationSelector.call(g);
113113
} catch (Throwable t) {
@@ -234,4 +234,4 @@ public void onCompleted() {
234234
}
235235

236236
}
237-
}
237+
}

0 commit comments

Comments
 (0)