Skip to content

Commit 947e5f8

Browse files
committed
Fixes to the operators.
1 parent d62ddb7 commit 947e5f8

File tree

5 files changed

+220
-35
lines changed

5 files changed

+220
-35
lines changed

rxjava-core/src/main/java/rx/observers/Subscribers.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,9 @@
77
import rx.functions.Action1;
88

99
public class Subscribers {
10-
11-
private static final Subscriber<Object> EMPTY = new Subscriber<Object>() {
12-
13-
@Override
14-
public final void onCompleted() {
15-
// do nothing
16-
}
17-
18-
@Override
19-
public final void onError(Throwable e) {
20-
throw new OnErrorNotImplementedException(e);
21-
}
22-
23-
@Override
24-
public final void onNext(Object args) {
25-
// do nothing
26-
}
27-
28-
};
29-
3010
@SuppressWarnings("unchecked")
3111
public static <T> Subscriber<T> empty() {
32-
return (Subscriber<T>) EMPTY;
12+
return from(Observers.empty());
3313
}
3414

3515
public static <T> Subscriber<T> from(final Observer<? super T> o) {

rxjava-core/src/main/java/rx/operators/OperatorMulticast.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.BitSet;
1819
import rx.Observable;
1920
import rx.Subscriber;
2021
import rx.Subscription;
@@ -49,9 +50,10 @@ public void call(Subscriber<? super R> subscriber) {
4950

5051
@Override
5152
public Subscription connect() {
53+
Subscriber<T> s = null;
5254
synchronized (guard) {
5355
if (subscription == null) {
54-
subscription = source.unsafeSubscribe(new Subscriber<T>() {
56+
s = new Subscriber<T>() {
5557
@Override
5658
public void onCompleted() {
5759
subject.onCompleted();
@@ -66,18 +68,24 @@ public void onError(Throwable e) {
6668
public void onNext(T args) {
6769
subject.onNext(args);
6870
}
69-
});
71+
};
72+
subscription = s;
7073
}
7174
}
75+
if (s != null) {
76+
source.unsafeSubscribe(s);
77+
}
7278

7379
return Subscriptions.create(new Action0() {
7480
@Override
7581
public void call() {
82+
Subscription s;
7683
synchronized (guard) {
77-
if (subscription != null) {
78-
subscription.unsubscribe();
79-
subscription = null;
80-
}
84+
s = subscription;
85+
subscription = null;
86+
}
87+
if (s != null) {
88+
s.unsubscribe();
8189
}
8290
}
8391
});

rxjava-core/src/main/java/rx/operators/OperatorRefCount.java

Lines changed: 154 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.WeakHashMap;
1822
import rx.Observable.OnSubscribe;
1923
import rx.Subscriber;
2024
import rx.Subscription;
@@ -31,32 +35,175 @@ public final class OperatorRefCount<T> implements OnSubscribe<T> {
3135
final ConnectableObservable<? extends T> source;
3236
final Object guard;
3337
/** Guarded by guard. */
34-
int count;
38+
int index;
3539
/** Guarded by guard. */
40+
boolean emitting;
41+
/** Guarded by guard. If true, indicates a connection request, false indicates a disconnect request. */
42+
List<Token> queue;
43+
/** Manipulated while in the serialized section. */
44+
int count;
45+
/** Manipulated while in the serialized section. */
3646
Subscription connection;
47+
/** Manipulated while in the serialized section. */
48+
final Map<Token, Object> connectionStatus;
49+
/** Occupied indicator. */
50+
private static final Object OCCUPIED = new Object();
3751
public OperatorRefCount(ConnectableObservable<? extends T> source) {
3852
this.source = source;
3953
this.guard = new Object();
54+
this.connectionStatus = new WeakHashMap<Token, Object>();
4055
}
4156

4257
@Override
4358
public void call(Subscriber<? super T> t1) {
59+
int id;
60+
synchronized (guard) {
61+
id = ++index;
62+
}
63+
final Token t = new Token(id);
4464
t1.add(Subscriptions.create(new Action0() {
4565
@Override
4666
public void call() {
47-
synchronized (guard) {
48-
if (--count == 0) {
49-
connection.unsubscribe();
50-
connection = null;
51-
}
52-
}
67+
disconnect(t);
5368
}
5469
}));
5570
source.unsafeSubscribe(t1);
71+
connect(t);
72+
}
73+
private void connect(Token id) {
74+
List<Token> localQueue;
75+
synchronized (guard) {
76+
if (emitting) {
77+
if (queue == null) {
78+
queue = new ArrayList<Token>();
79+
}
80+
queue.add(id);
81+
return;
82+
}
83+
84+
localQueue = queue;
85+
queue = null;
86+
emitting = true;
87+
}
88+
boolean once = true;
89+
do {
90+
drain(localQueue);
91+
if (once) {
92+
once = false;
93+
doConnect(id);
94+
}
95+
synchronized (guard) {
96+
localQueue = queue;
97+
queue = null;
98+
if (localQueue == null) {
99+
emitting = false;
100+
return;
101+
}
102+
}
103+
} while (true);
104+
}
105+
private void disconnect(Token id) {
106+
List<Token> localQueue;
56107
synchronized (guard) {
108+
if (emitting) {
109+
if (queue == null) {
110+
queue = new ArrayList<Token>();
111+
}
112+
queue.add(id.toDisconnect()); // negative value indicates disconnect
113+
return;
114+
}
115+
116+
localQueue = queue;
117+
queue = null;
118+
emitting = true;
119+
}
120+
boolean once = true;
121+
do {
122+
drain(localQueue);
123+
if (once) {
124+
once = false;
125+
doDisconnect(id);
126+
}
127+
synchronized (guard) {
128+
localQueue = queue;
129+
queue = null;
130+
if (localQueue == null) {
131+
emitting = false;
132+
return;
133+
}
134+
}
135+
} while (true);
136+
}
137+
private void drain(List<Token> localQueue) {
138+
if (localQueue == null) {
139+
return;
140+
}
141+
int n = localQueue.size();
142+
for (int i = 0; i < n; i++) {
143+
Token id = localQueue.get(i);
144+
if (id.isDisconnect()) {
145+
doDisconnect(id);
146+
} else {
147+
doConnect(id);
148+
}
149+
}
150+
}
151+
private void doConnect(Token id) {
152+
// this method is called only once per id
153+
// if add succeeds, id was not yet disconnected
154+
if (connectionStatus.put(id, OCCUPIED) == null) {
57155
if (count++ == 0) {
58156
connection = source.connect();
59157
}
158+
} else {
159+
// connection exists due to disconnect, just remove
160+
connectionStatus.remove(id);
161+
}
162+
}
163+
private void doDisconnect(Token id) {
164+
// this method is called only once per id
165+
// if remove succeeds, id was connected
166+
if (connectionStatus.remove(id) != null) {
167+
if (--count == 0) {
168+
connection.unsubscribe();
169+
connection = null;
170+
}
171+
} else {
172+
// mark id as if connected
173+
connectionStatus.put(id, OCCUPIED);
174+
}
175+
}
176+
/** Token that represens a connection request or a disconnection request. */
177+
private static final class Token {
178+
final int id;
179+
public Token(int id) {
180+
this.id = id;
181+
}
182+
183+
@Override
184+
public boolean equals(Object obj) {
185+
if (obj == null) {
186+
return false;
187+
}
188+
if (obj.getClass() != getClass()) {
189+
return false;
190+
}
191+
int other = ((Token)obj).id;
192+
return id == other || -id == other;
193+
}
194+
195+
@Override
196+
public int hashCode() {
197+
return id < 0 ? -id : id;
198+
}
199+
public boolean isDisconnect() {
200+
return id < 0;
201+
}
202+
public Token toDisconnect() {
203+
if (id < 0) {
204+
return this;
205+
}
206+
return new Token(-id);
60207
}
61208
}
62209
}

rxjava-core/src/main/java/rx/operators/OperatorSampleWithTime.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public void onError(Throwable e) {
8080
@Override
8181
public void onCompleted() {
8282
subscriber.onCompleted();
83+
unsubscribe();
8384
}
8485

8586
@Override

rxjava-core/src/test/java/rx/RefCountTests.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package rx;
1717

1818
import static org.junit.Assert.assertEquals;
19-
import static org.mockito.Mockito.mock;
19+
import static org.mockito.Mockito.*;
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
@@ -25,11 +25,14 @@
2525

2626
import org.junit.Before;
2727
import org.junit.Test;
28+
import org.mockito.InOrder;
2829
import org.mockito.MockitoAnnotations;
2930

3031
import rx.functions.Action0;
3132
import rx.functions.Action1;
33+
import rx.observers.Subscribers;
3234
import rx.schedulers.TestScheduler;
35+
import rx.subjects.ReplaySubject;
3336
import rx.subscriptions.Subscriptions;
3437

3538
public class RefCountTests {
@@ -152,4 +155,50 @@ public void call(Long t1) {
152155
assertEquals(1L, list3.get(1).longValue());
153156

154157
}
158+
159+
@Test
160+
public void testAlreadyUnsubscribedClient() {
161+
Subscriber<Integer> done = Subscribers.empty();
162+
done.unsubscribe();
163+
164+
@SuppressWarnings("unchecked")
165+
Observer<Integer> o = mock(Observer.class);
166+
167+
Observable<Integer> result = Observable.just(1).publish().refCount();
168+
169+
result.subscribe(done);
170+
171+
result.subscribe(o);
172+
173+
verify(o).onNext(1);
174+
verify(o).onCompleted();
175+
verify(o, never()).onError(any(Throwable.class));
176+
}
177+
@Test
178+
public void testAlreadyUnsubscribedInterleavesWithClient() {
179+
ReplaySubject<Integer> source = ReplaySubject.create();
180+
181+
Subscriber<Integer> done = Subscribers.empty();
182+
done.unsubscribe();
183+
184+
@SuppressWarnings("unchecked")
185+
Observer<Integer> o = mock(Observer.class);
186+
InOrder inOrder = inOrder(o);
187+
188+
Observable<Integer> result = source.publish().refCount();
189+
190+
result.subscribe(o);
191+
192+
source.onNext(1);
193+
194+
result.subscribe(done);
195+
196+
source.onNext(2);
197+
source.onCompleted();
198+
199+
inOrder.verify(o).onNext(1);
200+
inOrder.verify(o).onNext(2);
201+
inOrder.verify(o).onCompleted();
202+
verify(o, never()).onError(any(Throwable.class));
203+
}
155204
}

0 commit comments

Comments
 (0)