Skip to content

Commit 9c1ca02

Browse files
Merge pull request #1400 from benjchristensen/internal-data-structures
Internal Data Structures
2 parents 746f199 + fa94e1d commit 9c1ca02

21 files changed

+2997
-1
lines changed
Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.concurrent.atomic.AtomicReferenceArray;
20+
21+
import rx.Subscription;
22+
import rx.functions.Func1;
23+
import rx.internal.util.unsafe.AtomicIntReferenceArray;
24+
import rx.internal.util.unsafe.UnsafeAccess;
25+
26+
/**
27+
* Add/Remove without object allocation (after initial construction).
28+
* <p>
29+
* This is meant for hundreds or single-digit thousands of elements that need
30+
* to be rapidly added and randomly or sequentially removed while avoiding object allocation.
31+
* <p>
32+
* On Intel Core i7, 2.3Mhz, Mac Java 8:
33+
* <p>
34+
* - adds per second single-threaded => ~32,598,500 for 100
35+
* - adds per second single-threaded => ~23,200,000 for 10,000
36+
* - adds + removes per second single-threaded => 15,562,100 for 100
37+
* - adds + removes per second single-threaded => 8,760,000 for 10,000
38+
*
39+
* <pre> {@code
40+
* Benchmark (size) Mode Samples Mean Mean error Units
41+
* r.i.u.PerfIndexedRingBuffer.indexedRingBufferAdd 100 thrpt 5 307403.329 17487.185 ops/s
42+
* r.i.u.PerfIndexedRingBuffer.indexedRingBufferAdd 10000 thrpt 5 1819.151 764.603 ops/s
43+
* r.i.u.PerfIndexedRingBuffer.indexedRingBufferAddRemove 100 thrpt 5 149649.075 4765.899 ops/s
44+
* r.i.u.PerfIndexedRingBuffer.indexedRingBufferAddRemove 10000 thrpt 5 825.304 14.079 ops/s
45+
* } </pre>
46+
*
47+
* @param <E>
48+
*/
49+
public class IndexedRingBuffer<E> implements Subscription {
50+
51+
private static final ObjectPool<IndexedRingBuffer> POOL = new ObjectPool<IndexedRingBuffer>() {
52+
53+
@Override
54+
protected IndexedRingBuffer createObject() {
55+
return new IndexedRingBuffer();
56+
}
57+
58+
};
59+
60+
public final static IndexedRingBuffer getInstance() {
61+
return POOL.borrowObject();
62+
}
63+
64+
private final ElementSection<E> elements = new ElementSection<E>();
65+
private final IndexSection removed = new IndexSection();
66+
/* package for unit testing */final AtomicInteger index = new AtomicInteger();
67+
/* package for unit testing */final AtomicInteger removedIndex = new AtomicInteger();
68+
/* package for unit testing */static final int SIZE = 512;
69+
70+
/**
71+
* This resets the arrays, nulls out references and returns it to the pool.
72+
* This extra CPU cost is far smaller than the object allocation cost of not pooling.
73+
*/
74+
public void releaseToPool() {
75+
// need to clear all elements so we don't leak memory
76+
int maxIndex = index.get();
77+
int realIndex = 0;
78+
ElementSection<E> section = elements;
79+
outer: while (section != null) {
80+
for (int i = 0; i < SIZE; i++, realIndex++) {
81+
if (realIndex >= maxIndex) {
82+
section = null;
83+
break outer;
84+
}
85+
// we can use lazySet here because we are nulling things out and not accessing them again
86+
// (relative on Mac Intel i7) lazySet gets us ~30m vs ~26m ops/second in the JMH test (100 adds per release)
87+
section.array.set(i, null);
88+
}
89+
section = section.next;
90+
}
91+
92+
index.set(0);
93+
removedIndex.set(0);
94+
POOL.returnObject(this);
95+
}
96+
97+
@Override
98+
public void unsubscribe() {
99+
releaseToPool();
100+
}
101+
102+
private IndexedRingBuffer() {
103+
if (!UnsafeAccess.isUnsafeAvailable()) {
104+
throw new IllegalStateException("This does not work on systems without sun.misc.Unsafe");
105+
}
106+
// TODO need to make this class (or its users) have alternative support for non-Unsafe environments
107+
}
108+
109+
/**
110+
* Add an element and return the index where it was added to allow removal.
111+
*
112+
* @param e
113+
* @return
114+
*/
115+
public int add(E e) {
116+
int i = getIndexForAdd();
117+
if (i < SIZE) {
118+
// fast-path when we are in the first section
119+
elements.array.set(i, e);
120+
return i;
121+
} else {
122+
int sectionIndex = i % SIZE;
123+
getElementSection(i).array.set(sectionIndex, e);
124+
return i;
125+
}
126+
}
127+
128+
public E remove(int index) {
129+
try {
130+
E e;
131+
if (index < SIZE) {
132+
// fast-path when we are in the first section
133+
e = elements.array.getAndSet(index, null);
134+
} else {
135+
int sectionIndex = index % SIZE;
136+
e = getElementSection(index).array.getAndSet(sectionIndex, null);
137+
}
138+
pushRemovedIndex(index);
139+
return e;
140+
} catch (NullPointerException ne) {
141+
ne.printStackTrace();
142+
throw ne;
143+
}
144+
}
145+
146+
private IndexSection getIndexSection(int index) {
147+
// short-cut the normal case
148+
if (index < SIZE) {
149+
return removed;
150+
}
151+
152+
// if we have passed the first array we get more complicated and do recursive chaining
153+
int numSections = index / SIZE;
154+
IndexSection a = removed;
155+
for (int i = 0; i < numSections; i++) {
156+
a = a.getNext();
157+
}
158+
return a;
159+
}
160+
161+
private ElementSection<E> getElementSection(int index) {
162+
// short-cut the normal case
163+
if (index < SIZE) {
164+
return elements;
165+
}
166+
167+
// if we have passed the first array we get more complicated and do recursive chaining
168+
int numSections = index / SIZE;
169+
ElementSection<E> a = elements;
170+
for (int i = 0; i < numSections; i++) {
171+
a = a.getNext();
172+
}
173+
return a;
174+
}
175+
176+
private synchronized int getIndexForAdd() {
177+
/*
178+
* Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
179+
*/
180+
int i;
181+
int ri = getIndexFromPreviouslyRemoved();
182+
if (ri >= 0) {
183+
if (ri < SIZE) {
184+
// fast-path when we are in the first section
185+
i = removed.array.getAndSet(ri, -1);
186+
} else {
187+
int sectionIndex = ri % SIZE;
188+
i = getIndexSection(ri).array.getAndSet(sectionIndex, -1);
189+
}
190+
} else {
191+
i = index.getAndIncrement();
192+
}
193+
return i;
194+
}
195+
196+
/**
197+
* Returns -1 if nothing, 0 or greater if the index should be used
198+
*
199+
* @return
200+
*/
201+
private synchronized int getIndexFromPreviouslyRemoved() {
202+
/*
203+
* Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
204+
*/
205+
206+
// loop because of CAS
207+
while (true) {
208+
int currentRi = removedIndex.get();
209+
if (currentRi > 0) {
210+
// claim it
211+
if (removedIndex.compareAndSet(currentRi, currentRi - 1)) {
212+
return currentRi - 1;
213+
}
214+
} else {
215+
// do nothing
216+
return -1;
217+
}
218+
}
219+
}
220+
221+
private synchronized void pushRemovedIndex(int elementIndex) {
222+
/*
223+
* Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
224+
*/
225+
226+
int i = removedIndex.getAndIncrement();
227+
if (i < SIZE) {
228+
// fast-path when we are in the first section
229+
removed.array.set(i, elementIndex);
230+
} else {
231+
int sectionIndex = i % SIZE;
232+
getIndexSection(i).array.set(sectionIndex, elementIndex);
233+
}
234+
}
235+
236+
@Override
237+
public boolean isUnsubscribed() {
238+
return false;
239+
}
240+
241+
public int forEach(Func1<? super E, Boolean> action) {
242+
return forEach(action, 0);
243+
}
244+
245+
/**
246+
*
247+
* @param action
248+
* that processes each item and returns true if it wants to continue to the next
249+
* @return int of next index to process, or last index seen if it exited early
250+
*/
251+
public int forEach(Func1<? super E, Boolean> action, int startIndex) {
252+
int endedAt = forEach(action, startIndex, index.get());
253+
if (startIndex > 0 && endedAt == index.get()) {
254+
// start at the beginning again and go up to startIndex
255+
endedAt = forEach(action, 0, startIndex);
256+
} else if (endedAt == index.get()) {
257+
// start back at the beginning
258+
endedAt = 0;
259+
}
260+
return endedAt;
261+
}
262+
263+
private int forEach(Func1<? super E, Boolean> action, int startIndex, int endIndex) {
264+
int lastIndex = startIndex;
265+
int maxIndex = index.get();
266+
int realIndex = startIndex;
267+
ElementSection<E> section = elements;
268+
269+
if (startIndex >= SIZE) {
270+
int orig = startIndex;
271+
// move into the correct section
272+
section = getElementSection(startIndex);
273+
startIndex = startIndex % SIZE;
274+
}
275+
276+
outer: while (section != null) {
277+
for (int i = startIndex; i < SIZE; i++, realIndex++) {
278+
if (realIndex >= maxIndex || realIndex >= endIndex) {
279+
section = null;
280+
break outer;
281+
}
282+
E element = section.array.get(i);
283+
if (element == null) {
284+
continue;
285+
}
286+
lastIndex = realIndex;
287+
boolean continueLoop = action.call(element);
288+
if (!continueLoop) {
289+
return lastIndex;
290+
}
291+
}
292+
section = section.next;
293+
startIndex = 0; // reset to start for next section
294+
}
295+
296+
// return the OutOfBounds index position if we processed all of them ... the one we should be less-than
297+
return realIndex;
298+
}
299+
300+
private static class ElementSection<E> {
301+
final AtomicReferenceArray<E> array = new AtomicReferenceArray<E>(SIZE);
302+
volatile ElementSection<E> next;
303+
private static final long _nextOffset;
304+
305+
static {
306+
try {
307+
_nextOffset = UnsafeAccess.UNSAFE.objectFieldOffset(ElementSection.class.getDeclaredField("next"));
308+
} catch (Exception ex) {
309+
throw new Error(ex);
310+
}
311+
}
312+
313+
ElementSection<E> getNext() {
314+
if (next != null) {
315+
return next;
316+
} else {
317+
ElementSection<E> newSection = new ElementSection<E>();
318+
if (UnsafeAccess.UNSAFE.compareAndSwapObject(this, _nextOffset, null, newSection)) {
319+
// we won
320+
return newSection;
321+
} else {
322+
// we lost so get the value that won
323+
return next;
324+
}
325+
}
326+
}
327+
}
328+
329+
private static class IndexSection {
330+
final AtomicIntReferenceArray array = new AtomicIntReferenceArray(SIZE);
331+
private volatile IndexSection next;
332+
private static final long _nextOffset;
333+
334+
static {
335+
try {
336+
_nextOffset = UnsafeAccess.UNSAFE.objectFieldOffset(IndexSection.class.getDeclaredField("next"));
337+
} catch (Exception ex) {
338+
throw new Error(ex);
339+
}
340+
}
341+
342+
IndexSection getNext() {
343+
if (next != null) {
344+
return next;
345+
} else {
346+
IndexSection newSection = new IndexSection();
347+
if (UnsafeAccess.UNSAFE.compareAndSwapObject(this, _nextOffset, null, newSection)) {
348+
// we won
349+
return newSection;
350+
} else {
351+
// we lost so get the value that won
352+
return next;
353+
}
354+
}
355+
}
356+
}
357+
}

rxjava-core/src/main/java/rx/internal/util/MpscPaddedQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void clear() {
143143
static final class Node<E> {
144144
E value;
145145
@SuppressWarnings(value = "rawtypes")
146-
static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");
146+
static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
147147
private volatile Node<E> next;
148148

149149
Node(E value) {

0 commit comments

Comments
 (0)