Skip to content

Commit 0aaf151

Browse files
committed
Created the initial implementation of BehaviorSubject
1 parent bafd440 commit 0aaf151

File tree

1 file changed

+257
-0
lines changed

1 file changed

+257
-0
lines changed
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subjects;
17+
18+
import static org.mockito.Matchers.any;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.times;
21+
import static org.mockito.Mockito.verify;
22+
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
26+
import org.junit.Test;
27+
import org.mockito.Mockito;
28+
29+
import rx.Observer;
30+
import rx.Subscription;
31+
import rx.util.AtomicObservableSubscription;
32+
import rx.util.SynchronizedObserver;
33+
import rx.util.functions.Action1;
34+
import rx.util.functions.Func0;
35+
import rx.util.functions.Func1;
36+
37+
/**
38+
* Subject that publishes the previous and all subsequent events to each {@link Observer} that subscribes.
39+
* <p>
40+
* Example usage:
41+
* <p>
42+
* <pre> {@code
43+
44+
// observer will receive all events.
45+
BehaviorSubject<Object> subject = BehaviorSubject.createWithDefaultValue("default");
46+
subject.subscribe(observer);
47+
subject.onNext("one");
48+
subject.onNext("two");
49+
subject.onNext("three");
50+
51+
// observer will receive the "one", "two" and "three" events.
52+
BehaviorSubject<Object> subject = BehaviorSubject.createWithDefaultValue("default");
53+
subject.onNext("one");
54+
subject.subscribe(observer);
55+
subject.onNext("two");
56+
subject.onNext("three");
57+
58+
} </pre>
59+
*
60+
* @param <T>
61+
*/
62+
public class BehaviorSubject<T> extends Subject<T, T> {
63+
64+
public static <T> BehaviorSubject<T> createWithDefaultValue(T defaultValue) {
65+
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();
66+
67+
final AtomicReference<T> currentValue = new AtomicReference<T>(defaultValue);
68+
69+
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
70+
@Override
71+
public Subscription call(Observer<T> observer) {
72+
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
73+
74+
subscription.wrap(new Subscription() {
75+
@Override
76+
public void unsubscribe() {
77+
// on unsubscribe remove it from the map of outbound observers to notify
78+
observers.remove(subscription);
79+
}
80+
});
81+
82+
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(observer, subscription);
83+
synchronizedObserver.onNext(currentValue.get());
84+
85+
// on subscribe add it to the map of outbound observers to notify
86+
observers.put(subscription, synchronizedObserver);
87+
return subscription;
88+
}
89+
};
90+
91+
return new BehaviorSubject<T>(currentValue, onSubscribe, observers);
92+
}
93+
94+
private final ConcurrentHashMap<Subscription, Observer<T>> observers;
95+
private final AtomicReference<T> currentValue;
96+
97+
protected BehaviorSubject(AtomicReference<T> currentValue, Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
98+
super(onSubscribe);
99+
this.currentValue = currentValue;
100+
this.observers = observers;
101+
}
102+
103+
@Override
104+
public void onCompleted() {
105+
for (Observer<T> observer : observers.values()) {
106+
observer.onCompleted();
107+
}
108+
}
109+
110+
@Override
111+
public void onError(Exception e) {
112+
for (Observer<T> observer : observers.values()) {
113+
observer.onError(e);
114+
}
115+
}
116+
117+
@Override
118+
public void onNext(T args) {
119+
currentValue.set(args);
120+
for (Observer<T> observer : observers.values()) {
121+
observer.onNext(args);
122+
}
123+
}
124+
125+
public static class UnitTest {
126+
127+
private final Exception testException = new Exception();
128+
129+
@Test
130+
public void testThatObserverReceivesDefaultValueIfNothingWasPublished() {
131+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
132+
133+
@SuppressWarnings("unchecked")
134+
Observer<String> aObserver = mock(Observer.class);
135+
subject.subscribe(aObserver);
136+
137+
subject.onNext("one");
138+
subject.onNext("two");
139+
subject.onNext("three");
140+
141+
assertReceivedAllEvents(aObserver);
142+
}
143+
144+
private void assertReceivedAllEvents(Observer<String> aObserver) {
145+
verify(aObserver, times(1)).onNext("default");
146+
verify(aObserver, times(1)).onNext("one");
147+
verify(aObserver, times(1)).onNext("two");
148+
verify(aObserver, times(1)).onNext("three");
149+
verify(aObserver, Mockito.never()).onError(testException);
150+
verify(aObserver, Mockito.never()).onCompleted();
151+
}
152+
153+
@Test
154+
public void testThatObserverDoesNotReceiveDefaultValueIfSomethingWasPublished() {
155+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
156+
157+
subject.onNext("one");
158+
159+
@SuppressWarnings("unchecked")
160+
Observer<String> aObserver = mock(Observer.class);
161+
subject.subscribe(aObserver);
162+
163+
subject.onNext("two");
164+
subject.onNext("three");
165+
166+
assertDidNotReceiveTheDefaultValue(aObserver);
167+
}
168+
169+
private void assertDidNotReceiveTheDefaultValue(Observer<String> aObserver) {
170+
verify(aObserver, Mockito.never()).onNext("default");
171+
verify(aObserver, times(1)).onNext("one");
172+
verify(aObserver, times(1)).onNext("two");
173+
verify(aObserver, times(1)).onNext("three");
174+
verify(aObserver, Mockito.never()).onError(testException);
175+
verify(aObserver, Mockito.never()).onCompleted();
176+
}
177+
178+
@Test
179+
public void testCompleted() {
180+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
181+
182+
@SuppressWarnings("unchecked")
183+
Observer<String> aObserver = mock(Observer.class);
184+
subject.subscribe(aObserver);
185+
186+
subject.onNext("one");
187+
subject.onCompleted();
188+
189+
assertCompletedObserver(aObserver);
190+
}
191+
192+
private void assertCompletedObserver(Observer<String> aObserver)
193+
{
194+
verify(aObserver, times(1)).onNext("default");
195+
verify(aObserver, times(1)).onNext("one");
196+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
197+
verify(aObserver, times(1)).onCompleted();
198+
}
199+
200+
@Test
201+
public void testCompletedAfterError() {
202+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
203+
204+
@SuppressWarnings("unchecked")
205+
Observer<String> aObserver = mock(Observer.class);
206+
subject.subscribe(aObserver);
207+
208+
subject.onNext("one");
209+
subject.onError(testException);
210+
subject.onNext("two");
211+
subject.onCompleted();
212+
213+
assertErrorObserver(aObserver);
214+
}
215+
216+
private void assertErrorObserver(Observer<String> aObserver)
217+
{
218+
verify(aObserver, times(1)).onNext("default");
219+
verify(aObserver, times(1)).onNext("one");
220+
verify(aObserver, times(1)).onError(testException);
221+
}
222+
223+
@Test
224+
public void testUnsubscribe()
225+
{
226+
UnsubscribeTester.test(new Func0<BehaviorSubject<String>>()
227+
{
228+
@Override
229+
public BehaviorSubject<String> call()
230+
{
231+
return BehaviorSubject.createWithDefaultValue("default");
232+
}
233+
}, new Action1<BehaviorSubject<String>>()
234+
{
235+
@Override
236+
public void call(BehaviorSubject<String> DefaultSubject)
237+
{
238+
DefaultSubject.onCompleted();
239+
}
240+
}, new Action1<BehaviorSubject<String>>()
241+
{
242+
@Override
243+
public void call(BehaviorSubject<String> DefaultSubject)
244+
{
245+
DefaultSubject.onError(new Exception());
246+
}
247+
}, new Action1<BehaviorSubject<String>>()
248+
{
249+
@Override
250+
public void call(BehaviorSubject<String> DefaultSubject)
251+
{
252+
DefaultSubject.onNext("one");
253+
}
254+
});
255+
}
256+
}
257+
}

0 commit comments

Comments
 (0)