@@ -182,38 +182,49 @@ public Collector(Observer<? super R> observer, Subscription cancel, int count) {
182
182
this .lock = new ReentrantLock ();
183
183
}
184
184
public void next (int index , T value ) {
185
+ Throwable err = null ;
185
186
lock .lock ();
186
187
try {
187
- values [index ] = value ;
188
- if (!hasValue .get (index )) {
189
- hasValue .set (index );
190
- hasCount ++;
191
- }
192
- if (hasCount == values .length ) {
193
- // clone: defensive copy due to varargs
194
- try {
195
- observer .onNext (combiner .call (values .clone ()));
196
- } catch (Throwable t ) {
197
- terminate ();
198
- observer .onError (t );
199
- cancel .unsubscribe ();
188
+ if (!isTerminated ()) {
189
+ values [index ] = value ;
190
+ if (!hasValue .get (index )) {
191
+ hasValue .set (index );
192
+ hasCount ++;
193
+ }
194
+ if (hasCount == values .length ) {
195
+ // clone: defensive copy due to varargs
196
+ try {
197
+ observer .onNext (combiner .call (values .clone ()));
198
+ } catch (Throwable t ) {
199
+ terminate ();
200
+ err = t ;
201
+ }
200
202
}
201
203
}
202
204
} finally {
203
205
lock .unlock ();
204
206
}
207
+ if (err != null ) {
208
+ // no need to lock here
209
+ observer .onError (err );
210
+ cancel .unsubscribe ();
211
+ }
205
212
}
206
213
public void error (int index , Throwable e ) {
214
+ boolean unsub = false ;
207
215
lock .lock ();
208
216
try {
209
217
if (!isTerminated ()) {
210
218
terminate ();
211
- observer .onError (e );
212
- cancel .unsubscribe ();
219
+ unsub = true ;
213
220
}
214
221
} finally {
215
222
lock .unlock ();
216
223
}
224
+ if (unsub ) {
225
+ observer .onError (e );
226
+ cancel .unsubscribe ();
227
+ }
217
228
}
218
229
boolean isTerminated () {
219
230
return completedCount == values .length + 1 ;
@@ -223,6 +234,7 @@ void terminate() {
223
234
Arrays .fill (values , null );
224
235
}
225
236
public void completed (int index ) {
237
+ boolean unsub = false ;
226
238
lock .lock ();
227
239
try {
228
240
if (!completed .get (index )) {
@@ -232,13 +244,16 @@ public void completed(int index) {
232
244
if ((!hasValue .get (index ) || completedCount == values .length )
233
245
&& !isTerminated ()) {
234
246
terminate ();
235
- observer .onCompleted ();
236
- cancel .unsubscribe ();
247
+ unsub = true ;
237
248
}
238
249
} finally {
239
250
lock .unlock ();
240
251
}
241
-
252
+ if (unsub ) {
253
+ // no need to hold a lock at this point
254
+ observer .onCompleted ();
255
+ cancel .unsubscribe ();
256
+ }
242
257
}
243
258
}
244
259
/**
0 commit comments