Skip to content

Commit 731e0a0

Browse files
committed
Fixed ReplaySubject leak
1 parent 5753a92 commit 731e0a0

File tree

6 files changed

+36
-7
lines changed

6 files changed

+36
-7
lines changed

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void call(SubjectObserver<? super T> o) {
8282
// to send onCompleted if the last value is an onNext
8383
emitValueToObserver(lastNotification.get(), o);
8484
}
85-
});
85+
}, null);
8686

8787
return new AsyncSubject<T>(onSubscribe, subscriptionManager, lastNotification);
8888
}

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void call(SubjectObserver<? super T> o) {
128128
*/
129129
lastNotification.get().accept(o);
130130
}
131-
});
131+
}, null);
132132

133133
return new BehaviorSubject<T>(onSubscribe, subscriptionManager, lastNotification);
134134
}

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void call(SubjectObserver<? super T> o) {
7979
*/
8080
lastNotification.get().accept(o);
8181
}
82-
});
82+
}, null);
8383

8484
return new PublishSubject<T>(onSubscribe, subscriptionManager, lastNotification);
8585
}

rxjava-core/src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,15 @@ public void call(SubjectObserver<? super T> o) {
8484

8585
@Override
8686
public void call(SubjectObserver<? super T> o) {
87+
Integer idx = state.replayState.remove(o);
8788
// we will finish replaying if there is anything left
88-
replayObserverFromIndex(state.history, state.replayState.get(o), o);
89+
replayObserverFromIndex(state.history, idx, o);
90+
}
91+
},
92+
new Action1<SubjectObserver<? super T>>() {
93+
@Override
94+
public void call(SubjectObserver<? super T> o) {
95+
state.replayState.remove(o);
8996
}
9097
});
9198

@@ -229,5 +236,10 @@ public void complete(Notification<T> n) {
229236
terminalValue.set(n);
230237
}
231238
}
232-
239+
/**
240+
* @return Returns the number of subscribers.
241+
*/
242+
/* Support test.*/ int subscriberCount() {
243+
return state.replayState.size();
244+
}
233245
}

rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,16 @@
3939
* Always runs at the beginning of 'subscribe' regardless of terminal state.
4040
* @param onTerminated
4141
* Only runs if Subject is in terminal state and the Observer ends up not being registered.
42+
* @param onUnsubscribe called after the child subscription is removed from the state
4243
* @return
4344
*/
44-
public OnSubscribe<T> getOnSubscribeFunc(final Action1<SubjectObserver<? super T>> onSubscribe, final Action1<SubjectObserver<? super T>> onTerminated) {
45+
public OnSubscribe<T> getOnSubscribeFunc(final Action1<SubjectObserver<? super T>> onSubscribe,
46+
final Action1<SubjectObserver<? super T>> onTerminated,
47+
final Action1<SubjectObserver<? super T>> onUnsubscribe) {
4548
return new OnSubscribe<T>() {
4649
@Override
4750
public void call(Subscriber<? super T> actualOperator) {
48-
SubjectObserver<T> observer = new SubjectObserver<T>(actualOperator);
51+
final SubjectObserver<T> observer = new SubjectObserver<T>(actualOperator);
4952
// invoke onSubscribe logic
5053
if (onSubscribe != null) {
5154
onSubscribe.call(observer);
@@ -84,6 +87,9 @@ public void call() {
8487
// on unsubscribe remove it from the map of outbound observers to notify
8588
newState = current.removeObserver(subscription);
8689
} while (!state.compareAndSet(current, newState));
90+
if (onUnsubscribe != null) {
91+
onUnsubscribe.call(observer);
92+
}
8793
}
8894
}));
8995

rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,5 +344,16 @@ public void onNext(String v) {
344344
assertEquals("three", lastValueForObserver2.get());
345345

346346
}
347+
@Test
348+
public void testSubscriptionLeak() {
349+
ReplaySubject<Object> replaySubject = ReplaySubject.create();
350+
351+
Subscription s = replaySubject.subscribe();
352+
353+
assertEquals(1, replaySubject.subscriberCount());
347354

355+
s.unsubscribe();
356+
357+
assertEquals(0, replaySubject.subscriberCount());
358+
}
348359
}

0 commit comments

Comments
 (0)