Skip to content

Commit 0607db3

Browse files
RxRingBufferSize (128 default, 16 on Android)
- changing from 1024 to 128 based on perf tests - platform dependent check for Android to set to 16 to reduce memory usage
1 parent faa270c commit 0607db3

File tree

2 files changed

+182
-1
lines changed

2 files changed

+182
-1
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import java.security.AccessController;
19+
import java.security.PrivilegedAction;
20+
21+
/**
22+
* Allow platform dependent logic such as checks for Android.
23+
*
24+
* Modeled after Netty with some code copy/pasted from: https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/PlatformDependent.java
25+
*/
26+
public final class PlatformDependent {
27+
28+
private static final boolean IS_ANDROID = isAndroid0();
29+
30+
/**
31+
* Returns {@code true} if and only if the current platform is Android
32+
*/
33+
public static boolean isAndroid() {
34+
return IS_ANDROID;
35+
}
36+
37+
private static boolean isAndroid0() {
38+
boolean android;
39+
try {
40+
Class.forName("android.app.Application", false, getSystemClassLoader());
41+
android = true;
42+
} catch (Exception e) {
43+
// Failed to load the class uniquely available in Android.
44+
android = false;
45+
}
46+
47+
return android;
48+
}
49+
50+
/**
51+
* Return the system {@link ClassLoader}.
52+
*/
53+
static ClassLoader getSystemClassLoader() {
54+
if (System.getSecurityManager() == null) {
55+
return ClassLoader.getSystemClassLoader();
56+
} else {
57+
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
58+
@Override
59+
public ClassLoader run() {
60+
return ClassLoader.getSystemClassLoader();
61+
}
62+
});
63+
}
64+
}
65+
}

