Skip to content

Commit f6fd5de

Browse files
akarnokdakarnokd
authored andcommitted
Factored out the backpressure management into an experimental class and
reimplemented Buffer and Block strategies with it.
1 parent c38a780 commit f6fd5de

File tree

3 files changed

+351
-207
lines changed

3 files changed

+351
-207
lines changed

src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java

Lines changed: 26 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import java.util.concurrent.BlockingQueue;
2121

2222
import rx.Observable.Operator;
23-
import rx.Producer;
2423
import rx.Subscriber;
24+
import rx.internal.util.BackpressureDrainManager;
2525

2626
/**
2727
* Operator that blocks the producer thread in case a backpressure is needed.
@@ -38,45 +38,25 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
3838
return s;
3939
}
4040

41-
static final class BlockingSubscriber<T> extends Subscriber<T> {
41+
static final class BlockingSubscriber<T> extends Subscriber<T> implements BackpressureDrainManager.BackpressureQueueCallback {
4242
final NotificationLite<T> nl = NotificationLite.instance();
4343
final BlockingQueue<Object> queue;
4444
final Subscriber<? super T> child;
45-
/** Guarded by this. */
46-
long requestedCount;
47-
/** Guarded by this. */
48-
boolean emitting;
49-
volatile boolean terminated;
50-
/** Set before terminated, read after terminated. */
51-
Throwable exception;
45+
final BackpressureDrainManager manager;
5246
public BlockingSubscriber(int max, Subscriber<? super T> child) {
5347
this.queue = new ArrayBlockingQueue<Object>(max);
5448
this.child = child;
49+
this.manager = new BackpressureDrainManager(this);
5550
}
5651
void init() {
5752
child.add(this);
58-
child.setProducer(new Producer() {
59-
@Override
60-
public void request(long n) {
61-
if (n == 0) {
62-
return;
63-
}
64-
synchronized (BlockingSubscriber.this) {
65-
if (n == Long.MAX_VALUE || requestedCount == Long.MAX_VALUE) {
66-
requestedCount = Long.MAX_VALUE;
67-
} else {
68-
requestedCount += n;
69-
}
70-
}
71-
drain();
72-
}
73-
});
53+
child.setProducer(manager);
7454
}
7555
@Override
7656
public void onNext(T t) {
7757
try {
7858
queue.put(nl.next(t));
79-
drain();
59+
manager.drain();
8060
} catch (InterruptedException ex) {
8161
if (!isUnsubscribed()) {
8262
onError(ex);
@@ -85,91 +65,31 @@ public void onNext(T t) {
8565
}
8666
@Override
8767
public void onError(Throwable e) {
88-
if (!terminated) {
89-
exception = e;
90-
terminated = true;
91-
drain();
92-
}
68+
manager.terminateAndDrain(e);
9369
}
9470
@Override
9571
public void onCompleted() {
96-
terminated = true;
97-
drain();
72+
manager.terminateAndDrain();
9873
}
99-
void drain() {
100-
long n;
101-
boolean term;
102-
synchronized (this) {
103-
if (emitting) {
104-
return;
105-
}
106-
emitting = true;
107-
n = requestedCount;
108-
term = terminated;
109-
}
110-
boolean skipFinal = false;
111-
try {
112-
Subscriber<? super T> child = this.child;
113-
BlockingQueue<Object> queue = this.queue;
114-
while (true) {
115-
int emitted = 0;
116-
while (n > 0 || term) {
117-
Object o;
118-
if (term) {
119-
o = queue.peek();
120-
if (o == null) {
121-
Throwable e = exception;
122-
if (e != null) {
123-
child.onError(e);
124-
} else {
125-
child.onCompleted();
126-
}
127-
skipFinal = true;
128-
return;
129-
}
130-
if (n == 0) {
131-
break;
132-
}
133-
}
134-
o = queue.poll();
135-
if (o == null) {
136-
break;
137-
} else {
138-
child.onNext(nl.getValue(o));
139-
n--;
140-
emitted++;
141-
}
142-
}
143-
synchronized (this) {
144-
term = terminated;
145-
boolean more = queue.peek() != null;
146-
// if no backpressure below
147-
if (requestedCount == Long.MAX_VALUE) {
148-
// no new data arrived since the last poll
149-
if (!more && !term) {
150-
skipFinal = true;
151-
emitting = false;
152-
return;
153-
}
154-
n = Long.MAX_VALUE;
155-
} else {
156-
requestedCount -= emitted;
157-
n = requestedCount;
158-
if ((n == 0 || !more) && (!term || more)) {
159-
skipFinal = true;
160-
emitting = false;
161-
return;
162-
}
163-
}
164-
}
165-
}
166-
} finally {
167-
if (!skipFinal) {
168-
synchronized (this) {
169-
emitting = false;
170-
}
171-
}
74+
@Override
75+
public boolean accept(Object value) {
76+
return nl.accept(child, value);
77+
}
78+
@Override
79+
public void complete(Throwable exception) {
80+
if (exception != null) {
81+
child.onError(exception);
82+
} else {
83+
child.onCompleted();
17284
}
17385
}
86+
@Override
87+
public Object peek() {
88+
return queue.peek();
89+
}
90+
@Override
91+
public Object poll() {
92+
return queue.poll();
93+
}
17494
}
17595
}

0 commit comments

Comments
 (0)