Skip to content

Commit a23db28

Browse files
Implememt auto release technique for Blob Allocator (apache#15491)
* add auto release logic * add license * enhance Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org> * Refine API Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org> * fix typo Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org> --------- Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org> Co-authored-by: MrQuansy <siyiquan2021@gmail.com>
1 parent a63b608 commit a23db28

File tree

8 files changed

+274
-35
lines changed

8 files changed

+274
-35
lines changed

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.binaryallocator.arena.Arena;
2323
import org.apache.iotdb.commons.binaryallocator.arena.ArenaStrategy;
24+
import org.apache.iotdb.commons.binaryallocator.autoreleaser.Releaser;
2425
import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
2526
import org.apache.iotdb.commons.binaryallocator.evictor.Evictor;
2627
import org.apache.iotdb.commons.binaryallocator.metric.BinaryAllocatorMetrics;
@@ -33,7 +34,11 @@
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
3536

37+
import java.lang.ref.ReferenceQueue;
3638
import java.time.Duration;
39+
import java.util.Collections;
40+
import java.util.Set;
41+
import java.util.concurrent.ConcurrentHashMap;
3742
import java.util.concurrent.atomic.AtomicReference;
3843

3944
public class BinaryAllocator {
@@ -49,14 +54,22 @@ public class BinaryAllocator {
4954

5055
private final BinaryAllocatorMetrics metrics;
5156
private Evictor sampleEvictor;
57+
private Releaser autoReleaser;
5258
private static final ThreadLocal<ThreadArenaRegistry> arenaRegistry =
5359
ThreadLocal.withInitial(ThreadArenaRegistry::new);
5460

55-
private static final int WARNING_GC_TIME_PERCENTAGE = 10;
56-
private static final int HALF_GC_TIME_PERCENTAGE = 20;
61+
private static final int WARNING_GC_TIME_PERCENTAGE = 20;
62+
private static final int HALF_GC_TIME_PERCENTAGE = 25;
5763
private static final int SHUTDOWN_GC_TIME_PERCENTAGE = 30;
5864
private static final int RESTART_GC_TIME_PERCENTAGE = 5;
5965

66+
public final ReferenceQueue<PooledBinary> referenceQueue = new ReferenceQueue<>();
67+
68+
// JDK 9+ Cleaner uses double-linked list and synchronized to manage references, which has worse
69+
// performance than lock-free hash set
70+
public final Set<PooledBinaryPhantomReference> phantomRefs =
71+
Collections.newSetFromMap(new ConcurrentHashMap<>());
72+
6073
public BinaryAllocator(AllocatorConfig allocatorConfig) {
6174
this.allocatorConfig = allocatorConfig;
6275

@@ -87,8 +100,14 @@ public synchronized void start() {
87100
sampleEvictor =
88101
new SampleEvictor(
89102
ThreadName.BINARY_ALLOCATOR_SAMPLE_EVICTOR.getName(),
90-
allocatorConfig.durationEvictorShutdownTimeout);
91-
sampleEvictor.startEvictor(allocatorConfig.durationBetweenEvictorRuns);
103+
allocatorConfig.durationShutdownTimeout,
104+
allocatorConfig.durationBetweenEvictorRuns);
105+
sampleEvictor.start();
106+
autoReleaser =
107+
new AutoReleaser(
108+
ThreadName.BINARY_ALLOCATOR_AUTO_RELEASER.getName(),
109+
allocatorConfig.durationShutdownTimeout);
110+
autoReleaser.start();
92111
}
93112

94113
public synchronized void close(boolean forceClose) {
@@ -99,13 +118,14 @@ public synchronized void close(boolean forceClose) {
99118
state.set(BinaryAllocatorState.PENDING);
100119
}
101120

102-
sampleEvictor.stopEvictor();
121+
sampleEvictor.stop();
122+
autoReleaser.stop();
103123
for (Arena arena : heapArenas) {
104124
arena.close();
105125
}
106126
}
107127

108-
public PooledBinary allocateBinary(int reqCapacity) {
128+
public PooledBinary allocateBinary(int reqCapacity, boolean autoRelease) {
109129
if (reqCapacity < allocatorConfig.minAllocateSize
110130
|| reqCapacity > allocatorConfig.maxAllocateSize
111131
|| state.get() != BinaryAllocatorState.OPEN) {
@@ -114,7 +134,7 @@ public PooledBinary allocateBinary(int reqCapacity) {
114134

115135
Arena arena = arenaStrategy.choose(heapArenas);
116136

117-
return new PooledBinary(arena.allocate(reqCapacity), reqCapacity, arena.getArenaID());
137+
return arena.allocate(reqCapacity, autoRelease);
118138
}
119139

120140
public void deallocateBinary(PooledBinary binary) {
@@ -125,7 +145,7 @@ public void deallocateBinary(PooledBinary binary) {
125145
int arenaIndex = binary.getArenaIndex();
126146
if (arenaIndex != -1) {
127147
Arena arena = heapArenas[arenaIndex];
128-
arena.deallocate(binary.getValues());
148+
arena.deallocate(binary);
129149
}
130150
}
131151
}
@@ -168,11 +188,13 @@ public static BinaryAllocator getInstance() {
168188
}
169189

170190
private static class BinaryAllocatorHolder {
191+
171192
private static final BinaryAllocator INSTANCE =
172193
new BinaryAllocator(AllocatorConfig.DEFAULT_CONFIG);
173194
}
174195

175196
private static class ThreadArenaRegistry {
197+
176198
private Arena threadArenaBinding = null;
177199

178200
public Arena getArena() {
@@ -199,6 +221,7 @@ protected void finalize() {
199221
}
200222

201223
private static class LeastUsedArenaStrategy implements ArenaStrategy {
224+
202225
@Override
203226
public Arena choose(Arena[] arenas) {
204227
Arena boundArena = arenaRegistry.get().getArena();
@@ -250,8 +273,9 @@ public void runGcEviction(long curGcTimePercent) {
250273

251274
public class SampleEvictor extends Evictor {
252275

253-
public SampleEvictor(String name, Duration evictorShutdownTimeoutDuration) {
254-
super(name, evictorShutdownTimeoutDuration);
276+
public SampleEvictor(
277+
String name, Duration evictorShutdownTimeoutDuration, Duration durationBetweenEvictorRuns) {
278+
super(name, evictorShutdownTimeoutDuration, durationBetweenEvictorRuns);
255279
}
256280

257281
@Override
@@ -263,4 +287,26 @@ public void run() {
263287
metrics.updateSampleEvictionCounter(evictedSize);
264288
}
265289
}
290+
291+
/** Process phantomly reachable objects and return their byte arrays to pool. */
292+
public class AutoReleaser extends Releaser {
293+
294+
public AutoReleaser(String name, Duration shutdownTimeoutDuration) {
295+
super(name, shutdownTimeoutDuration);
296+
}
297+
298+
@Override
299+
public void run() {
300+
PooledBinaryPhantomReference ref;
301+
try {
302+
while ((ref = (PooledBinaryPhantomReference) referenceQueue.remove()) != null) {
303+
phantomRefs.remove(ref);
304+
ref.slabRegion.deallocate(ref.byteArray);
305+
}
306+
} catch (InterruptedException e) {
307+
LOGGER.info("{} exits due to interruptedException.", name);
308+
Thread.currentThread().interrupt();
309+
}
310+
}
311+
}
266312
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.binaryallocator;
21+
22+
import org.apache.iotdb.commons.binaryallocator.arena.Arena;
23+
24+
import org.apache.tsfile.utils.PooledBinary;
25+
26+
import java.lang.ref.PhantomReference;
27+
import java.lang.ref.ReferenceQueue;
28+
29+
public class PooledBinaryPhantomReference extends PhantomReference<PooledBinary> {
30+
public final byte[] byteArray;
31+
public Arena.SlabRegion slabRegion;
32+
33+
public PooledBinaryPhantomReference(
34+
PooledBinary referent,
35+
ReferenceQueue<? super PooledBinary> q,
36+
byte[] byteArray,
37+
Arena.SlabRegion region) {
38+
super(referent, q);
39+
this.byteArray = byteArray;
40+
this.slabRegion = region;
41+
}
42+
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/Arena.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,15 @@
2020
package org.apache.iotdb.commons.binaryallocator.arena;
2121

2222
import org.apache.iotdb.commons.binaryallocator.BinaryAllocator;
23+
import org.apache.iotdb.commons.binaryallocator.PooledBinaryPhantomReference;
2324
import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
2425
import org.apache.iotdb.commons.binaryallocator.ema.AdaptiveWeightedAverage;
2526
import org.apache.iotdb.commons.binaryallocator.utils.SizeClasses;
2627

28+
import org.apache.tsfile.utils.PooledBinary;
29+
30+
import java.lang.ref.ReferenceQueue;
31+
import java.util.Set;
2732
import java.util.concurrent.ConcurrentLinkedQueue;
2833
import java.util.concurrent.atomic.AtomicInteger;
2934

@@ -39,6 +44,9 @@ public class Arena {
3944

4045
private int sampleCount;
4146

47+
private final ReferenceQueue<PooledBinary> referenceQueue;
48+
private final Set<PooledBinaryPhantomReference> phantomRefs;
49+
4250
public Arena(
4351
BinaryAllocator allocator, SizeClasses sizeClasses, int id, AllocatorConfig allocatorConfig) {
4452
this.binaryAllocator = allocator;
@@ -52,20 +60,31 @@ public Arena(
5260
}
5361

5462
sampleCount = 0;
63+
referenceQueue = binaryAllocator.referenceQueue;
64+
phantomRefs = binaryAllocator.phantomRefs;
5565
}
5666

5767
public int getArenaID() {
5868
return arenaID;
5969
}
6070

61-
public byte[] allocate(int reqCapacity) {
71+
public PooledBinary allocate(int reqCapacity, boolean autoRelease) {
6272
final int sizeIdx = sizeClasses.size2SizeIdx(reqCapacity);
63-
return regions[sizeIdx].allocate();
73+
byte[] data = regions[sizeIdx].allocate();
74+
if (autoRelease) {
75+
PooledBinary binary = new PooledBinary(data, reqCapacity, -1);
76+
PooledBinaryPhantomReference ref =
77+
new PooledBinaryPhantomReference(binary, referenceQueue, data, regions[sizeIdx]);
78+
phantomRefs.add(ref);
79+
return binary;
80+
} else {
81+
return new PooledBinary(data, reqCapacity, arenaID);
82+
}
6483
}
6584

66-
public void deallocate(byte[] bytes) {
67-
final int sizeIdx = sizeClasses.size2SizeIdx(bytes.length);
68-
regions[sizeIdx].deallocate(bytes);
85+
public void deallocate(PooledBinary binary) {
86+
final int sizeIdx = sizeClasses.size2SizeIdx(binary.getLength());
87+
regions[sizeIdx].deallocate(binary.getValues());
6988
}
7089

7190
public long evict(double ratio) {
@@ -146,8 +165,13 @@ public long runSampleEviction() {
146165
return evictedSize;
147166
}
148167

149-
private static class SlabRegion {
168+
public static class SlabRegion {
150169
private final int byteArraySize;
170+
171+
// Current implementation uses ConcurrentLinkedQueue for simplicity
172+
// TODO: Can be optimized with more efficient lock-free approaches:
173+
// 1. No need for strict FIFO, it's just an object pool
174+
// 2. Use segmented arrays/queues with per-segment counters to reduce contention
151175
private final ConcurrentLinkedQueue<byte[]> queue;
152176

153177
private final AtomicInteger allocationsFromAllocator;
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.binaryallocator.autoreleaser;
21+
22+
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import java.time.Duration;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Future;
30+
import java.util.concurrent.TimeUnit;
31+
32+
public abstract class Releaser implements Runnable {
33+
private static final Logger LOGGER = LoggerFactory.getLogger(Releaser.class);
34+
35+
private Future<?> future;
36+
protected final String name;
37+
private final Duration shutdownTimeoutDuration;
38+
39+
private ExecutorService executor;
40+
41+
public Releaser(String name, Duration shutdownTimeoutDuration) {
42+
this.name = name;
43+
this.shutdownTimeoutDuration = shutdownTimeoutDuration;
44+
}
45+
46+
/** Cancels the future. */
47+
void cancel() {
48+
future.cancel(false);
49+
}
50+
51+
@Override
52+
public abstract void run();
53+
54+
void setFuture(final Future<?> future) {
55+
this.future = future;
56+
}
57+
58+
@Override
59+
public String toString() {
60+
return getClass().getName() + " [future=" + future + "]";
61+
}
62+
63+
public void start() {
64+
if (null == executor) {
65+
executor = IoTDBThreadPoolFactory.newSingleThreadExecutor(name);
66+
}
67+
final Future<?> future = executor.submit(this);
68+
this.setFuture(future);
69+
}
70+
71+
public void stop() {
72+
if (executor == null) {
73+
return;
74+
}
75+
76+
LOGGER.info("Stopping {}", name);
77+
78+
cancel();
79+
executor.shutdown();
80+
try {
81+
boolean result =
82+
executor.awaitTermination(shutdownTimeoutDuration.toMillis(), TimeUnit.MILLISECONDS);
83+
if (!result) {
84+
LOGGER.info("unable to stop auto releaser after {} ms", shutdownTimeoutDuration.toMillis());
85+
}
86+
} catch (final InterruptedException ignored) {
87+
Thread.currentThread().interrupt();
88+
}
89+
executor = null;
90+
}
91+
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/config/AllocatorConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ public class AllocatorConfig {
3737
public boolean enableBinaryAllocator =
3838
CommonDescriptor.getInstance().getConfig().isEnableBinaryAllocator();
3939

40-
/** Maximum wait time in milliseconds when shutting down the evictor */
41-
public Duration durationEvictorShutdownTimeout = Duration.ofMillis(1000L);
40+
/** Maximum wait time in milliseconds when shutting down the evictor and autoReleaser */
41+
public Duration durationShutdownTimeout = Duration.ofMillis(1000L);
4242

4343
/** Time interval in milliseconds between two consecutive evictor runs */
4444
public Duration durationBetweenEvictorRuns = Duration.ofMillis(1000L);

0 commit comments

Comments
 (0)