3737 * @param <K> the key type
3838 * @param <T> the source and group value type
3939 */
40- public final class OperatorGroupBy <K , T > implements Operator <GroupedObservable <K , T >, T > {
40+ public final class OperatorGroupBy <T , K , R > implements Operator <GroupedObservable <K , R >, T > {
4141
4242 final Func1 <? super T , ? extends K > keySelector ;
43+ final Func1 <? super T , ? extends R > elementSelector ;
4344
45+ @ SuppressWarnings ("unchecked" )
4446 public OperatorGroupBy (final Func1 <? super T , ? extends K > keySelector ) {
47+ this (keySelector , (Func1 <T , R >)IDENTITY );
48+ }
49+
50+ public OperatorGroupBy (Func1 <? super T , ? extends K > keySelector , Func1 <? super T , ? extends R > elementSelector ) {
4551 this .keySelector = keySelector ;
52+ this .elementSelector = elementSelector ;
4653 }
4754
4855 @ Override
49- public Subscriber <? super T > call (final Subscriber <? super GroupedObservable <K , T >> child ) {
50- return new GroupBySubscriber <K , T >(keySelector , child );
56+ public Subscriber <? super T > call (final Subscriber <? super GroupedObservable <K , R >> child ) {
57+ return new GroupBySubscriber <K , T , R >(keySelector , elementSelector , child );
5158 }
52- static final class GroupBySubscriber <K , T > extends Subscriber <T > {
59+ static final class GroupBySubscriber <K , T , R > extends Subscriber <T > {
5360 final Func1 <? super T , ? extends K > keySelector ;
54- final Subscriber <? super GroupedObservable <K , T >> child ;
55- public GroupBySubscriber (Func1 <? super T , ? extends K > keySelector , Subscriber <? super GroupedObservable <K , T >> child ) {
61+ final Func1 <? super T , ? extends R > elementSelector ;
62+ final Subscriber <? super GroupedObservable <K , R >> child ;
63+ public GroupBySubscriber (Func1 <? super T , ? extends K > keySelector , Func1 <? super T , ? extends R > elementSelector , Subscriber <? super GroupedObservable <K , R >> child ) {
5664 // a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
5765 // and will unsubscribe on this parent if they are all unsubscribed
5866 super ();
5967 this .keySelector = keySelector ;
68+ this .elementSelector = elementSelector ;
6069 this .child = child ;
6170 }
6271 private final Map <K , BufferUntilSubscriber <T >> groups = new HashMap <K , BufferUntilSubscriber <T >>();
@@ -124,10 +133,10 @@ public void onNext(T t) {
124133 group = BufferUntilSubscriber .create ();
125134 final BufferUntilSubscriber <T > _group = group ;
126135
127- GroupedObservable <K , T > go = new GroupedObservable <K , T >(key , new OnSubscribe <T >() {
136+ GroupedObservable <K , R > go = new GroupedObservable <K , R >(key , new OnSubscribe <R >() {
128137
129138 @ Override
130- public void call (final Subscriber <? super T > o ) {
139+ public void call (final Subscriber <? super R > o ) {
131140 // number of children we have running
132141 COUNTER_UPDATER .incrementAndGet (GroupBySubscriber .this );
133142 o .add (Subscriptions .create (new Action0 () {
@@ -153,7 +162,7 @@ public void onError(Throwable e) {
153162
154163 @ Override
155164 public void onNext (T t ) {
156- o .onNext (t );
165+ o .onNext (elementSelector . call ( t ) );
157166 }
158167
159168 });
@@ -185,4 +194,13 @@ private void completeInner() {
185194 }
186195
187196 }
197+
198+ private final static Func1 <Object , Object > IDENTITY = new Func1 <Object , Object >() {
199+
200+ @ Override
201+ public Object call (Object t ) {
202+ return t ;
203+ }
204+
205+ };
188206}
0 commit comments