Skip to content

Commit 21578d5

Browse files
committed
Operator Window and other changes
1 parent d62ddb7 commit 21578d5

File tree

10 files changed

+1336
-1305
lines changed

10 files changed

+1336
-1305
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7238,7 +7238,7 @@ public final Observable<T> unsubscribeOn(Scheduler scheduler) {
72387238
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
72397239
*/
72407240
public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector) {
7241-
return create(OperationWindow.window(this, closingSelector));
7241+
return lift(new OperatorWindowWithObservable<T, TClosing>(closingSelector));
72427242
}
72437243

72447244
/**
@@ -7255,7 +7255,7 @@ public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observa
72557255
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
72567256
*/
72577257
public final Observable<Observable<T>> window(int count) {
7258-
return create(OperationWindow.window(this, count));
7258+
return lift(new OperatorWindowWithSize<T>(count, count));
72597259
}
72607260

72617261
/**
@@ -7275,7 +7275,7 @@ public final Observable<Observable<T>> window(int count) {
72757275
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
72767276
*/
72777277
public final Observable<Observable<T>> window(int count, int skip) {
7278-
return create(OperationWindow.window(this, count, skip));
7278+
return lift(new OperatorWindowWithSize<T>(count, skip));
72797279
}
72807280

72817281
/**
@@ -7297,7 +7297,7 @@ public final Observable<Observable<T>> window(int count, int skip) {
72977297
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
72987298
*/
72997299
public final Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit unit) {
7300-
return create(OperationWindow.window(this, timespan, timeshift, unit));
7300+
return lift(new OperatorWindowWithTime<T>(timespan, timeshift, unit, Integer.MAX_VALUE, Schedulers.computation()));
73017301
}
73027302

73037303
/**
@@ -7321,7 +7321,7 @@ public final Observable<Observable<T>> window(long timespan, long timeshift, Tim
73217321
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
73227322
*/
73237323
public final Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
7324-
return create(OperationWindow.window(this, timespan, timeshift, unit, scheduler));
7324+
return lift(new OperatorWindowWithTime<T>(timespan, timeshift, unit, Integer.MAX_VALUE, scheduler));
73257325
}
73267326

73277327
/**
@@ -7342,7 +7342,7 @@ public final Observable<Observable<T>> window(long timespan, long timeshift, Tim
73427342
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
73437343
*/
73447344
public final Observable<Observable<T>> window(long timespan, TimeUnit unit) {
7345-
return create(OperationWindow.window(this, timespan, unit));
7345+
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, Integer.MAX_VALUE, Schedulers.computation()));
73467346
}
73477347

73487348
/**
@@ -7367,7 +7367,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit) {
73677367
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
73687368
*/
73697369
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int count) {
7370-
return create(OperationWindow.window(this, timespan, unit, count));
7370+
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, count, Schedulers.computation()));
73717371
}
73727372

73737373
/**
@@ -7394,7 +7394,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int
73947394
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
73957395
*/
73967396
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int count, Scheduler scheduler) {
7397-
return create(OperationWindow.window(this, timespan, unit, count, scheduler));
7397+
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, count, scheduler));
73987398
}
73997399

74007400
/**
@@ -7417,7 +7417,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit, int
74177417
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
74187418
*/
74197419
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler) {
7420-
return create(OperationWindow.window(this, timespan, unit, scheduler));
7420+
return lift(new OperatorWindowWithTime<T>(timespan, timespan, unit, Integer.MAX_VALUE, scheduler));
74217421
}
74227422

74237423
/**
@@ -7437,7 +7437,7 @@ public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Sche
74377437
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-window">RxJava Wiki: window()</a>
74387438
*/
74397439
public final <TOpening, TClosing> Observable<Observable<T>> window(Observable<? extends TOpening> windowOpenings, Func1<? super TOpening, ? extends Observable<? extends TClosing>> closingSelector) {
7440-
return create(OperationWindow.window(this, windowOpenings, closingSelector));
7440+
return lift(new OperatorWindowWithStartEndObservable<T, TOpening, TClosing>(windowOpenings, closingSelector));
74417441
}
74427442

74437443
/**
@@ -7455,7 +7455,7 @@ public final <TOpening, TClosing> Observable<Observable<T>> window(Observable<?
74557455
* where the boundary of each window is determined by the items emitted from the {@code boundary} Observable
74567456
*/
74577457
public final <U> Observable<Observable<T>> window(Observable<U> boundary) {
7458-
return create(OperationWindow.window(this, boundary));
7458+
return lift(new OperatorWindowWithObservable<T, U>(boundary));
74597459
}
74607460

