Skip to content

Commit f9a9722

Browse files
Fix Merge Bug
Working on #1420
1 parent fc86f8c commit f9a9722

File tree

4 files changed

+42
-35
lines changed

4 files changed

+42
-35
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex
5353
private int wip;
5454
private boolean completed;
5555

56-
private SubscriptionIndexedRingBuffer<InnerSubscriber<T>> childrenSubscribers;
56+
private volatile SubscriptionIndexedRingBuffer<InnerSubscriber<T>> childrenSubscribers;
5757

5858
private RxRingBuffer scalarValueQueue = null;
5959

@@ -244,11 +244,12 @@ private boolean drainQueuesIfNeeded() {
244244
emitted = drainScalarValueQueue();
245245
drainChildrenQueues();
246246
} finally {
247-
if (!releaseEmitLock()) {
248-
return true;
249-
}
247+
boolean moreToDrain = releaseEmitLock();
250248
// request outside of lock
251249
request(emitted);
250+
if (!moreToDrain) {
251+
return true;
252+
}
252253
// otherwise we'll loop and get whatever was added
253254
}
254255
} else {
@@ -350,27 +351,29 @@ public void onCompleted() {
350351
}
351352
if (c) {
352353
// complete outside of lock
353-
actual.onCompleted();
354+
drainAndComplete();
354355
}
355356
}
356357

357358
void completeInner(InnerSubscriber<T> s) {
358-
try {
359-
boolean sendOnComplete = false;
360-
synchronized (this) {
361-
wip--;
362-
if (wip == 0 && completed) {
363-
sendOnComplete = true;
364-
}
365-
}
366-
if (sendOnComplete) {
367-
actual.onCompleted();
359+
boolean sendOnComplete = false;
360+
synchronized (this) {
361+
wip--;
362+
if (wip == 0 && completed) {
363+
sendOnComplete = true;
368364
}
369-
} finally {
370-
childrenSubscribers.remove(s.sindex);
365+
}
366+
childrenSubscribers.remove(s.sindex);
367+
if (sendOnComplete) {
368+
drainAndComplete();
371369
}
372370
}
373371

372+
private void drainAndComplete() {
373+
drainQueuesIfNeeded(); // TODO need to confirm whether this is needed or not
374+
actual.onCompleted();
375+
}
376+
374377
}
375378

376379
private static final class MergeProducer<T> implements Producer {
@@ -402,9 +405,10 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
402405
final MergeSubscriber<T> parentSubscriber;
403406
final MergeProducer<T> producer;
404407
/** Make sure the inner termination events are delivered only once. */
405-
volatile int once;
408+
volatile int terminated;
406409
@SuppressWarnings("rawtypes")
407-
static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "once");
410+
static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");
411+
408412
private final RxRingBuffer q = RxRingBuffer.getSpmcInstance();
409413
/* protected by emitLock */
410414
int emitted = 0;
@@ -425,14 +429,14 @@ public void onNext(T t) {
425429
@Override
426430
public void onError(Throwable e) {
427431
// it doesn't go through queues, it immediately onErrors and tears everything down
428-
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
432+
if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
429433
parentSubscriber.onError(e);
430434
}
431435
}
432436

433437
@Override
434438
public void onCompleted() {
435-
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
439+
if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
436440
emit(null, true);
437441
}
438442
}

rxjava-core/src/main/java/rx/internal/util/IndexedRingBuffer.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -122,21 +122,16 @@ public int add(E e) {
122122
}
123123

124124
public E remove(int index) {
125-
try {
126-
E e;
127-
if (index < SIZE) {
128-
// fast-path when we are in the first section
129-
e = elements.array.getAndSet(index, null);
130-
} else {
131-
int sectionIndex = index % SIZE;
132-
e = getElementSection(index).array.getAndSet(sectionIndex, null);
133-
}
134-
pushRemovedIndex(index);
135-
return e;
136-
} catch (NullPointerException ne) {
137-
ne.printStackTrace();
138-
throw ne;
125+
E e;
126+
if (index < SIZE) {
127+
// fast-path when we are in the first section
128+
e = elements.array.getAndSet(index, null);
129+
} else {
130+
int sectionIndex = index % SIZE;
131+
e = getElementSection(index).array.getAndSet(sectionIndex, null);
139132
}
133+
pushRemovedIndex(index);
134+
return e;
140135
}
141136

142137
private IndexSection getIndexSection(int index) {

rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,13 @@ public int count() {
260260
}
261261
return queue.size();
262262
}
263+
264+
public boolean isEmpty() {
265+
if (queue == null) {
266+
return true;
267+
}
268+
return queue.isEmpty();
269+
}
263270

264271
public Object poll() {
265272
if (queue == null) {

rxjava-core/src/main/java/rx/internal/util/SubscriptionList.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public void unsubscribe() {
8686
}
8787
// we will only get here once
8888
unsubscribeFromAll(subscriptions);
89+
subscriptions = null;
8990
}
9091

9192
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {

0 commit comments

Comments
 (0)