Skip to content

Commit 071aa4c

Browse files
author
Aaron Tull
committed
Adding backpressure to OnSubscribeRedo
1 parent a7953e6 commit 071aa4c

File tree

2 files changed

+52
-11
lines changed

2 files changed

+52
-11
lines changed

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,15 @@
3434
import static rx.Observable.create;
3535

3636
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.concurrent.atomic.AtomicLong;
38+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
39+
import java.util.concurrent.atomic.AtomicReference;
3740

3841
import rx.Notification;
3942
import rx.Observable;
4043
import rx.Observable.OnSubscribe;
4144
import rx.Observable.Operator;
45+
import rx.Producer;
4246
import rx.Scheduler;
4347
import rx.Subscriber;
4448
import rx.functions.Action0;
@@ -154,32 +158,40 @@ public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observa
154158
}
155159

156160
private Observable<T> source;
157-
private final Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f;
161+
private final Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> controlHandlerFunction;
158162
private boolean stopOnComplete;
159163
private boolean stopOnError;
160164
private final Scheduler scheduler;
161165
private final AtomicBoolean isLocked = new AtomicBoolean(true);
166+
private final AtomicBoolean isStarted = new AtomicBoolean(false);
167+
// incremented when requests are made, decremented when requests are fulfilled
168+
private final AtomicLong consumerCapacity = new AtomicLong(0l);
169+
private final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();
162170

163171
private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,
164172
Scheduler scheduler) {
165173
this.source = source;
166-
this.f = f;
174+
this.controlHandlerFunction = f;
167175
this.stopOnComplete = stopOnComplete;
168176
this.stopOnError = stopOnError;
169177
this.scheduler = scheduler;
170178
}
171179

172180
@Override
173181
public void call(final Subscriber<? super T> child) {
182+
isStarted.set(false);
183+
isLocked.set(true);
184+
consumerCapacity.set(0l);
185+
currentProducer.set(null);
186+
174187
final Scheduler.Worker inner = scheduler.createWorker();
175188
child.add(inner);
176189

177190
final CompositeSubscription sourceSubscriptions = new CompositeSubscription();
178191
child.add(sourceSubscriptions);
179-
192+
180193
final PublishSubject<Notification<?>> terminals = PublishSubject.create();
181194

182-
183195
final Action0 subscribeToSource = new Action0() {
184196
@Override
185197
public void call() {
@@ -198,8 +210,15 @@ public void onError(Throwable e) {
198210

199211
@Override
200212
public void onNext(T v) {
213+
consumerCapacity.decrementAndGet();
201214
child.onNext(v);
202215
}
216+
217+
@Override
218+
public void setProducer(Producer producer) {
219+
currentProducer.set(producer);
220+
producer.request(consumerCapacity.get());
221+
}
203222
};
204223
// new subscription each time so if it unsubscribes itself it does not prevent retries
205224
// by unsubscribing the child subscription
@@ -208,8 +227,10 @@ public void onNext(T v) {
208227
}
209228
};
210229

211-
final Observable<?> restarts = f.call(
212-
// lifting in a custom operator to kind of do a merge/map/filter thing.
230+
// the observable received by the control handler function will receive notifications of onCompleted in the case of 'repeat'
231+
// type operators or notifications of onError for 'retry' this is done by lifting in a custom operator to selectively divert
232+
// the retry/repeat relevant values to the control handler
233+
final Observable<?> restarts = controlHandlerFunction.call(
213234
terminals.lift(new Operator<Notification<?>, Notification<?>>() {
214235
@Override
215236
public Subscriber<? super Notification<?>> call(final Subscriber<? super Notification<?>> filteredTerminals) {
@@ -233,6 +254,11 @@ public void onNext(Notification<?> t) {
233254
filteredTerminals.onNext(t);
234255
}
235256
}
257+
258+
@Override
259+
public void setProducer(Producer producer) {
260+
producer.request(Long.MAX_VALUE);
261+
}
236262
};
237263
}
238264
}));
@@ -255,15 +281,31 @@ public void onError(Throwable e) {
255281
@Override
256282
public void onNext(Object t) {
257283
if (!isLocked.get() && !child.isUnsubscribed()) {
258-
// if (!child.isUnsubscribed()) {
259284
child.add(inner.schedule(subscribeToSource));
260285
}
261286
}
287+
288+
@Override
289+
public void setProducer(Producer producer) {
290+
producer.request(Long.MAX_VALUE);
291+
}
262292
});
263293
}
264294
}));
265-
if (!child.isUnsubscribed()) {
266-
child.add(inner.schedule(subscribeToSource));
267-
}
295+
296+
child.setProducer(new Producer() {
297+
298+
@Override
299+
public void request(long n) {
300+
if (isStarted.compareAndSet(false, true)) {
301+
consumerCapacity.set(n);
302+
if (!child.isUnsubscribed()) child.add(inner.schedule(subscribeToSource));
303+
} else if (currentProducer.get() != null) {
304+
consumerCapacity.getAndAdd(n);
305+
currentProducer.get().request(n);
306+
}
307+
}
308+
});
309+
268310
}
269311
}

rxjava-core/src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import rx.Observable.OnSubscribe;
3838
import rx.Notification;
3939
import rx.Observer;
40-
import rx.Scheduler.Worker;
4140
import rx.Subscriber;
4241
import rx.Subscription;
4342
import rx.functions.Action0;

0 commit comments

Comments
 (0)