74617461
/**

rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java

Lines changed: 137 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,18 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.LinkedHashSet;
19+
import java.util.Set;
1820
import java.util.concurrent.ConcurrentLinkedQueue;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922
import java.util.concurrent.atomic.AtomicReference;
2023

2124
import rx.Observable;
2225
import rx.Observer;
2326
import rx.Subscriber;
27+
import rx.functions.Action0;
28+
import rx.observers.Subscribers;
29+
import rx.subscriptions.Subscriptions;
2430

2531
/**
2632
* A solution to the "time gap" problem that occurs with `groupBy` and `pivot` => https://github.com/Netflix/RxJava/issues/844
@@ -43,44 +49,151 @@
4349
public class BufferUntilSubscriber<T> extends Observable<T> implements Observer<T> {
4450

4551
public static <T> BufferUntilSubscriber<T> create() {
46-
return new BufferUntilSubscriber<T>(new AtomicReference<Observer<? super T>>(new BufferedObserver<T>()));
52+
State<T> state = new State<T>();
53+
return new BufferUntilSubscriber<T>(state);
4754
}
4855

49-
private final AtomicReference<Observer<? super T>> observerRef;
56+
/** The common state. */
57+
static final class State<T> {
58+
/** Lite notifications of type T. */
59+
final NotificationLite<T> nl = NotificationLite.instance();
60+
/** The first observer or the one which buffers until the first arrives. */
61+
final AtomicReference<Observer<? super T>> observerRef = new AtomicReference<Observer<? super T>>(new BufferedObserver<T>());
62+
/** How many subscribers. */
63+
final AtomicBoolean first = new AtomicBoolean();
64+
/** The rest of the subscribers without buffering. Guarded by this. */
65+
final Set<Subscriber<? super T>> subscribers = new LinkedHashSet<Subscriber<? super T>>();
66+
/** Guarded by this. */
67+
boolean done;
68+
/** Guarded by this. */
69+
Throwable exception;
70+
}
71+
72+
static final class OnSubscribeAction<T> implements OnSubscribe<T> {
73+
final State<T> state;
5074

51-
private BufferUntilSubscriber(final AtomicReference<Observer<? super T>> observerRef) {
52-
super(new OnSubscribe<T>() {
75+
public OnSubscribeAction(State<T> state) {
76+
this.state = state;
77+
}
5378

54-
@Override
55-
public void call(Subscriber<? super T> s) {
79+
@Override
80+
public void call(final Subscriber<? super T> s) {
81+
if (state.first.compareAndSet(false, true)) {
5682
// drain queued notifications before subscription
5783
// we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
58-
BufferedObserver<T> buffered = (BufferedObserver<T>) observerRef.get();
59-
Object o = null;
84+
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef.get();
85+
Object o;
6086
while ((o = buffered.buffer.poll()) != null) {
61-
emit(s, o);
87+
state.nl.accept(s, o);
6288
}
6389
// register real observer for pass-thru ... and drain any further events received on first notification
64-
observerRef.set(new PassThruObserver<T>(s, buffered.buffer, observerRef));
65-
}
90+
state.observerRef.set(new PassThruObserver<T>(s, buffered.buffer, state.observerRef));
91+
s.add(Subscriptions.create(new Action0() {
92+
@Override
93+
public void call() {
94+
state.observerRef.set(Subscribers.empty());
95+
}
96+
}));
97+
} else {
98+
Throwable e = null;
99+
boolean done;
100+
synchronized (state) {
101+
done = state.done;
102+
if (!done) {
103+
state.subscribers.add(s);
104+
} else {
105+
e = state.exception;
106+
}
107+
}
108+
if (done) {
109+
if (e != null) {
110+
s.onError(e);
111+
} else {
112+
s.onCompleted();
113+
}
114+
return;
115+
}
116+
s.add(Subscriptions.create(new Action0() {
66117

67-
});
68-
this.observerRef = observerRef;
118+
@Override
119+
public void call() {
120+
synchronized (state) {
121+
state.subscribers.remove(s);
122+
}
123+
}
124+
}));
125+
}
126+
}
127+
128+
}
129+
final State<T> state;
130+
131+
private BufferUntilSubscriber(State<T> state) {
132+
super(new OnSubscribeAction<T>(state));
133+
this.state = state;
69134
}
70135

71136
@Override
72137
public void onCompleted() {
73-
observerRef.get().onCompleted();
138+
state.observerRef.get().onCompleted();
139+
// notify the rest
140+
Subscriber<?>[] list;
141+
synchronized (state) {
142+
if (!state.done) {
143+
return;
144+
}
145+
state.done = true;
146+
if (state.subscribers.isEmpty()) {
147+
return;
148+
}
149+
list = state.subscribers.toArray(new Subscriber<?>[state.subscribers.size()]);
150+
state.subscribers.clear();
151+
}
152+
for (Subscriber<?> s : list) {
153+
s.onCompleted();
154+
}
74155
}
75156

76157
@Override
77158
public void onError(Throwable e) {
78-
observerRef.get().onError(e);
159+
state.observerRef.get().onError(e);
160+
// notify the rest
161+
Subscriber<?>[] list;
162+
synchronized (state) {
163+
if (!state.done) {
164+
return;
165+
}
166+
state.done = true;
167+
state.exception = e;
168+
if (state.subscribers.isEmpty()) {
169+
return;
170+
}
171+
list = state.subscribers.toArray(new Subscriber<?>[state.subscribers.size()]);
172+
state.subscribers.clear();
173+
}
174+
for (Subscriber<?> s : list) {
175+
s.onError(e);
176+
}
79177
}
80178

81179
@Override
180+
@SuppressWarnings({ "unchecked", "rawtypes" })
82181
public void onNext(T t) {
83-
observerRef.get().onNext(t);
182+
state.observerRef.get().onNext(t);
183+
// notify the rest
184+
Subscriber[] list;
185+
synchronized (state) {
186+
if (state.done) {
187+
return;
188+
}
189+
if (state.subscribers.isEmpty()) {
190+
return;
191+
}
192+
list = state.subscribers.toArray(new Subscriber[state.subscribers.size()]);
193+
}
194+
for (Subscriber s : list) {
195+
s.onNext(t);
196+
}
84197
}
85198

86199
/**
@@ -97,6 +210,7 @@ private static class PassThruObserver<T> extends Subscriber<T> {
97210
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
98211
private final ConcurrentLinkedQueue<Object> buffer;
99212
private final AtomicReference<Observer<? super T>> observerRef;
213+
private final NotificationLite<T> nl = NotificationLite.instance();
100214

101215
PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer, AtomicReference<Observer<? super T>> observerRef) {
102216
this.actual = actual;
@@ -123,67 +237,34 @@ public void onNext(T t) {
123237
}
124238

125239
private void drainIfNeededAndSwitchToActual() {
126-
Object o = null;
240+
Object o;
127241
while ((o = buffer.poll()) != null) {
128-
emit(this, o);
242+
nl.accept(this, o);
129243
}
130244
// now we can safely change over to the actual and get rid of the pass-thru
131-
observerRef.set(actual);
245+
observerRef.compareAndSet(this, actual);
132246
}
133247

134248
}
135249

136250
private static class BufferedObserver<T> extends Subscriber<T> {
137251
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
252+
private final NotificationLite<T> nl = NotificationLite.instance();
138253

139254
@Override
140255
public void onCompleted() {
141-
buffer.add(COMPLETE_SENTINEL);
256+
buffer.add(nl.completed());
142257
}
143258

144259
@Override
145260
public void onError(Throwable e) {
146-
buffer.add(new ErrorSentinel(e));
261+
buffer.add(nl.error(e));
147262
}
148263

149264
@Override
150265
public void onNext(T t) {
151-
if (t == null) {
152-
buffer.add(NULL_SENTINEL);
153-
} else {
154-
buffer.add(t);
155-
}
156-
}
157-
158-
}
159-
160-
private final static <T> void emit(Observer<T> s, Object v) {
161-
if (v instanceof Sentinel) {
162-
if (v == NULL_SENTINEL) {
163-
s.onNext(null);
164-
} else if (v == COMPLETE_SENTINEL) {
165-
s.onCompleted();
166-
} else if (v instanceof ErrorSentinel) {
167-
s.onError(((ErrorSentinel) v).e);
168-
}
169-
} else {
170-
s.onNext((T) v);
266+
buffer.add(nl.next(t));
171267
}
172-
}
173-
174-
private static class Sentinel {
175268

176269
}
177-
178-
private static Sentinel NULL_SENTINEL = new Sentinel();
179-
private static Sentinel COMPLETE_SENTINEL = new Sentinel();
180-
181-
private static class ErrorSentinel extends Sentinel {
182-
final Throwable e;
183-
184-
ErrorSentinel(Throwable e) {
185-
this.e = e;
186-
}
187-
}
188-
189270
}

0 commit comments

Comments
 (0)