Skip to content

Commit 2f06736

Browse files
committed
Merge pull request #1955 from akarnokd/OnBackpressureBlockFix
OnBackpressureXXX: support for common drain manager & fix for former concurrency bugs
2 parents e934c81 + 047cc28 commit 2f06736

File tree

5 files changed

+447
-194
lines changed

5 files changed

+447
-194
lines changed

src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java

Lines changed: 26 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import java.util.concurrent.BlockingQueue;
2121

2222
import rx.Observable.Operator;
23-
import rx.Producer;
2423
import rx.Subscriber;
24+
import rx.internal.util.BackpressureDrainManager;
2525

2626
/**
2727
* Operator that blocks the producer thread in case a backpressure is needed.
@@ -38,42 +38,25 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
3838
return s;
3939
}
4040

41-
static final class BlockingSubscriber<T> extends Subscriber<T> {
41+
static final class BlockingSubscriber<T> extends Subscriber<T> implements BackpressureDrainManager.BackpressureQueueCallback {
4242
final NotificationLite<T> nl = NotificationLite.instance();
4343
final BlockingQueue<Object> queue;
4444
final Subscriber<? super T> child;
45-
/** Guarded by this. */
46-
long requestedCount;
47-
/** Guarded by this. */
48-
boolean emitting;
49-
volatile boolean terminated;
50-
/** Set before terminated, read after terminated. */
51-
Throwable exception;
45+
final BackpressureDrainManager manager;
5246
public BlockingSubscriber(int max, Subscriber<? super T> child) {
5347
this.queue = new ArrayBlockingQueue<Object>(max);
5448
this.child = child;
49+
this.manager = new BackpressureDrainManager(this);
5550
}
5651
void init() {
5752
child.add(this);
58-
child.setProducer(new Producer() {
59-
@Override
60-
public void request(long n) {
61-
synchronized (BlockingSubscriber.this) {
62-
if (n == Long.MAX_VALUE || requestedCount == Long.MAX_VALUE) {
63-
requestedCount = Long.MAX_VALUE;
64-
} else {
65-
requestedCount += n;
66-
}
67-
}
68-
drain();
69-
}
70-
});
53+
child.setProducer(manager);
7154
}
7255
@Override
7356
public void onNext(T t) {
7457
try {
7558
queue.put(nl.next(t));
76-
drain();
59+
manager.drain();
7760
} catch (InterruptedException ex) {
7861
if (!isUnsubscribed()) {
7962
onError(ex);
@@ -82,76 +65,31 @@ public void onNext(T t) {
8265
}
8366
@Override
8467
public void onError(Throwable e) {
85-
if (!terminated) {
86-
exception = e;
87-
terminated = true;
88-
drain();
89-
}
68+
manager.terminateAndDrain(e);
9069
}
9170
@Override
9271
public void onCompleted() {
93-
terminated = true;
94-
drain();
72+
manager.terminateAndDrain();
9573
}
96-
void drain() {
97-
long n;
98-
synchronized (this) {
99-
if (emitting) {
100-
return;
101-
}
102-
emitting = true;
103-
n = requestedCount;
104-
}
105-
boolean skipFinal = false;
106-
try {
107-
while (true) {
108-
int emitted = 0;
109-
while (n > 0) {
110-
Object o = queue.poll();
111-
if (o == null) {
112-
if (terminated) {
113-
if (exception != null) {
114-
child.onError(exception);
115-
} else {
116-
child.onCompleted();
117-
}
118-
return;
119-
}
120-
break;
121-
} else {
122-
child.onNext(nl.getValue(o));
123-
n--;
124-
emitted++;
125-
}
126-
}
127-
synchronized (this) {
128-
// if no backpressure below
129-
if (requestedCount == Long.MAX_VALUE) {
130-
// no new data arrived since the last poll
131-
if (queue.peek() == null) {
132-
skipFinal = true;
133-
emitting = false;
134-
return;
135-
}
136-
n = Long.MAX_VALUE;
137-
} else {
138-
if (emitted == 0) {
139-
skipFinal = true;
140-
emitting = false;
141-
return;
142-
}
143-
requestedCount -= emitted;
144-
n = requestedCount;
145-
}
146-
}
147-
}
148-
} finally {
149-
if (!skipFinal) {
150-
synchronized (this) {
151-
emitting = false;
152-
}
153-
}
74+
@Override
75+
public boolean accept(Object value) {
76+
return nl.accept(child, value);
77+
}
78+
@Override
79+
public void complete(Throwable exception) {
80+
if (exception != null) {
81+
child.onError(exception);
82+
} else {
83+
child.onCompleted();
15484
}
15585
}
86+
@Override
87+
public Object peek() {
88+
return queue.peek();
89+
}
90+
@Override
91+
public Object poll() {
92+
return queue.poll();
93+
}
15694
}
15795
}

src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java

Lines changed: 91 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.Queue;
1918
import java.util.concurrent.ConcurrentLinkedQueue;
2019
import java.util.concurrent.atomic.AtomicBoolean;
2120
import java.util.concurrent.atomic.AtomicLong;
@@ -25,11 +24,10 @@
2524
import rx.Subscriber;
2625
import rx.exceptions.MissingBackpressureException;
2726
import rx.functions.Action0;
27+
import rx.internal.util.BackpressureDrainManager;
2828

2929
public class OperatorOnBackpressureBuffer<T> implements Operator<T, T> {
3030

31-
private final NotificationLite<T> on = NotificationLite.instance();
32-
3331
private final Long capacity;
3432
private final Action0 onOverflow;
3533

@@ -52,122 +50,114 @@ public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) {
5250

5351
@Override
5452
public Subscriber<? super T> call(final Subscriber<? super T> child) {
55-
// TODO get a different queue implementation
56-
final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
57-
final AtomicLong capacity = (this.capacity == null) ? null : new AtomicLong(this.capacity);
58-
final AtomicLong wip = new AtomicLong();
59-
final AtomicLong requested = new AtomicLong();
60-
61-
child.setProducer(new Producer() {
62-
63-
@Override
64-
public void request(long n) {
65-
if (requested.getAndAdd(n) == 0) {
66-
pollQueue(wip, requested, capacity, queue, child);
67-
}
68-
}
6953

70-
});
7154
// don't pass through subscriber as we are async and doing queue draining
7255
// a parent being unsubscribed should not affect the children
73-
Subscriber<T> parent = new Subscriber<T>() {
56+
BufferSubscriber<T> parent = new BufferSubscriber<T>(child, capacity, onOverflow);
7457

75-
private AtomicBoolean saturated = new AtomicBoolean(false);
58+
// if child unsubscribes it should unsubscribe the parent, but not the other way around
59+
child.add(parent);
60+
child.setProducer(parent.manager());
7661

77-
@Override
78-
public void onStart() {
79-
request(Long.MAX_VALUE);
80-
}
62+
return parent;
63+
}
64+
private static final class BufferSubscriber<T> extends Subscriber<T> implements BackpressureDrainManager.BackpressureQueueCallback {
65+
// TODO get a different queue implementation
66+
private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
67+
private final Long baseCapacity;
68+
private final AtomicLong capacity;
69+
private final Subscriber<? super T> child;
70+
private final AtomicBoolean saturated = new AtomicBoolean(false);
71+
private final BackpressureDrainManager manager;
72+
private final NotificationLite<T> on = NotificationLite.instance();
73+
private final Action0 onOverflow;
74+
75+
public BufferSubscriber(final Subscriber<? super T> child, Long capacity, Action0 onOverflow) {
76+
this.child = child;
77+
this.baseCapacity = capacity;
78+
this.capacity = capacity != null ? new AtomicLong(capacity) : null;
79+
this.onOverflow = onOverflow;
80+
this.manager = new BackpressureDrainManager(this);
81+
}
82+
@Override
83+
public void onStart() {
84+
request(Long.MAX_VALUE);
85+
}
8186

82-
@Override
83-
public void onCompleted() {
84-
if (!saturated.get()) {
85-
queue.offer(on.completed());
86-
pollQueue(wip, requested, capacity, queue, child);
87-
}
87+
@Override
88+
public void onCompleted() {
89+
if (!saturated.get()) {
90+
manager.terminateAndDrain();
8891
}
92+
}
8993

90-
@Override
91-
public void onError(Throwable e) {
92-
if (!saturated.get()) {
93-
queue.offer(on.error(e));
94-
pollQueue(wip, requested, capacity, queue, child);
95-
}
94+
@Override
95+
public void onError(Throwable e) {
96+
if (!saturated.get()) {
97+
manager.terminateAndDrain(e);
9698
}
99+
}
97100

98-
@Override
99-
public void onNext(T t) {
100-
if (!assertCapacity()) {
101-
return;
102-
}
103-
queue.offer(on.next(t));
104-
pollQueue(wip, requested, capacity, queue, child);
101+
@Override
102+
public void onNext(T t) {
103+
if (!assertCapacity()) {
104+
return;
105105
}
106+
queue.offer(on.next(t));
107+
manager.drain();
108+
}
106109

107-
private boolean assertCapacity() {
108-
if (capacity == null) {
109-
return true;
110-
}
111-
112-
long currCapacity;
113-
do {
114-
currCapacity = capacity.get();
115-
if (currCapacity <= 0) {
116-
if (saturated.compareAndSet(false, true)) {
117-
unsubscribe();
118-
child.onError(new MissingBackpressureException(
119-
"Overflowed buffer of "
120-
+ OperatorOnBackpressureBuffer.this.capacity));
121-
if (onOverflow != null) {
122-
onOverflow.call();
123-
}
124-
}
125-
return false;
126-
}
127-
// ensure no other thread stole our slot, or retry
128-
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1));
129-
return true;
110+
@Override
111+
public boolean accept(Object value) {
112+
return on.accept(child, value);
113+
}
114+
@Override
115+
public void complete(Throwable exception) {
116+
if (exception != null) {
117+
child.onError(exception);
118+
} else {
119+
child.onCompleted();
130120
}
131-
};
132-
133-
// if child unsubscribes it should unsubscribe the parent, but not the other way around
134-
child.add(parent);
121+
}
122+
@Override
123+
public Object peek() {
124+
return queue.peek();
125+
}
126+
@Override
127+
public Object poll() {
128+
Object value = queue.poll();
129+
if (capacity != null && value != null) {
130+
capacity.incrementAndGet();
131+
}
132+
return value;
133+
}
135134

136-
return parent;
137-
}
135+
private boolean assertCapacity() {
136+
if (capacity == null) {
137+
return true;
138+
}
138139

139-
private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity, Queue<Object> queue, Subscriber<? super T> child) {
140-
// TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue?
141-
if (requested.get() > 0) {
142-
// only one draining at a time
143-
try {
144-
/*
145-
* This needs to protect against concurrent execution because `request` and `on*` events can come concurrently.
146-
*/
147-
if (wip.getAndIncrement() == 0) {
148-
while (true) {
149-
if (requested.getAndDecrement() != 0) {
150-
Object o = queue.poll();
151-
if (o == null) {
152-
// nothing in queue
153-
requested.incrementAndGet();
154-
return;
155-
}
156-
if (capacity != null) { // it's bounded
157-
capacity.incrementAndGet();
158-
}
159-
on.accept(child, o);
160-
} else {
161-
// we hit the end ... so increment back to 0 again
162-
requested.incrementAndGet();
163-
return;
140+
long currCapacity;
141+
do {
142+
currCapacity = capacity.get();
143+
if (currCapacity <= 0) {
144+
if (saturated.compareAndSet(false, true)) {
145+
unsubscribe();
146+
child.onError(new MissingBackpressureException(
147+
"Overflowed buffer of "
148+
+ baseCapacity));
149+
if (onOverflow != null) {
150+
onOverflow.call();
164151
}
165152
}
153+
return false;
166154
}
167-
168-
} finally {
169-
wip.decrementAndGet();
170-
}
155+
// ensure no other thread stole our slot, or retry
156+
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1));
157+
return true;
158+
}
159+
protected Producer manager() {
160+
return manager;
171161
}
172162
}
173163
}

0 commit comments

Comments
 (0)