Skip to content

Commit 8a43e43

Browse files
committed
feat: buffer pool add weak ref cache
1 parent 983c23e commit 8a43e43

File tree

4 files changed

+186
-9
lines changed

4 files changed

+186
-9
lines changed

client/src/main/java/com/github/dtprj/dongting/buf/FixSizeBufferPool.java

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
*/
1616
package com.github.dtprj.dongting.buf;
1717

18-
import com.github.dtprj.dongting.common.DtException;
18+
import com.github.dtprj.dongting.common.DtBugException;
1919
import com.github.dtprj.dongting.common.IndexedQueue;
2020

21+
import java.lang.ref.WeakReference;
2122
import java.nio.ByteBuffer;
2223

2324
/**
@@ -35,13 +36,16 @@ class FixSizeBufferPool {
3536
private static final int MAGIC = 0xEA1D9C07;
3637

3738
private final IndexedQueue<ByteBuffer> bufferStack;
39+
private final IndexedQueue<WeakReference<ByteBuffer>> weakRefStack;
40+
private final boolean weakRefEnabled;
3841

3942
long statBorrowCount;
4043
long statBorrowHitCount;
4144
long statReleaseCount;
4245
long statReleaseHitCount;
4346

44-
public FixSizeBufferPool(SimpleByteBufferPool p, boolean direct, long shareSize, int minCount, int maxCount, int bufferSize) {
47+
public FixSizeBufferPool(SimpleByteBufferPool p, boolean direct, long shareSize,
48+
int minCount, int maxCount, int bufferSize, int weakRefThreshold) {
4549
this.p = p;
4650
this.direct = direct;
4751
this.shareSize = shareSize;
@@ -52,24 +56,46 @@ public FixSizeBufferPool(SimpleByteBufferPool p, boolean direct, long shareSize,
5256
this.maxCount = maxCount;
5357
this.bufferSize = bufferSize;
5458
this.bufferStack = new IndexedQueue<>(maxCount);
59+
// Enable weak reference feature for heap buffers with size >= threshold
60+
// Direct buffers are excluded because they create "iceberg" objects
61+
this.weakRefEnabled = !direct && bufferSize >= weakRefThreshold;
62+
this.weakRefStack = weakRefEnabled ? new IndexedQueue<>(16) : null;
5563
}
5664

5765
public ByteBuffer borrow() {
5866
statBorrowCount++;
59-
ByteBuffer buf = bufferStack.removeLast();
67+
ByteBuffer buf = borrow0();
6068
if (buf != null) {
6169
int bufMagic = buf.getInt(MAGIC_INDEX);
6270
if (bufMagic != MAGIC) {
63-
throw new DtException("A bug may exist where the buffer is written to after release.");
71+
throw new DtBugException("A bug may exist where the buffer is written to after release.");
6472
}
6573
buf.putInt(MAGIC_INDEX, 0);
6674
statBorrowHitCount++;
67-
updateCurrentUsedShareSizeAfterRemove();
6875
buf.clear();
6976
}
7077
return buf;
7178
}
7279

80+
private ByteBuffer borrow0() {
81+
if (weakRefEnabled) {
82+
while (weakRefStack.size() > 0) {
83+
WeakReference<ByteBuffer> ref = weakRefStack.removeLast();
84+
ByteBuffer buf = ref.get();
85+
if (buf != null) {
86+
return buf;
87+
}
88+
}
89+
}
90+
91+
ByteBuffer buf = bufferStack.removeLast();
92+
if (buf != null) {
93+
updateCurrentUsedShareSizeAfterRemove();
94+
return buf;
95+
}
96+
return null;
97+
}
98+
7399
private void updateCurrentUsedShareSizeAfterRemove() {
74100
int size = bufferStack.size();
75101
if (size >= maxCount) {
@@ -87,15 +113,19 @@ public boolean release(ByteBuffer buf, long nanos) {
87113
// shit
88114
for (int i = 0, stackSize = bufferStack.size(); i < stackSize; i++) {
89115
if (bufferStack.get(i) == buf) {
90-
throw new DtException("A bug may exist where the buffer is released twice.");
116+
throw new DtBugException("A bug may exist where the buffer is released twice.");
91117
}
92118
}
93119
}
94120

95121
if (bufferStack.size() >= maxCount) {
96122
long newUsedShareSize = p.currentUsedShareSize + bufferSize;
97123
if (newUsedShareSize > shareSize) {
98-
// too many buffer in pool
124+
if (weakRefEnabled) {
125+
// only write magic, return time is not needed
126+
buf.putInt(MAGIC_INDEX, MAGIC);
127+
weakRefStack.addLast(new WeakReference<>(buf));
128+
}
99129
return false;
100130
} else {
101131
p.currentUsedShareSize = newUsedShareSize;
@@ -124,9 +154,34 @@ public void clean(long expireNanos) {
124154
updateCurrentUsedShareSizeAfterRemove();
125155
if (direct) {
126156
SimpleByteBufferPool.VF.releaseDirectBuffer(buf);
157+
} else if (weakRefEnabled) {
158+
// the buffer is not used recently, add to bottom
159+
weakRefStack.addFirst(new WeakReference<>(buf));
127160
}
128161
}
129162
}
163+
if (weakRefEnabled) {
164+
cleanWeakRefHeadAndTail();
165+
}
166+
}
167+
168+
private void cleanWeakRefHeadAndTail() {
169+
WeakReference<ByteBuffer> ref = weakRefStack.getFirst();
170+
if (ref == null) {
171+
return;
172+
}
173+
while (ref.get() == null) {
174+
weakRefStack.removeFirst();
175+
ref = weakRefStack.getFirst();
176+
if (ref == null) {
177+
return;
178+
}
179+
}
180+
ref = weakRefStack.getLast();
181+
while (ref != null && ref.get() == null) {
182+
weakRefStack.removeLast();
183+
ref = weakRefStack.getLast();
184+
}
130185
}
131186

132187
public void cleanAll() {

client/src/main/java/com/github/dtprj/dongting/buf/SimpleByteBufferPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public SimpleByteBufferPool(SimpleByteBufferPoolConfig config) {
9696
this.pools = new FixSizeBufferPool[bufferTypeCount];
9797
for (int i = 0; i < bufferTypeCount; i++) {
9898
this.pools[i] = new FixSizeBufferPool(this, direct, config.shareSize,
99-
minCount[i], maxCount[i], bufSizes[i]);
99+
minCount[i], maxCount[i], bufSizes[i], config.weakRefThreshold);
100100
}
101101
}
102102

client/src/main/java/com/github/dtprj/dongting/buf/SimpleByteBufferPoolConfig.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
* @author huangli
2222
*/
2323
public class SimpleByteBufferPoolConfig {
24+
public static final int DEFAULT_WEAK_REF_THRESHOLD = 4096;
25+
2426
public final Timestamp ts;
2527
public final boolean direct;
2628
public final boolean threadSafe;
2729
public final int threshold;
30+
public final int weakRefThreshold;
2831
public final int[] bufSizes;
2932
public final int[] minCount;
3033
public final int[] maxCount;
@@ -33,14 +36,21 @@ public class SimpleByteBufferPoolConfig {
3336

3437
public SimpleByteBufferPoolConfig(Timestamp ts, boolean direct, int threshold, boolean threadSafe,
3538
int[] bufSizes, int[] minCount, int[] maxCount) {
36-
this(ts, direct, threshold, threadSafe, bufSizes, minCount, maxCount, 10 * 1000, 0);
39+
this(ts, direct, threshold, DEFAULT_WEAK_REF_THRESHOLD, threadSafe, bufSizes, minCount, maxCount, 10 * 1000, 0);
3740
}
3841

3942
public SimpleByteBufferPoolConfig(Timestamp ts, boolean direct, int threshold, boolean threadSafe, int[] bufSizes,
4043
int[] minCount, int[] maxCount, long timeoutMillis, long shareSize) {
44+
this(ts, direct, threshold, DEFAULT_WEAK_REF_THRESHOLD, threadSafe, bufSizes, minCount, maxCount, timeoutMillis, shareSize);
45+
}
46+
47+
public SimpleByteBufferPoolConfig(Timestamp ts, boolean direct, int threshold, int weakRefThreshold,
48+
boolean threadSafe, int[] bufSizes, int[] minCount, int[] maxCount,
49+
long timeoutMillis, long shareSize) {
4150
this.ts = ts;
4251
this.direct = direct;
4352
this.threshold = threshold;
53+
this.weakRefThreshold = weakRefThreshold;
4454
this.threadSafe = threadSafe;
4555
this.bufSizes = bufSizes;
4656
this.minCount = minCount;

client/src/test/java/com/github/dtprj/dongting/buf/SimpleByteBufferPoolTest.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ private void plus(SimpleByteBufferPool pool, long millis) {
4040
public void tearDown() {
4141
if (pool != null) {
4242
pool.formatStat();
43+
pool.cleanAll();
4344
pool = null;
4445
}
4546
}
@@ -322,6 +323,117 @@ public void testBadUsage() {
322323
pool.release(buf3);
323324
}
324325

326+
@Test
327+
public void testWeakRefNotEnabledForDirect() {
328+
SimpleByteBufferPoolConfig c = new SimpleByteBufferPoolConfig(TS, true, 0, false,
329+
new int[]{4096}, new int[]{1}, new int[]{2});
330+
pool = new SimpleByteBufferPool(c);
331+
ByteBuffer buf1 = pool.borrow(4096);
332+
ByteBuffer buf2 = pool.borrow(4096);
333+
ByteBuffer buf3 = pool.borrow(4096);
334+
pool.release(buf1);
335+
pool.release(buf2);
336+
pool.release(buf3);
337+
assertSame(buf2, pool.borrow(4096));
338+
assertSame(buf1, pool.borrow(4096));
339+
assertNotSame(buf3, pool.borrow(4096));
340+
}
341+
342+
@Test
343+
public void testWeakRefNotEnabledForSmallBuffer() {
344+
SimpleByteBufferPoolConfig c = new SimpleByteBufferPoolConfig(TS, false, 0, 4096,
345+
false, new int[]{1024}, new int[]{1}, new int[]{2}, 1000, 0);
346+
pool = new SimpleByteBufferPool(c);
347+
ByteBuffer buf1 = pool.borrow(1024);
348+
ByteBuffer buf2 = pool.borrow(1024);
349+
ByteBuffer buf3 = pool.borrow(1024);
350+
pool.release(buf1);
351+
pool.release(buf2);
352+
pool.release(buf3);
353+
assertSame(buf2, pool.borrow(1024));
354+
assertSame(buf1, pool.borrow(1024));
355+
assertNotSame(buf3, pool.borrow(1024));
356+
}
357+
358+
@Test
359+
public void testWeakRefReleaseToWeakStack() {
360+
for (int attempt = 0; attempt < 3; attempt++) {
361+
SimpleByteBufferPoolConfig c = new SimpleByteBufferPoolConfig(TS, false, 0, false,
362+
new int[]{4096}, new int[]{1}, new int[]{2});
363+
pool = new SimpleByteBufferPool(c);
364+
ByteBuffer buf1 = pool.borrow(4096);
365+
ByteBuffer buf2 = pool.borrow(4096);
366+
ByteBuffer buf3 = pool.borrow(4096);
367+
pool.release(buf1);
368+
pool.release(buf2);
369+
pool.release(buf3);
370+
ByteBuffer b1 = pool.borrow(4096);
371+
ByteBuffer b2 = pool.borrow(4096);
372+
ByteBuffer b3 = pool.borrow(4096);
373+
if (b1 == buf3 && b2 == buf2 && b3 == buf1) {
374+
return;
375+
}
376+
}
377+
fail("weak ref test failed after 3 attempts");
378+
}
379+
380+
@Test
381+
public void testWeakRefCleanToWeakStack() {
382+
for (int attempt = 0; attempt < 3; attempt++) {
383+
SimpleByteBufferPoolConfig c = new SimpleByteBufferPoolConfig(TS, false, 0, false,
384+
new int[]{4096}, new int[]{1}, new int[]{3}, 1000, 0);
385+
pool = new SimpleByteBufferPool(c);
386+
ByteBuffer buf1 = pool.borrow(4096);
387+
ByteBuffer buf2 = pool.borrow(4096);
388+
ByteBuffer buf3 = pool.borrow(4096);
389+
pool.release(buf1);
390+
pool.release(buf2);
391+
pool.release(buf3);
392+
393+
plus(pool, 1001);
394+
pool.clean();
395+
396+
ByteBuffer buf4 = pool.borrow(4096);
397+
ByteBuffer buf5 = pool.borrow(4096);
398+
ByteBuffer buf6 = pool.borrow(4096);
399+
400+
if (buf4 == buf1 && buf5 == buf2 && buf6 == buf3) {
401+
return;
402+
}
403+
}
404+
fail("weak ref test failed after 3 attempts");
405+
}
406+
407+
@Test
408+
public void testWeakRefGCAndClean() {
409+
SimpleByteBufferPoolConfig c = new SimpleByteBufferPoolConfig(TS, false, 0, false,
410+
new int[]{4096}, new int[]{0}, new int[]{1}, 1000, 0);
411+
SimpleByteBufferPool testPool = new SimpleByteBufferPool(c);
412+
413+
ByteBuffer buf1 = testPool.borrow(4096);
414+
ByteBuffer buf2 = testPool.borrow(4096);
415+
ByteBuffer buf3 = testPool.borrow(4096);
416+
testPool.release(buf1);
417+
testPool.release(buf2);
418+
testPool.release(buf3);
419+
420+
//noinspection UnusedAssignment
421+
buf2 = null;
422+
423+
System.gc();
424+
System.runFinalization();
425+
426+
testPool.clean();
427+
428+
ByteBuffer borrowed1 = testPool.borrow(4096);
429+
ByteBuffer borrowed2 = testPool.borrow(4096);
430+
431+
assertNotNull(borrowed1);
432+
assertNotNull(borrowed2);
433+
assertEquals(4096, borrowed1.capacity());
434+
assertEquals(4096, borrowed2.capacity());
435+
}
436+
325437
public static void main(String[] args) {
326438
System.out.println("default two level global");
327439
System.out.printf("max:%,d\nmin:%,d\n\n",

0 commit comments

Comments
 (0)