Skip to content

Commit f0dbccb

Browse files
committed
Perf profile & simplifications.
1 parent 51d68ed commit f0dbccb

File tree

2 files changed

+69
-11
lines changed

2 files changed

+69
-11
lines changed

src/main/java/rx/internal/schedulers/ScheduledAction.java

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
*/
1616
package rx.internal.schedulers;
1717

18+
import java.util.ArrayList;
19+
import java.util.List;
1820
import java.util.concurrent.Future;
1921
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2022

2123
import rx.Subscription;
24+
import rx.exceptions.CompositeException;
2225
import rx.exceptions.OnErrorNotImplementedException;
2326
import rx.functions.Action0;
2427
import rx.plugins.RxJavaPlugins;
@@ -29,20 +32,22 @@
2932
* {@code Subscriber} in respect of an {@code Observer}.
3033
*/
3134
public final class ScheduledAction implements Runnable, Subscription {
32-
final CompositeSubscription cancel;
35+
final Subscription[] cancel;
36+
volatile int count;
3337
final Action0 action;
3438
/** Set by the run() method to avoid self interrupting at the end of the run method. */
35-
volatile Thread runner;
39+
Thread runner;
3640

3741
public ScheduledAction(Action0 action) {
3842
this.action = action;
39-
this.cancel = new CompositeSubscription();
43+
this.cancel = new Subscription[4];
4044
}
4145

4246
@Override
4347
public void run() {
48+
Thread thread = Thread.currentThread();
4449
try {
45-
runner = Thread.currentThread();
50+
runner = thread;
4651
action.call();
4752
} catch (Throwable e) {
4853
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
@@ -53,7 +58,6 @@ public void run() {
5358
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
5459
}
5560
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
56-
Thread thread = Thread.currentThread();
5761
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
5862
} finally {
5963
unsubscribe();
@@ -62,22 +66,76 @@ public void run() {
6266

6367
@Override
6468
public boolean isUnsubscribed() {
65-
return cancel.isUnsubscribed();
69+
return count < 0;
6670
}
6771

6872
@Override
6973
public void unsubscribe() {
70-
cancel.unsubscribe();
74+
Subscription[] subs = cancel;
75+
if (count >= 0) {
76+
synchronized (this) {
77+
if (count < 0) {
78+
return;
79+
}
80+
count = -1;
81+
}
82+
}
83+
unsubscribeFromAll(subs);
7184
}
7285

86+
7387
/**
7488
* Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed
7589
* if the underlying {@code action} completes or the this scheduled action is cancelled.
7690
*
7791
* @param s the Subscription to add
7892
*/
7993
public void add(Subscription s) {
80-
cancel.add(s);
94+
if (count >= 0) {
95+
synchronized (this) {
96+
int c = count;
97+
if (c >= 0) {
98+
cancel[c] = s;
99+
count = c + 1;
100+
return;
101+
}
102+
}
103+
}
104+
s.unsubscribe();
105+
}
106+
107+
private static void unsubscribeFromAll(Subscription... subscriptions) {
108+
if (subscriptions == null) {
109+
return;
110+
}
111+
List<Throwable> es = null;
112+
for (Subscription s : subscriptions) {
113+
if (s == null) {
114+
break;
115+
}
116+
try {
117+
s.unsubscribe();
118+
} catch (Throwable e) {
119+
if (es == null) {
120+
es = new ArrayList<Throwable>();
121+
}
122+
es.add(e);
123+
}
124+
}
125+
if (es != null) {
126+
if (es.size() == 1) {
127+
Throwable t = es.get(0);
128+
if (t instanceof RuntimeException) {
129+
throw (RuntimeException) t;
130+
} else {
131+
throw new CompositeException(
132+
"Failed to unsubscribe to 1 or more subscriptions.", es);
133+
}
134+
} else {
135+
throw new CompositeException(
136+
"Failed to unsubscribe to 2 or more subscriptions.", es);
137+
}
138+
}
81139
}
82140

83141
/**
@@ -86,7 +144,7 @@ public void add(Subscription s) {
86144
* @param f the future to add
87145
*/
88146
public void add(final Future<?> f) {
89-
cancel.add(new FutureCompleter(f));
147+
add(new FutureCompleter(f));
90148
}
91149

92150
/**
@@ -97,7 +155,7 @@ public void add(final Future<?> f) {
97155
* the parent {@code CompositeSubscription} to add
98156
*/
99157
public void addParent(CompositeSubscription parent) {
100-
cancel.add(new Remover(this, parent));
158+
add(new Remover(this, parent));
101159
}
102160

103161
/**

src/perf/java/rx/schedulers/ComputationSchedulerPerf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class ComputationSchedulerPerf {
3636
@State(Scope.Thread)
3737
public static class Input extends InputWithIncrementingInteger {
3838

39-
@Param({ "1", "1000", "1000000" })
39+
@Param({ "1", "10", "100", "1000", "2000", "3000", "4000", "10000", "100000", "1000000" })
4040
public int size;
4141

4242
@Override

0 commit comments

Comments
 (0)