Skip to content

Commit 4bb777c

Browse files
akarnokdakarnokd
authored andcommitted
BehaviorSubject subscription timegap fix
1 parent cb60dd4 commit 4bb777c

File tree

3 files changed

+241
-23
lines changed

3 files changed

+241
-23
lines changed

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 173 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
*/
1616
package rx.subjects;
1717

18+
import java.util.ArrayList;
1819
import java.util.Collection;
20+
import java.util.List;
1921
import java.util.concurrent.atomic.AtomicReference;
2022

2123
import rx.Notification;
2224
import rx.Observer;
25+
import rx.Subscriber;
2326
import rx.functions.Action0;
2427
import rx.functions.Action1;
2528
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
@@ -66,65 +69,200 @@
6669
* @param <T>
6770
*/
6871
public final class BehaviorSubject<T> extends Subject<T, T> {
69-
72+
/**
73+
* Create a {@link BehaviorSubject} without a default value.
74+
* @param <T> the value type
75+
* @return the constructed {@link BehaviorSubject}
76+
*/
77+
public static <T> BehaviorSubject<T> create() {
78+
return create(null, false);
79+
}
7080
/**
7181
* Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it.
7282
*
83+
* @param <T> the value type
7384
* @param defaultValue
7485
* the value which will be published to any {@link Observer} as long as the {@link BehaviorSubject} has not yet received any events
7586
* @return the constructed {@link BehaviorSubject}
7687
*/
7788
public static <T> BehaviorSubject<T> create(T defaultValue) {
89+
return create(defaultValue, true);
90+
}
91+
private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
7892
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
7993
// set a default value so subscriptions will immediately receive this until a new notification is received
80-
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(Notification.createOnNext(defaultValue));
94+
final State<T> state = new State<T>();
95+
if (hasDefault) {
96+
state.lastNotification.set(Notification.createOnNext(defaultValue));
97+
}
8198

82-
OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
99+
final OnSubscribe<T> onSubscribeBase = subscriptionManager.getOnSubscribeFunc(
83100
/**
84101
* This function executes at beginning of subscription.
85102
*
86103
* This will always run, even if Subject is in terminal state.
87104
*/
88105
new Action1<SubjectObserver<? super T>>() {
89-
90106
@Override
91107
public void call(SubjectObserver<? super T> o) {
92108
/*
93109
* When we subscribe we always emit the latest value to the observer.
94110
*
95111
* Here we only emit if it's an onNext as terminal states are handled in the next function.
96112
*/
97-
Notification<T> n = lastNotification.get();
98-
if (n.isOnNext()) {
99-
n.accept(o);
100-
}
113+
state.addPending(o);
101114
}
102115
},
103116
/**
104117
* This function executes if the Subject is terminated before subscription occurs.
105118
*/
106119
new Action1<SubjectObserver<? super T>>() {
107-
108120
@Override
109121
public void call(SubjectObserver<? super T> o) {
110122
/*
111123
* If we are already terminated, or termination happens while trying to subscribe
112124
* this will be invoked and we emit whatever the last terminal value was.
113125
*/
114-
lastNotification.get().accept(o);
126+
state.removePending(o);
115127
}
116-
}, null);
128+
}, new Action1<SubjectObserver<? super T>>() {
129+
@Override
130+
public void call(SubjectObserver<? super T> o) {
131+
state.removePending(o);
132+
}
133+
134+
});
135+
OnSubscribe<T> onSubscribe = new OnSubscribe<T>() {
117136

118-
return new BehaviorSubject<T>(onSubscribe, subscriptionManager, lastNotification);
137+
@Override
138+
public void call(Subscriber<? super T> t1) {
139+
onSubscribeBase.call(t1);
140+
state.removePendingSubscriber(t1);
141+
}
142+
};
143+
return new BehaviorSubject<T>(onSubscribe, subscriptionManager, state);
119144
}
120145

