19
19
import java .util .LinkedList ;
20
20
import java .util .List ;
21
21
import java .util .Queue ;
22
+ import java .util .concurrent .atomic .AtomicBoolean ;
23
+
22
24
import rx .Notification ;
23
25
import rx .Observable ;
24
- import rx .subscriptions . SingleAssignmentSubscription ;
26
+ import rx .operators . SafeObservableSubscription ;
25
27
import rx .util .functions .Action1 ;
26
28
27
29
/**
@@ -33,14 +35,15 @@ public final class JoinObserver1<T> extends ObserverBase<Notification<T>> implem
33
35
private final Action1 <Throwable > onError ;
34
36
private final List <ActivePlan0 > activePlans ;
35
37
private final Queue <Notification <T >> queue ;
36
- private final SingleAssignmentSubscription subscription ;
38
+ private final SafeObservableSubscription subscription ;
37
39
private volatile boolean done ;
40
+ private final AtomicBoolean subscribed = new AtomicBoolean (false );
38
41
39
42
public JoinObserver1 (Observable <T > source , Action1 <Throwable > onError ) {
40
43
this .source = source ;
41
44
this .onError = onError ;
42
45
queue = new LinkedList <Notification <T >>();
43
- subscription = new SingleAssignmentSubscription ();
46
+ subscription = new SafeObservableSubscription ();
44
47
activePlans = new ArrayList <ActivePlan0 >();
45
48
}
46
49
public Queue <Notification <T >> queue () {
@@ -51,8 +54,12 @@ public void addActivePlan(ActivePlan0 activePlan) {
51
54
}
52
55
@ Override
53
56
public void subscribe (Object gate ) {
54
- this .gate = gate ;
55
- subscription .set (source .materialize ().subscribe (this ));
57
+ if (subscribed .compareAndSet (false , true )) {
58
+ this .gate = gate ;
59
+ subscription .wrap (source .materialize ().subscribe (this ));
60
+ } else {
61
+ throw new IllegalStateException ("Can only be subscribed to once." );
62
+ }
56
63
}
57
64
58
65
@ Override
0 commit comments