src/main/java/rx/internal/util/RxRingBuffer.java

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,123 @@ public static RxRingBuffer getSpmcInstance() {
158158
*/
159159
public volatile Object terminalState;
160160

161-
public static final int SIZE = 1024;
161+
// default size of ring buffer
162+
/**
163+
* 128 was chosen as the default based on the numbers below. A stream processing system may benefit from increasing to 512+.
164+
*
165+
* <pre> {@code
166+
* ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorObserveOnPerf.*'
167+
*
168+
* 1024
169+
*
170+
* Benchmark (size) Mode Samples Score Score error Units
171+
* r.o.OperatorObserveOnPerf.observeOnComputation 1 thrpt 5 100642.874 24676.478 ops/s
172+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000 thrpt 5 4095.901 90.730 ops/s
173+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000000 thrpt 5 9.797 4.982 ops/s
174+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1 thrpt 5 15536155.489 758579.454 ops/s
175+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000 thrpt 5 156257.341 6324.176 ops/s
176+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000000 thrpt 5 157.099 7.143 ops/s
177+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1 thrpt 5 16864.641 1826.877 ops/s
178+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000 thrpt 5 4269.317 169.480 ops/s
179+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000000 thrpt 5 13.393 1.047 ops/s
180+
*
181+
* 512
182+
*
183+
* Benchmark (size) Mode Samples Score Score error Units
184+
* r.o.OperatorObserveOnPerf.observeOnComputation 1 thrpt 5 98945.980 48050.282 ops/s
185+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000 thrpt 5 4111.149 95.987 ops/s
186+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000000 thrpt 5 12.483 3.067 ops/s
187+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1 thrpt 5 16032469.143 620157.818 ops/s
188+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000 thrpt 5 157997.290 5097.718 ops/s
189+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000000 thrpt 5 156.462 7.728 ops/s
190+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1 thrpt 5 15813.984 8260.170 ops/s
191+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000 thrpt 5 4358.334 251.609 ops/s
192+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000000 thrpt 5 13.647 0.613 ops/s
193+
*
194+
* 256
195+
*
196+
* Benchmark (size) Mode Samples Score Score error Units
197+
* r.o.OperatorObserveOnPerf.observeOnComputation 1 thrpt 5 108489.834 2688.489 ops/s
198+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000 thrpt 5 4526.674 728.019 ops/s
199+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000000 thrpt 5 13.372 0.457 ops/s
200+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1 thrpt 5 16435113.709 311602.627 ops/s
201+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000 thrpt 5 157611.204 13146.108 ops/s
202+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000000 thrpt 5 158.346 2.500 ops/s
203+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1 thrpt 5 16976.775 968.191 ops/s
204+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000 thrpt 5 6238.210 2060.387 ops/s
205+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000000 thrpt 5 13.465 0.566 ops/s
206+
*
207+
* 128
208+
*
209+
* Benchmark (size) Mode Samples Score Score error Units
210+
* r.o.OperatorObserveOnPerf.observeOnComputation 1 thrpt 5 106887.027 29307.913 ops/s
211+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000 thrpt 5 6713.891 202.989 ops/s
212+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000000 thrpt 5 11.929 0.187 ops/s
213+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1 thrpt 5 16055774.724 350633.068 ops/s
214+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000 thrpt 5 153403.821 17976.156 ops/s
215+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000000 thrpt 5 153.559 20.178 ops/s
216+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1 thrpt 5 17172.274 236.816 ops/s
217+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000 thrpt 5 7073.555 595.990 ops/s
218+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000000 thrpt 5 11.855 1.093 ops/s
219+
*
220+
* 32
221+
*
222+
* Benchmark (size) Mode Samples Score Score error Units
223+
* r.o.OperatorObserveOnPerf.observeOnComputation 1 thrpt 5 106128.589 20986.201 ops/s
224+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000 thrpt 5 6396.607 73.627 ops/s
225+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000000 thrpt 5 7.643 0.668 ops/s
226+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1 thrpt 5 16012419.447 409004.521 ops/s
227+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000 thrpt 5 157907.001 5772.849 ops/s
228+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000000 thrpt 5 155.308 23.853 ops/s
229+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1 thrpt 5 16927.513 606.692 ops/s
230+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000 thrpt 5 5191.084 244.876 ops/s
231+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000000 thrpt 5 8.288 0.217 ops/s
232+
*
233+
* 16
234+
*
235+
* Benchmark (size) Mode Samples Score Score error Units
236+
* r.o.OperatorObserveOnPerf.observeOnComputation 1 thrpt 5 109974.741 839.064 ops/s
237+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000 thrpt 5 4538.912 173.561 ops/s
238+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000000 thrpt 5 5.420 0.111 ops/s
239+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1 thrpt 5 16017466.785 768748.695 ops/s
240+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000 thrpt 5 157934.065 13479.575 ops/s
241+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000000 thrpt 5 155.922 17.781 ops/s
242+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1 thrpt 5 14903.686 3325.205 ops/s
243+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000 thrpt 5 3784.776 1054.131 ops/s
244+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000000 thrpt 5 5.624 0.130 ops/s
245+
*
246+
* 2
247+
*
248+
* Benchmark (size) Mode Samples Score Score error Units
249+
* r.o.OperatorObserveOnPerf.observeOnComputation 1 thrpt 5 112663.216 899.005 ops/s
250+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000 thrpt 5 899.737 9.460 ops/s
251+
* r.o.OperatorObserveOnPerf.observeOnComputation 1000000 thrpt 5 0.999 0.100 ops/s
252+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1 thrpt 5 16087325.336 783206.227 ops/s
253+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000 thrpt 5 156747.025 4880.489 ops/s
254+
* r.o.OperatorObserveOnPerf.observeOnImmediate 1000000 thrpt 5 156.645 3.810 ops/s
255+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1 thrpt 5 15958.711 673.895 ops/s
256+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000 thrpt 5 884.624 47.692 ops/s
257+
* r.o.OperatorObserveOnPerf.observeOnNewThread 1000000 thrpt 5 1.173 0.100 ops/s
258+
* } </pre>
259+
*/
260+
static int _size = 128;
261+
static {
262+
// lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820)
263+
if (PlatformDependent.isAndroid()) {
264+
_size = 16;
265+
}
266+
267+
// possible system property for overriding
268+
String sizeFromProperty = System.getProperty("rx.ring-buffer.size"); // also see IndexedRingBuffer
269+
if (sizeFromProperty != null) {
270+
try {
271+
_size = Integer.parseInt(sizeFromProperty);
272+
} catch (Exception e) {
273+
System.err.println("Failed to set 'rx.buffer.size' with value " + sizeFromProperty + " => " + e.getMessage());
274+
}
275+
}
276+
}
277+
public static final int SIZE = _size;
162278

163279
private static ObjectPool<Queue<Object>> SPSC_POOL = new ObjectPool<Queue<Object>>() {
164280

0 commit comments

Comments
 (0)