146+
static final class State<T> {
147+
final AtomicReference<Notification<T>> lastNotification;
148+
/** Guarded by this. */
149+
List<Object> pendingSubscriptions;
150+
public State() {
151+
this.lastNotification = new AtomicReference<Notification<T>>();
152+
}
153+
public void addPending(SubjectObserver<? super T> subscriber) {
154+
synchronized (this) {
155+
if (pendingSubscriptions == null) {
156+
pendingSubscriptions = new ArrayList<Object>(4);
157+
}
158+
pendingSubscriptions.add(subscriber);
159+
List<Notification<T>> list = new ArrayList<Notification<T>>(4);
160+
list.add(lastNotification.get());
161+
pendingSubscriptions.add(list);
162+
}
163+
}
164+
public void bufferValue(Notification<T> value) {
165+
synchronized (this) {
166+
if (pendingSubscriptions == null) {
167+
return;
168+
}
169+
for (int i = 1; i < pendingSubscriptions.size(); i += 2) {
170+
@SuppressWarnings("unchecked")
171+
List<Notification<T>> list = (List<Notification<T>>)pendingSubscriptions.get(i);
172+
list.add(value);
173+
}
174+
}
175+
}
176+
public void removePending(SubjectObserver<? super T> subscriber) {
177+
List<Notification<T>> toCatchUp = null;
178+
synchronized (this) {
179+
if (pendingSubscriptions == null) {
180+
return;
181+
}
182+
int idx = pendingSubscriptions.indexOf(subscriber);
183+
if (idx >= 0) {
184+
pendingSubscriptions.remove(idx);
185+
@SuppressWarnings("unchecked")
186+
List<Notification<T>> list = (List<Notification<T>>)pendingSubscriptions.remove(idx);
187+
toCatchUp = list;
188+
subscriber.caughtUp = true;
189+
if (pendingSubscriptions.isEmpty()) {
190+
pendingSubscriptions = null;
191+
}
192+
}
193+
}
194+
if (toCatchUp != null) {
195+
for (Notification<T> n : toCatchUp) {
196+
if (n != null) {
197+
n.accept(subscriber);
198+
}
199+
}
200+
}
201+
}
202+
public void removePendingSubscriber(Subscriber<? super T> subscriber) {
203+
List<Notification<T>> toCatchUp = null;
204+
synchronized (this) {
205+
if (pendingSubscriptions == null) {
206+
return;
207+
}
208+
for (int i = 0; i < pendingSubscriptions.size(); i += 2) {
209+
@SuppressWarnings("unchecked")
210+
SubjectObserver<? super T> so = (SubjectObserver<? super T>)pendingSubscriptions.get(i);
211+
if (so.getActual() == subscriber && !so.caughtUp) {
212+
@SuppressWarnings("unchecked")
213+
List<Notification<T>> list = (List<Notification<T>>)pendingSubscriptions.get(i + 1);
214+
toCatchUp = list;
215+
so.caughtUp = true;
216+
pendingSubscriptions.remove(i);
217+
pendingSubscriptions.remove(i);
218+
if (pendingSubscriptions.isEmpty()) {
219+
pendingSubscriptions = null;
220+
}
221+
break;
222+
}
223+
}
224+
}
225+
if (toCatchUp != null) {
226+
for (Notification<T> n : toCatchUp) {
227+
if (n != null) {
228+
n.accept(subscriber);
229+
}
230+
}
231+
}
232+
}
233+
public void replayAllPending() {
234+
List<Object> localPending;
235+
synchronized (this) {
236+
localPending = pendingSubscriptions;
237+
pendingSubscriptions = null;
238+
}
239+
if (localPending != null) {
240+
for (int i = 0; i < localPending.size(); i += 2) {
241+
@SuppressWarnings("unchecked")
242+
SubjectObserver<? super T> so = (SubjectObserver<? super T>)localPending.get(i);
243+
if (!so.caughtUp) {
244+
@SuppressWarnings("unchecked")
245+
List<Notification<T>> list = (List<Notification<T>>)localPending.get(i + 1);
246+
for (Notification<T> v : list) {
247+
if (v != null) {
248+
v.accept(so);
249+
}
250+
}
251+
so.caughtUp = true;
252+
}
253+
}
254+
}
255+
}
256+
}
257+
258+
private final State<T> state;
121259
private final SubjectSubscriptionManager<T> subscriptionManager;
122-
final AtomicReference<Notification<T>> lastNotification;
123260

124-
protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
261+
protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager,
262+
State<T> state) {
125263
super(onSubscribe);
126264
this.subscriptionManager = subscriptionManager;
127-
this.lastNotification = lastNotification;
265+
this.state = state;
128266
}
129267

