Skip to content

Commit a144b0e

Browse files
Refactor for Performance
- previous commit got non-blocking working but perf tests showed it slow - this commit retains non-blocking but surpasses master branch performance Master branch: 11,947,459 ops/sec This commit: 16,151,174 ops/sec
1 parent dba5de9 commit a144b0e

File tree

6 files changed

+292
-96
lines changed

6 files changed

+292
-96
lines changed

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import rx.Notification;
2222
import rx.Observer;
23+
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2324
import rx.util.functions.Action1;
2425

2526
/**
@@ -63,20 +64,20 @@ public static <T> AsyncSubject<T> create() {
6364
*
6465
* This will always run, even if Subject is in terminal state.
6566
*/
66-
new Action1<Observer<? super T>>() {
67+
new Action1<SubjectObserver<? super T>>() {
6768

6869
@Override
69-
public void call(Observer<? super T> o) {
70+
public void call(SubjectObserver<? super T> o) {
7071
// nothing to do if not terminated
7172
}
7273
},
7374
/**
7475
* This function executes if the Subject is terminated.
7576
*/
76-
new Action1<Observer<? super T>>() {
77+
new Action1<SubjectObserver<? super T>>() {
7778

7879
@Override
79-
public void call(Observer<? super T> o) {
80+
public void call(SubjectObserver<? super T> o) {
8081
// we want the last value + completed so add this extra logic
8182
// to send onCompleted if the last value is an onNext
8283
emitValueToObserver(lastNotification.get(), o);
@@ -104,10 +105,10 @@ protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, SubjectSubscriptionManage
104105

105106
@Override
106107
public void onCompleted() {
107-
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
108+
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
108109

109110
@Override
110-
public void call(Collection<Observer<? super T>> observers) {
111+
public void call(Collection<SubjectObserver<? super T>> observers) {
111112
for (Observer<? super T> o : observers) {
112113
emitValueToObserver(lastNotification.get(), o);
113114
}
@@ -117,10 +118,10 @@ public void call(Collection<Observer<? super T>> observers) {
117118

118119
@Override
119120
public void onError(final Throwable e) {
120-
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
121+
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
121122

122123
@Override
123-
public void call(Collection<Observer<? super T>> observers) {
124+
public void call(Collection<SubjectObserver<? super T>> observers) {
124125
lastNotification.set(new Notification<T>(e));
125126
for (Observer<? super T> o : observers) {
126127
emitValueToObserver(lastNotification.get(), o);

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import rx.Notification;
2222
import rx.Observer;
23+
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2324
import rx.util.functions.Action1;
2425

2526
/**
@@ -95,10 +96,10 @@ public static <T> BehaviorSubject<T> create(T defaultValue) {
9596
*
9697
* This will always run, even if Subject is in terminal state.
9798
*/
98-
new Action1<Observer<? super T>>() {
99+
new Action1<SubjectObserver<? super T>>() {
99100

100101
@Override
101-
public void call(Observer<? super T> o) {
102+
public void call(SubjectObserver<? super T> o) {
102103
/*
103104
* When we subscribe we always emit the latest value to the observer.
104105
*
@@ -113,10 +114,10 @@ public void call(Observer<? super T> o) {
113114
/**
114115
* This function executes if the Subject is terminated before subscription occurs.
115116
*/
116-
new Action1<Observer<? super T>>() {
117+
new Action1<SubjectObserver<? super T>>() {
117118

118119
@Override
119-
public void call(Observer<? super T> o) {
120+
public void call(SubjectObserver<? super T> o) {
120121
/*
121122
* If we are already terminated, or termination happens while trying to subscribe
122123
* this will be invoked and we emit whatever the last terminal value was.
@@ -139,10 +140,10 @@ protected BehaviorSubject(OnSubscribeFunc<T> onSubscribe, SubjectSubscriptionMan
139140

140141
@Override
141142
public void onCompleted() {
142-
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
143+
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
143144

144145
@Override
145-
public void call(Collection<Observer<? super T>> observers) {
146+
public void call(Collection<SubjectObserver<? super T>> observers) {
146147
lastNotification.set(new Notification<T>());
147148
for (Observer<? super T> o : observers) {
148149
o.onCompleted();
@@ -153,10 +154,10 @@ public void call(Collection<Observer<? super T>> observers) {
153154

154155
@Override
155156
public void onError(final Throwable e) {
156-
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
157+
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
157158

158159
@Override
159-
public void call(Collection<Observer<? super T>> observers) {
160+
public void call(Collection<SubjectObserver<? super T>> observers) {
160161
lastNotification.set(new Notification<T>(e));
161162
for (Observer<? super T> o : observers) {
162163
o.onError(e);

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import rx.Notification;
2222
import rx.Observer;
23+
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2324
import rx.util.functions.Action1;
2425

2526
/**
@@ -58,20 +59,20 @@ public static <T> PublishSubject<T> create() {
5859
*
5960
* This will always run, even if Subject is in terminal state.
6061
*/
61-
new Action1<Observer<? super T>>() {
62+
new Action1<SubjectObserver<? super T>>() {
6263

6364
@Override
64-
public void call(Observer<? super T> o) {
65+
public void call(SubjectObserver<? super T> o) {
6566
// nothing onSubscribe unless in terminal state which is the next function
6667
}
6768
},
6869
/**
6970
* This function executes if the Subject is terminated before subscription occurs.
7071
*/
71-
new Action1<Observer<? super T>>() {
72+
new Action1<SubjectObserver<? super T>>() {
7273

7374
@Override
74-
public void call(Observer<? super T> o) {
75+
public void call(SubjectObserver<? super T> o) {
7576
/*
7677
* If we are already terminated, or termination happens while trying to subscribe
7778
* this will be invoked and we emit whatever the last terminal value was.
@@ -94,10 +95,10 @@ protected PublishSubject(OnSubscribeFunc<T> onSubscribe, SubjectSubscriptionMana
9495

9596
@Override
9697
public void onCompleted() {
97-
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
98+
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
9899

99100
@Override
100-
public void call(Collection<Observer<? super T>> observers) {
101+
public void call(Collection<SubjectObserver<? super T>> observers) {
101102
lastNotification.set(new Notification<T>());
102103
for (Observer<? super T> o : observers) {
103104
o.onCompleted();
@@ -108,10 +109,10 @@ public void call(Collection<Observer<? super T>> observers) {
108109

109110
@Override
110111
public void onError(final Throwable e) {
111-
subscriptionManager.terminate(new Action1<Collection<Observer<? super T>>>() {
112+
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
112113

113114
@Override
114-
public void call(Collection<Observer<? super T>> observers) {
115+
public void call(Collection<SubjectObserver<? super T>> observers) {
115116
lastNotification.set(new Notification<T>(e));
116117
for (Observer<? super T> o : observers) {
117118
o.onError(e);

0 commit comments

Comments
 (0)