@@ -46,18 +46,18 @@ public OperatorGroupBy(final Func1<? super T, ? extends K> keySelector) {
4646 }
4747
4848 @ Override
49- public Subscriber <? super T > call (final Subscriber <? super GroupedObservable <K , T >> childObserver ) {
50- return new GroupBySubscriber <K , T >(keySelector , childObserver );
49+ public Subscriber <? super T > call (final Subscriber <? super GroupedObservable <K , T >> child ) {
50+ return new GroupBySubscriber <K , T >(keySelector , child );
5151 }
5252 static final class GroupBySubscriber <K , T > extends Subscriber <T > {
5353 final Func1 <? super T , ? extends K > keySelector ;
54- final Subscriber <? super GroupedObservable <K , T >> childObserver ;
55- public GroupBySubscriber (Func1 <? super T , ? extends K > keySelector , Subscriber <? super GroupedObservable <K , T >> childObserver ) {
54+ final Subscriber <? super GroupedObservable <K , T >> child ;
55+ public GroupBySubscriber (Func1 <? super T , ? extends K > keySelector , Subscriber <? super GroupedObservable <K , T >> child ) {
5656 // a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
5757 // and will unsubscribe on this parent if they are all unsubscribed
5858 super (new ChainedSubscription ());
5959 this .keySelector = keySelector ;
60- this .childObserver = childObserver ;
60+ this .child = child ;
6161 }
6262 private final Map <K , BufferUntilSubscriber <T >> groups = new HashMap <K , BufferUntilSubscriber <T >>();
6363 volatile int completionCounter ;
@@ -86,7 +86,7 @@ public void onCompleted() {
8686 if (completionCounter == 0 ) {
8787 // we must track 'completionEmitted' seperately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted
8888 if (EMITTED_UPDATER .compareAndSet (this , 0 , 1 )) {
89- childObserver .onCompleted ();
89+ child .onCompleted ();
9090 }
9191 }
9292 }
@@ -96,23 +96,23 @@ public void onCompleted() {
9696 public void onError (Throwable e ) {
9797 if (TERMINATED_UPDATER .compareAndSet (this , 0 , 1 )) {
9898 // we immediately tear everything down if we receive an error
99- childObserver .onError (e );
99+ child .onError (e );
100100 }
101101 }
102102
103103 @ Override
104104 public void onNext (T t ) {
105105 try {
106106 final K key = keySelector .call (t );
107- BufferUntilSubscriber <T > gps = groups .get (key );
108- if (gps == null ) {
107+ BufferUntilSubscriber <T > group = groups .get (key );
108+ if (group == null ) {
109109 // this group doesn't exist
110- if (childObserver .isUnsubscribed ()) {
110+ if (child .isUnsubscribed ()) {
111111 // we have been unsubscribed on the outer so won't send any more groups
112112 return ;
113113 }
114- gps = BufferUntilSubscriber .create ();
115- final BufferUntilSubscriber <T > _gps = gps ;
114+ group = BufferUntilSubscriber .create ();
115+ final BufferUntilSubscriber <T > _group = group ;
116116
117117 GroupedObservable <K , T > go = new GroupedObservable <K , T >(key , new OnSubscribe <T >() {
118118
@@ -128,7 +128,7 @@ public void call() {
128128 }
129129
130130 }));
131- _gps .unsafeSubscribe (new Subscriber <T >(o ) {
131+ _group .unsafeSubscribe (new Subscriber <T >(o ) {
132132
133133 @ Override
134134 public void onCompleted () {
@@ -150,26 +150,26 @@ public void onNext(T t) {
150150 }
151151
152152 });
153- groups .put (key , gps );
154- childObserver .onNext (go );
153+ groups .put (key , group );
154+ child .onNext (go );
155155 }
156156 // we have the correct group so send value to it
157- gps .onNext (t );
157+ group .onNext (t );
158158 } catch (Throwable e ) {
159159 onError (OnErrorThrowable .addValueAsLastCause (e , t ));
160160 }
161161 }
162162
163163 private void completeInner () {
164164 // count can be < 0 because unsubscribe also calls this
165- if (COUNTER_UPDATER .decrementAndGet (this ) <= 0 && (terminated == 1 || childObserver .isUnsubscribed ())) {
165+ if (COUNTER_UPDATER .decrementAndGet (this ) <= 0 && (terminated == 1 || child .isUnsubscribed ())) {
166166 // completionEmitted ensures we only emit onCompleted once
167167 if (EMITTED_UPDATER .compareAndSet (this , 0 , 1 )) {
168- if (childObserver .isUnsubscribed ()) {
168+ if (child .isUnsubscribed ()) {
169169 // if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
170170 unsubscribe ();
171171 }
172- childObserver .onCompleted ();
172+ child .onCompleted ();
173173 }
174174 }
175175 }
0 commit comments