130268
@Override
@@ -133,10 +271,13 @@ public void onCompleted() {
133271

134272
@Override
135273
public void call() {
136-
lastNotification.set(Notification.<T> createOnCompleted());
274+
final Notification<T> ne = Notification.<T>createOnCompleted();
275+
state.bufferValue(ne);
276+
state.lastNotification.set(ne);
137277
}
138278
});
139279
if (observers != null) {
280+
state.replayAllPending();
140281
for (Observer<? super T> o : observers) {
141282
o.onCompleted();
142283
}
@@ -149,10 +290,13 @@ public void onError(final Throwable e) {
149290

150291
@Override
151292
public void call() {
152-
lastNotification.set(Notification.<T> createOnError(e));
293+
final Notification<T> ne = Notification.<T>createOnError(e);
294+
state.bufferValue(ne);
295+
state.lastNotification.set(ne);
153296
}
154297
});
155298
if (observers != null) {
299+
state.replayAllPending();
156300
for (Observer<? super T> o : observers) {
157301
o.onError(e);
158302
}
@@ -163,12 +307,19 @@ public void call() {
163307
public void onNext(T v) {
164308
// do not overwrite a terminal notification
165309
// so new subscribers can get them
166-
if (lastNotification.get().isOnNext()) {
167-
lastNotification.set(Notification.createOnNext(v));
168-
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
169-
o.onNext(v);
310+
Notification<T> last = state.lastNotification.get();
311+
if (last == null || last.isOnNext()) {
312+
Notification<T> n = Notification.createOnNext(v);
313+
state.bufferValue(n);
314+
state.lastNotification.set(n);
315+
316+
for (SubjectObserver<? super T> o : subscriptionManager.rawSnapshot()) {
317+
if (o.caughtUp) {
318+
o.onNext(v);
319+
} else {
320+
state.removePending(o);
321+
}
170322
}
171323
}
172324
}
173-
174325
}

rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,9 @@ public void onError(Throwable e) {
260260
public void onNext(T v) {
261261
this.actual.onNext(v);
262262
}
263-
263+
Observer<? super T> getActual() {
264+
return actual;
265+
}
264266
}
265267

266268
}

rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,4 +285,69 @@ public void onCompleted() {
285285
verify(o, never()).onError(any(Throwable.class));
286286
}
287287
}
288+
@Test
289+
public void testStartEmpty() {
290+
BehaviorSubject<Integer> source = BehaviorSubject.create();
291+
@SuppressWarnings("unchecked")
292+
final Observer<Object> o = mock(Observer.class);
293+
InOrder inOrder = inOrder(o);
294+
295+
source.subscribe(o);
296+
297+
inOrder.verify(o, never()).onNext(any());
298+
inOrder.verify(o, never()).onCompleted();
299+
300+
source.onNext(1);
301+
302+
source.onCompleted();
303+
304+
source.onNext(2);
305+
306+
inOrder.verify(o).onNext(1);
307+
inOrder.verify(o).onCompleted();
308+
inOrder.verifyNoMoreInteractions();
309+
310+
verify(o, never()).onError(any(Throwable.class));
311+
312+
}
313+
@Test
314+
public void testStartEmptyThenAddOne() {
315+
BehaviorSubject<Integer> source = BehaviorSubject.create();
316+
@SuppressWarnings("unchecked")
317+
final Observer<Object> o = mock(Observer.class);
318+
InOrder inOrder = inOrder(o);
319+
320+
source.onNext(1);
321+
322+
source.subscribe(o);
323+
324+
inOrder.verify(o).onNext(1);
325+
326+
source.onCompleted();
327+
328+
source.onNext(2);
329+
330+
inOrder.verify(o).onCompleted();
331+
inOrder.verifyNoMoreInteractions();
332+
333+
verify(o, never()).onError(any(Throwable.class));
334+
335+
}
336+
@Test
337+
public void testStartEmptyCompleteWithOne() {
338+
BehaviorSubject<Integer> source = BehaviorSubject.create();
339+
@SuppressWarnings("unchecked")
340+
final Observer<Object> o = mock(Observer.class);
341+
342+
source.onNext(1);
343+
source.onCompleted();
344+
345+
source.onNext(2);
346+
347+
source.subscribe(o);
348+
349+
verify(o).onCompleted();
350+
verify(o, never()).onError(any(Throwable.class));
351+
verify(o, never()).onNext(any());
352+
}
288353
}

0 commit comments

Comments
 (0)