3131public final class CompositeSubscription implements Subscription {
3232
3333 private Set <Subscription > subscriptions ;
34- private boolean unsubscribed = false ;
34+ private volatile boolean unsubscribed ;
3535
3636 public CompositeSubscription () {
3737 }
@@ -41,7 +41,7 @@ public CompositeSubscription(final Subscription... subscriptions) {
4141 }
4242
4343 @ Override
44- public synchronized boolean isUnsubscribed () {
44+ public boolean isUnsubscribed () {
4545 return unsubscribed ;
4646 }
4747
@@ -58,21 +58,19 @@ public void add(final Subscription s) {
5858 if (s .isUnsubscribed ()) {
5959 return ;
6060 }
61- Subscription unsubscribe = null ;
62- synchronized (this ) {
63- if (unsubscribed ) {
64- unsubscribe = s ;
65- } else {
66- if (subscriptions == null ) {
67- subscriptions = new HashSet <Subscription >(4 );
61+ if (!unsubscribed ) {
62+ synchronized (this ) {
63+ if (!unsubscribed ) {
64+ if (subscriptions == null ) {
65+ subscriptions = new HashSet <Subscription >(4 );
66+ }
67+ subscriptions .add (s );
68+ return ;
6869 }
69- subscriptions .add (s );
7070 }
7171 }
72- if (unsubscribe != null ) {
73- // call after leaving the synchronized block so we're not holding a lock while executing this
74- unsubscribe .unsubscribe ();
75- }
72+ // call after leaving the synchronized block so we're not holding a lock while executing this
73+ s .unsubscribe ();
7674 }
7775
7876 /**
@@ -83,16 +81,18 @@ public void add(final Subscription s) {
8381 * the {@link Subscription} to remove
8482 */
8583 public void remove (final Subscription s ) {
86- boolean unsubscribe = false ;
87- synchronized (this ) {
88- if (unsubscribed || subscriptions == null ) {
89- return ;
84+ if (!unsubscribed ) {
85+ boolean unsubscribe = false ;
86+ synchronized (this ) {
87+ if (unsubscribed || subscriptions == null ) {
88+ return ;
89+ }
90+ unsubscribe = subscriptions .remove (s );
91+ }
92+ if (unsubscribe ) {
93+ // if we removed successfully we then need to call unsubscribe on it (outside of the lock)
94+ s .unsubscribe ();
9095 }
91- unsubscribe = subscriptions .remove (s );
92- }
93- if (unsubscribe ) {
94- // if we removed successfully we then need to call unsubscribe on it (outside of the lock)
95- s .unsubscribe ();
9696 }
9797 }
9898
@@ -102,28 +102,35 @@ public void remove(final Subscription s) {
102102 * an unoperative state.
103103 */
104104 public void clear () {
105- Collection <Subscription > unsubscribe = null ;
106- synchronized (this ) {
107- if (unsubscribed || subscriptions == null ) {
108- return ;
109- } else {
110- unsubscribe = subscriptions ;
111- subscriptions = null ;
105+ if (!unsubscribed ) {
106+ Collection <Subscription > unsubscribe = null ;
107+ synchronized (this ) {
108+ if (unsubscribed || subscriptions == null ) {
109+ return ;
110+ } else {
111+ unsubscribe = subscriptions ;
112+ subscriptions = null ;
113+ }
112114 }
115+ unsubscribeFromAll (unsubscribe );
113116 }
114- unsubscribeFromAll (unsubscribe );
115117 }
116118
117119 @ Override
118120 public void unsubscribe () {
119- synchronized (this ) {
120- if (unsubscribed ) {
121- return ;
121+ if (!unsubscribed ) {
122+ Collection <Subscription > unsubscribe = null ;
123+ synchronized (this ) {
124+ if (unsubscribed ) {
125+ return ;
126+ }
127+ unsubscribed = true ;
128+ unsubscribe = subscriptions ;
129+ subscriptions = null ;
122130 }
123- unsubscribed = true ;
131+ // we will only get here once
132+ unsubscribeFromAll (unsubscribe );
124133 }
125- // we will only get here once
126- unsubscribeFromAll (subscriptions );
127134 }
128135
129136 private static void unsubscribeFromAll (Collection <Subscription > subscriptions ) {
@@ -143,4 +150,16 @@ private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
143150 }
144151 Exceptions .throwIfAny (es );
145152 }
153+ /**
154+ * Returns true if this composite is not unsubscribed and contains subscriptions.
155+ * @return {@code true} if this composite is not unsubscribed and contains subscriptions.
156+ */
157+ public boolean hasSubscriptions () {
158+ if (!unsubscribed ) {
159+ synchronized (this ) {
160+ return !unsubscribed && subscriptions != null && !subscriptions .isEmpty ();
161+ }
162+ }
163+ return false ;
164+ }
146165}
0 commit comments