Skip to content

Commit 99e8400

Browse files
committed
TakeLastTimed with backpressure support
1 parent b58860b commit 99e8400

File tree

3 files changed

+126
-124
lines changed

3 files changed

+126
-124
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java

Lines changed: 1 addition & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@
1717

1818
import java.util.ArrayDeque;
1919
import java.util.Deque;
20-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2120

2221
import rx.Observable.Operator;
23-
import rx.Producer;
2422
import rx.Subscriber;
2523

2624
/**
@@ -43,7 +41,7 @@ public OperatorTakeLast(int count) {
4341
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
4442
final Deque<Object> deque = new ArrayDeque<Object>();
4543
final NotificationLite<T> notification = NotificationLite.instance();
46-
final QueueProducer<T> producer = new QueueProducer<T>(notification, deque, subscriber);
44+
final TakeLastQueueProducer<T> producer = new TakeLastQueueProducer<T>(notification, deque, subscriber);
4745
subscriber.setProducer(producer);
4846

4947
return new Subscriber<T>(subscriber) {
@@ -81,106 +79,4 @@ public void onNext(T value) {
8179
}
8280
};
8381
}
84-
85-
private static final class QueueProducer<T> implements Producer {
86-
87-
private final NotificationLite<T> notification;
88-
private final Deque<Object> deque;
89-
private final Subscriber<? super T> subscriber;
90-
private volatile boolean emittingStarted = false;
91-
92-
public QueueProducer(NotificationLite<T> n, Deque<Object> q, Subscriber<? super T> subscriber) {
93-
this.notification = n;
94-
this.deque = q;
95-
this.subscriber = subscriber;
96-
}
97-
98-
private volatile long requested = 0;
99-
@SuppressWarnings("rawtypes")
100-
private static final AtomicLongFieldUpdater<QueueProducer> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(QueueProducer.class, "requested");
101-
102-
void startEmitting() {
103-
if (!emittingStarted) {
104-
emittingStarted = true;
105-
emit(0); // start emitting
106-
}
107-
}
108-
109-
@Override
110-
public void request(long n) {
111-
if (requested == Long.MAX_VALUE) {
112-
return;
113-
}
114-
long _c;
115-
if (n == Long.MAX_VALUE) {
116-
_c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE);
117-
} else {
118-
_c = REQUESTED_UPDATER.getAndAdd(this, n);
119-
}
120-
if (!emittingStarted) {
121-
// we haven't started yet, so record what was requested and return
122-
return;
123-
}
124-
emit(_c);
125-
}
126-
127-
void emit(long previousRequested) {
128-
if (requested == Long.MAX_VALUE) {
129-
// fast-path without backpressure
130-
if (previousRequested == 0) {
131-
try {
132-
for (Object value : deque) {
133-
notification.accept(subscriber, value);
134-
}
135-
} catch (Throwable e) {
136-
subscriber.onError(e);
137-
} finally {
138-
deque.clear();
139-
}
140-
} else {
141-
// backpressure path will handle Long.MAX_VALUE and emit the rest events.
142-
}
143-
} else {
144-
// backpressure is requested
145-
if (previousRequested == 0) {
146-
while (true) {
147-
/*
148-
* This complicated logic is done to avoid touching the volatile `requested` value
149-
* during the loop itself. If it is touched during the loop the performance is impacted significantly.
150-
*/
151-
long numToEmit = requested;
152-
int emitted = 0;
153-
Object o;
154-
while (--numToEmit >= 0 && (o = deque.poll()) != null) {
155-
if (subscriber.isUnsubscribed()) {
156-
return;
157-
}
158-
if (notification.accept(subscriber, o)) {
159-
// terminal event
160-
return;
161-
} else {
162-
emitted++;
163-
}
164-
}
165-
for (;;) {
166-
long oldRequested = requested;
167-
long newRequested = oldRequested - emitted;
168-
if (oldRequested == Long.MAX_VALUE) {
169-
// became unbounded during the loop
170-
// continue the outer loop to emit the rest events.
171-
break;
172-
}
173-
if (REQUESTED_UPDATER.compareAndSet(this, oldRequested, newRequested)) {
174-
if (newRequested == 0) {
175-
// we're done emitting the number requested so return
176-
return;
177-
}
178-
break;
179-
}
180-
}
181-
}
182-
}
183-
}
184-
}
185-
}
18682
}

rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLastTimed.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import rx.Observable.Operator;
1919
import rx.Scheduler;
2020
import rx.Subscriber;
21-
import rx.schedulers.Timestamped;
2221

