1
- /**
2
- * Copyright 2013 Netflix, Inc.
3
- *
4
- * Licensed under the Apache License, Version 2.0 (the "License");
5
- * you may not use this file except in compliance with the License.
6
- * You may obtain a copy of the License at
7
- *
8
- * http://www.apache.org/licenses/LICENSE-2.0
9
- *
10
- * Unless required by applicable law or agreed to in writing, software
11
- * distributed under the License is distributed on an "AS IS" BASIS,
12
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
- * See the License for the specific language governing permissions and
14
- * limitations under the License.
15
- */
1
+ /**
2
+ * Copyright 2013 Netflix, Inc.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
16
16
package rx .subscriptions ;
17
17
18
18
import static java .util .Arrays .asList ;
31
31
/**
32
32
* Subscription that represents a group of Subscriptions that are unsubscribed
33
33
* together.
34
- *
35
- * @see <a
36
- * href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net
37
- * equivalent CompositeDisposable</a>
34
+ *
35
+ * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
38
36
*/
39
37
public class CompositeSubscription implements Subscription {
40
38
/** Sentinel to indicate a thread is modifying the subscription set. */
41
- private static final Set <Subscription > MUTATE_SENTINEL = unmodifiableSet (Collections .<Subscription >emptySet ());
42
- /** Sentinel to indicate the entire CompositeSubscription has been unsubscribed.*/
43
- private static final Set <Subscription > UNSUBSCRIBED_SENTINEL = unmodifiableSet (Collections .<Subscription >emptySet ());
39
+ private static final Set <Subscription > MUTATE_SENTINEL = unmodifiableSet (Collections .<Subscription > emptySet ());
40
+ /** Sentinel to indicate the entire CompositeSubscription has been unsubscribed. */
41
+ private static final Set <Subscription > UNSUBSCRIBED_SENTINEL = unmodifiableSet (Collections .<Subscription > emptySet ());
44
42
/** The reference to the set of subscriptions. */
45
43
private final AtomicReference <Set <Subscription >> reference = new AtomicReference <Set <Subscription >>();
46
-
44
+
47
45
public CompositeSubscription (final Subscription ... subscriptions ) {
48
46
reference .set (new HashSet <Subscription >(asList (subscriptions )));
49
47
}
50
-
48
+
51
49
public boolean isUnsubscribed () {
52
50
return reference .get () == UNSUBSCRIBED_SENTINEL ;
53
51
}
54
-
52
+
55
53
public void add (final Subscription s ) {
56
54
do {
57
55
final Set <Subscription > existing = reference .get ();
58
56
if (existing == UNSUBSCRIBED_SENTINEL ) {
59
57
s .unsubscribe ();
60
58
break ;
61
59
}
62
-
60
+
63
61
if (existing == MUTATE_SENTINEL ) {
64
62
continue ;
65
63
}
66
-
64
+
67
65
if (reference .compareAndSet (existing , MUTATE_SENTINEL )) {
68
66
existing .add (s );
69
67
reference .set (existing );
70
68
break ;
71
69
}
72
70
} while (true );
73
71
}
74
-
72
+
75
73
public void remove (final Subscription s ) {
76
74
do {
77
75
final Set <Subscription > subscriptions = reference .get ();
78
76
if (subscriptions == UNSUBSCRIBED_SENTINEL ) {
79
77
s .unsubscribe ();
80
78
break ;
81
79
}
82
-
80
+
83
81
if (subscriptions == MUTATE_SENTINEL ) {
84
82
continue ;
85
83
}
86
-
84
+
87
85
if (reference .compareAndSet (subscriptions , MUTATE_SENTINEL )) {
88
86
// also unsubscribe from it:
89
87
// http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
@@ -94,36 +92,39 @@ public void remove(final Subscription s) {
94
92
}
95
93
} while (true );
96
94
}
97
-
95
+
98
96
public void clear () {
99
97
do {
100
98
final Set <Subscription > subscriptions = reference .get ();
101
99
if (subscriptions == UNSUBSCRIBED_SENTINEL ) {
102
100
break ;
103
101
}
104
-
102
+
105
103
if (subscriptions == MUTATE_SENTINEL ) {
106
104
continue ;
107
105
}
108
-
106
+
109
107
if (reference .compareAndSet (subscriptions , MUTATE_SENTINEL )) {
110
108
final Set <Subscription > copy = new HashSet <Subscription >(
111
109
subscriptions );
112
110
subscriptions .clear ();
113
111
reference .set (subscriptions );
114
-
112
+
115
113
unsubscribeAll (copy );
116
114
break ;
117
115
}
118
116
} while (true );
119
117
}
118
+
120
119
/**
121
120
* Unsubscribe from the collection of subscriptions.
122
121
* <p>
123
122
* Exceptions thrown by any of the {@code unsubscribe()} methods are
124
123
* collected into a {@link CompositeException} and thrown once
125
124
* all unsubscriptions have been attempted.
126
- * @param subs the collection of subscriptions
125
+ *
126
+ * @param subs
127
+ * the collection of subscriptions
127
128
*/
128
129
private void unsubscribeAll (Collection <Subscription > subs ) {
129
130
final Collection <Throwable > es = new ArrayList <Throwable >();
@@ -139,18 +140,19 @@ private void unsubscribeAll(Collection<Subscription> subs) {
139
140
"Failed to unsubscribe to 1 or more subscriptions." , es );
140
141
}
141
142
}
143
+
142
144
@ Override
143
145
public void unsubscribe () {
144
146
do {
145
147
final Set <Subscription > subscriptions = reference .get ();
146
148
if (subscriptions == UNSUBSCRIBED_SENTINEL ) {
147
149
break ;
148
150
}
149
-
151
+
150
152
if (subscriptions == MUTATE_SENTINEL ) {
151
153
continue ;
152
154
}
153
-
155
+
154
156
if (reference .compareAndSet (subscriptions , UNSUBSCRIBED_SENTINEL )) {
155
157
unsubscribeAll (subscriptions );
156
158
break ;
0 commit comments