19
19
import rx .util .functions .Action1 ;
20
20
import rx .util .functions .Func1 ;
21
21
22
- public class Subject <T > extends Observable <T > implements Observer <T > {
23
- public static <T > Subject <T > create () {
22
+ public class PublishSubject <T > extends Observable <T > implements Observer <T > {
23
+ public static <T > PublishSubject <T > create () {
24
24
final ConcurrentHashMap <Subscription , Observer <T >> observers = new ConcurrentHashMap <Subscription , Observer <T >>();
25
25
26
26
Func1 <Observer <T >, Subscription > onSubscribe = new Func1 <Observer <T >, Subscription >() {
@@ -42,12 +42,12 @@ public void unsubscribe() {
42
42
}
43
43
};
44
44
45
- return new Subject <T >(onSubscribe , observers );
45
+ return new PublishSubject <T >(onSubscribe , observers );
46
46
}
47
47
48
48
private final ConcurrentHashMap <Subscription , Observer <T >> observers ;
49
49
50
- protected Subject (Func1 <Observer <T >, Subscription > onSubscribe , ConcurrentHashMap <Subscription , Observer <T >> observers ) {
50
+ protected PublishSubject (Func1 <Observer <T >, Subscription > onSubscribe , ConcurrentHashMap <Subscription , Observer <T >> observers ) {
51
51
super (onSubscribe );
52
52
this .observers = observers ;
53
53
}
@@ -76,10 +76,10 @@ public void onNext(T args) {
76
76
public static class UnitTest {
77
77
@ Test
78
78
public void test () {
79
- Subject <Integer > subject = Subject .create ();
79
+ PublishSubject <Integer > publishSubject = PublishSubject .create ();
80
80
final AtomicReference <List <Notification <String >>> actualRef = new AtomicReference <List <Notification <String >>>();
81
81
82
- Observable <List <Notification <Integer >>> wNotificationsList = subject .materialize ().toList ();
82
+ Observable <List <Notification <Integer >>> wNotificationsList = publishSubject .materialize ().toList ();
83
83
wNotificationsList .subscribe (new Action1 <List <Notification <String >>>() {
84
84
@ Override
85
85
public void call (List <Notification <String >> actual ) {
@@ -108,10 +108,10 @@ public void unsubscribe() {
108
108
}
109
109
};
110
110
}
111
- }).subscribe (subject );
112
- // the subject has received an onComplete from the first subscribe because
111
+ }).subscribe (publishSubject );
112
+ // the publishSubject has received an onComplete from the first subscribe because
113
113
// it is synchronous and the next subscribe won't do anything.
114
- Observable .toObservable (-1 , -2 , -3 ).subscribe (subject );
114
+ Observable .toObservable (-1 , -2 , -3 ).subscribe (publishSubject );
115
115
116
116
List <Notification <Integer >> expected = new ArrayList <Notification <Integer >>();
117
117
expected .add (new Notification <Integer >(-1 ));
0 commit comments