2322
import java.util.ArrayDeque;
2423
import java.util.Deque;
@@ -52,19 +51,24 @@ public OperatorTakeLastTimed(int count, long time, TimeUnit unit, Scheduler sche
5251

5352
@Override
5453
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
54+
final Deque<Object> buffer = new ArrayDeque<Object>();
55+
final Deque<Long> timestampBuffer = new ArrayDeque<Long>();
56+
final NotificationLite<T> notification = NotificationLite.instance();
57+
final TakeLastQueueProducer<T> producer = new TakeLastQueueProducer<T>(notification, buffer, subscriber);
58+
subscriber.setProducer(producer);
5559
return new Subscriber<T>(subscriber) {
5660

57-
private final Deque<Timestamped<T>> buffer = new ArrayDeque<Timestamped<T>>();
58-
5961
protected void runEvictionPolicy(long now) {
6062
// trim size
6163
while (count >= 0 && buffer.size() > count) {
64+
timestampBuffer.pollFirst();
6265
buffer.pollFirst();
6366
}
6467
// remove old entries
6568
while (!buffer.isEmpty()) {
66-
Timestamped<T> v = buffer.peekFirst();
67-
if (v.getTimestampMillis() < now - ageMillis) {
69+
long v = timestampBuffer.peekFirst();
70+
if (v < now - ageMillis) {
71+
timestampBuffer.pollFirst();
6872
buffer.pollFirst();
6973
} else {
7074
break;
@@ -82,33 +86,25 @@ public void onStart() {
8286
@Override
8387
public void onNext(T args) {
8488
long t = scheduler.now();
85-
buffer.add(new Timestamped<T>(t, args));
89+
timestampBuffer.add(t);
90+
buffer.add(notification.next(args));
8691
runEvictionPolicy(t);
8792
}
8893

8994
@Override
9095
public void onError(Throwable e) {
96+
timestampBuffer.clear();
9197
buffer.clear();
9298
subscriber.onError(e);
9399
}
94100

95101
@Override
96102
public void onCompleted() {
97103
runEvictionPolicy(scheduler.now());
98-
try {
99-
// TODO this can be made to support backpressure
100-
for (Timestamped<T> v : buffer) {
101-
subscriber.onNext(v.getValue());
102-
103-
}
104-
} catch (Throwable e) {
105-
onError(e);
106-
return;
107-
}
108-
buffer.clear();
109-
subscriber.onCompleted();
104+
timestampBuffer.clear();
105+
buffer.offer(notification.completed());
106+
producer.startEmitting();
110107
}
111108
};
112109
}
113-
114110
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package rx.internal.operators;
2+
3+
4+
import rx.Producer;
5+
import rx.Subscriber;
6+
7+
import java.util.Deque;
8+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
9+
10+
final class TakeLastQueueProducer<T> implements Producer {
11+
12+
private final NotificationLite<T> notification;
13+
private final Deque<Object> deque;
14+
private final Subscriber<? super T> subscriber;
15+
private volatile boolean emittingStarted = false;
16+
17+
public TakeLastQueueProducer(NotificationLite<T> n, Deque<Object> q, Subscriber<? super T> subscriber) {
18+
this.notification = n;
19+
this.deque = q;
20+
this.subscriber = subscriber;
21+
}
22+
23+
private volatile long requested = 0;
24+
@SuppressWarnings("rawtypes")
25+
private static final AtomicLongFieldUpdater<TakeLastQueueProducer> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(TakeLastQueueProducer.class, "requested");
26+
27+
void startEmitting() {
28+
if (!emittingStarted) {
29+
emittingStarted = true;
30+
emit(0); // start emitting
31+
}
32+
}
33+
34+
@Override
35+
public void request(long n) {
36+
if (requested == Long.MAX_VALUE) {
37+
return;
38+
}
39+
long _c;
40+
if (n == Long.MAX_VALUE) {
41+
_c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE);
42+
} else {
43+
_c = REQUESTED_UPDATER.getAndAdd(this, n);
44+
}
45+
if (!emittingStarted) {
46+
// we haven't started yet, so record what was requested and return
47+
return;
48+
}
49+
emit(_c);
50+
}
51+
52+
void emit(long previousRequested) {
53+
if (requested == Long.MAX_VALUE) {
54+
// fast-path without backpressure
55+
if (previousRequested == 0) {
56+
try {
57+
for (Object value : deque) {
58+
notification.accept(subscriber, value);
59+
}
60+
} catch (Throwable e) {
61+
subscriber.onError(e);
62+
} finally {
63+
deque.clear();
64+
}
65+
} else {
66+
// backpressure path will handle Long.MAX_VALUE and emit the rest events.
67+
}
68+
} else {
69+
// backpressure is requested
70+
if (previousRequested == 0) {
71+
while (true) {
72+
/*
73+
* This complicated logic is done to avoid touching the volatile `requested` value
74+
* during the loop itself. If it is touched during the loop the performance is impacted significantly.
75+
*/
76+
long numToEmit = requested;
77+
int emitted = 0;
78+
Object o;
79+
while (--numToEmit >= 0 && (o = deque.poll()) != null) {
80+
if (subscriber.isUnsubscribed()) {
81+
return;
82+
}
83+
if (notification.accept(subscriber, o)) {
84+
// terminal event
85+
return;
86+
} else {
87+
emitted++;
88+
}
89+
}
90+
for (; ; ) {
91+
long oldRequested = requested;
92+
long newRequested = oldRequested - emitted;
93+
if (oldRequested == Long.MAX_VALUE) {
94+
// became unbounded during the loop
95+
// continue the outer loop to emit the rest events.
96+
break;
97+
}
98+
if (REQUESTED_UPDATER.compareAndSet(this, oldRequested, newRequested)) {
99+
if (newRequested == 0) {
100+
// we're done emitting the number requested so return
101+
return;
102+
}
103+
break;
104+
}
105+
}
106+
}
107+
}
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)