Skip to content

Commit 8bac6ba

Browse files
authored
Fix race condition with disposal in MemberSingle.tryEmit issue #58, pr #59
* fix race condition with disposal in MemberSingle.tryEmit #58 * organize imports
1 parent 21c91fb commit 8bac6ba

File tree

2 files changed

+17
-10
lines changed

2 files changed

+17
-10
lines changed

rxjava2-jdbc/src/test/java/org/davidmoten/rx/jdbc/SelectTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.davidmoten.rx.jdbc;
22

3-
import org.davidmoten.rx.jdbc.Select;
43
import org.junit.Test;
54

65
import com.github.davidmoten.junit.Asserts;

rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -259,21 +259,25 @@ private void scheduleReleasesNoDelay() {
259259
}
260260

261261
private boolean tryEmit(Observers<T> obs, DecoratingMember<T> m) {
262-
// get a fresh worker each time so we jump threads to
263-
// break the stack-trace (a long-enough chain of
264-
// checkout-checkins could otherwise provoke stack
265-
// overflow)
266-
262+
// note that tryEmit is protected by the drain method so will
263+
// not be run concurrently. We do have to be careful with
264+
// concurrent disposal of observers though.
265+
266+
267267
// advance counter to the next and choose an Observer to emit to (round robin)
268268

269269
int index = obs.index;
270+
// a precondition of this method is that obs.activeCount > 0 (enforced by drain method)
270271
MemberSingleObserver<T> o = obs.observers[index];
271272
MemberSingleObserver<T> oNext = o;
272-
// atomically bump up the index (if that entry has not been deleted in
273-
// the meantime by disposal)
273+
274+
// atomically bump up the index to select the next Observer by round-robin
275+
// (if that entry has not been deleted in the meantime by disposal). Need
276+
// to be careful too that ALL observers have not been deleted via a race
277+
// with disposal.
274278
while (true) {
275279
Observers<T> x = observers.get();
276-
if (x.index == index && x.observers[index] == o) {
280+
if (x.index == index && x.activeCount > 0 && x.observers[index] == o) {
277281
boolean[] active = new boolean[x.active.length];
278282
System.arraycopy(x.active, 0, active, 0, active.length);
279283
int nextIndex = (index + 1) % active.length;
@@ -292,6 +296,10 @@ private boolean tryEmit(Observers<T> obs, DecoratingMember<T> m) {
292296
return false;
293297
}
294298
}
299+
// get a fresh worker each time so we jump threads to
300+
// break the stack-trace (a long-enough chain of
301+
// checkout-checkins could otherwise provoke stack
302+
// overflow)
295303
Worker worker = scheduler.createWorker();
296304
worker.schedule(new Emitter<T>(worker, oNext, m));
297305
return true;
@@ -479,7 +487,7 @@ private static final class Observers<T> {
479487
final int requested;
480488

481489
Observers(MemberSingleObserver<T>[] observers, boolean[] active, int activeCount, int index, int requested) {
482-
Preconditions.checkArgument(observers.length > 0 || index == 0, "index must be -1 for zero length array");
490+
Preconditions.checkArgument(observers.length > 0 || index == 0, "index must be 0 for zero length array");
483491
Preconditions.checkArgument(observers.length == active.length);
484492
this.observers = observers;
485493
this.index = index;

0 commit comments

Comments
 (0)