Skip to content

Commit fd105e0

Browse files
Add Binary Allocator (apache#14321)
* add binary allocator * fix * add gc evictor and hot load configuration * fix * fix * add license * resolve comments * add comments * modified adaptive weighted average * fix unit test * add necessary comments * restruct package Signed-off-by: OneSizeFitQuorum <[email protected]> * restruct SizeClasses Signed-off-by: OneSizeFitQuorum <[email protected]> * enhance Signed-off-by: OneSizeFitQuorum <[email protected]> * refactor eviction thread and remove evict timer * almost finish Signed-off-by: OneSizeFitQuorum <[email protected]> * start binary allocator with hot loading * add config check * fix compile error * enhance Signed-off-by: OneSizeFitQuorum <[email protected]> * Remove duplicate test Signed-off-by: OneSizeFitQuorum <[email protected]> --------- Signed-off-by: OneSizeFitQuorum <[email protected]> Co-authored-by: MrQuansy <[email protected]>
1 parent 502f54a commit fd105e0

File tree

17 files changed

+1375
-0
lines changed

17 files changed

+1375
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.iotdb.db.conf;
2020

21+
import org.apache.iotdb.commons.binaryallocator.BinaryAllocator;
2122
import org.apache.iotdb.commons.conf.CommonConfig;
2223
import org.apache.iotdb.commons.conf.CommonDescriptor;
2324
import org.apache.iotdb.commons.conf.ConfigurationFileUtils;
@@ -2849,6 +2850,26 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
28492850

28502851
// update retry config
28512852
commonDescriptor.loadRetryProperties(properties);
2853+
2854+
// update binary allocator
2855+
commonDescriptor
2856+
.getConfig()
2857+
.setEnableBinaryAllocator(
2858+
Boolean.parseBoolean(
2859+
Optional.ofNullable(
2860+
properties.getProperty(
2861+
"enable_binary_allocator",
2862+
ConfigurationFileUtils.getConfigurationDefaultValue(
2863+
"enable_binary_allocator")))
2864+
.map(String::trim)
2865+
.orElse(
2866+
ConfigurationFileUtils.getConfigurationDefaultValue(
2867+
"enable_binary_allocator"))));
2868+
if (commonDescriptor.getConfig().isEnableBinaryAllocator()) {
2869+
BinaryAllocator.getInstance().start();
2870+
} else {
2871+
BinaryAllocator.getInstance().close(true);
2872+
}
28522873
} catch (Exception e) {
28532874
if (e instanceof InterruptedException) {
28542875
Thread.currentThread().interrupt();

iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1493,6 +1493,40 @@ data_region_iot_max_memory_ratio_for_queue = 0.6
14931493
# Datatype: long
14941494
region_migration_speed_limit_bytes_per_second = 33554432
14951495

1496+
####################
1497+
### Binary Allocator Configuration
1498+
####################
1499+
# Whether to enable binary allocator.
1500+
# For scenarios where large binary streams cause severe GC, enabling this parameter significantly improves performance.
1501+
# effectiveMode: hot_reload
1502+
enable_binary_allocator=true
1503+
1504+
# The size boundaries that the allocator is responsible for
1505+
# lower boundary for allocation size
1506+
# unit: bytes
1507+
# Datatype: int
1508+
# effectiveMode: restart
1509+
small_binary_object=4096
1510+
1511+
# The size boundaries that the allocator is responsible for
1512+
# upper boundary for allocation size
1513+
# unit: bytes
1514+
# Datatype: int
1515+
# effectiveMode: restart
1516+
huge_binary_object=1048576
1517+
1518+
# Number of arena regions in the binary allocator, used to control concurrent performance
1519+
# Datatype: int
1520+
# effectiveMode: restart
1521+
arena_num=4
1522+
1523+
# Controls the number of slabs in the allocator
1524+
# The number of different sizes in each power-of-2 interval is 2^log2_size_class_group
1525+
# For example: if log2_size_class_group=3, there will be 8 different sizes between 1024 and 2048
1526+
# Datatype: int
1527+
# effectiveMode: restart
1528+
log2_size_class_group=3
1529+
14961530
####################
14971531
### TsFile Configurations
14981532
####################
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.binaryallocator;
21+
22+
import org.apache.iotdb.commons.binaryallocator.arena.Arena;
23+
import org.apache.iotdb.commons.binaryallocator.arena.ArenaStrategy;
24+
import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
25+
import org.apache.iotdb.commons.binaryallocator.evictor.Evictor;
26+
import org.apache.iotdb.commons.binaryallocator.metric.BinaryAllocatorMetrics;
27+
import org.apache.iotdb.commons.binaryallocator.utils.SizeClasses;
28+
import org.apache.iotdb.commons.concurrent.ThreadName;
29+
import org.apache.iotdb.commons.service.metric.MetricService;
30+
import org.apache.iotdb.commons.utils.TestOnly;
31+
32+
import org.apache.tsfile.utils.PooledBinary;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.time.Duration;
37+
import java.util.concurrent.atomic.AtomicReference;
38+
39+
public class BinaryAllocator {
40+
41+
private static final Logger LOGGER = LoggerFactory.getLogger(BinaryAllocator.class);
42+
43+
private final Arena[] heapArenas;
44+
private final AllocatorConfig allocatorConfig;
45+
46+
private final ArenaStrategy arenaStrategy = new LeastUsedArenaStrategy();
47+
private final AtomicReference<BinaryAllocatorState> state =
48+
new AtomicReference<>(BinaryAllocatorState.UNINITIALIZED);
49+
50+
private final BinaryAllocatorMetrics metrics;
51+
private Evictor sampleEvictor;
52+
private static final ThreadLocal<ThreadArenaRegistry> arenaRegistry =
53+
ThreadLocal.withInitial(ThreadArenaRegistry::new);
54+
55+
private static final int WARNING_GC_TIME_PERCENTAGE = 10;
56+
private static final int HALF_GC_TIME_PERCENTAGE = 20;
57+
private static final int SHUTDOWN_GC_TIME_PERCENTAGE = 30;
58+
private static final int RESTART_GC_TIME_PERCENTAGE = 5;
59+
60+
public BinaryAllocator(AllocatorConfig allocatorConfig) {
61+
this.allocatorConfig = allocatorConfig;
62+
63+
heapArenas = new Arena[allocatorConfig.arenaNum];
64+
SizeClasses sizeClasses = new SizeClasses(allocatorConfig);
65+
66+
for (int i = 0; i < heapArenas.length; i++) {
67+
Arena arena = new Arena(this, sizeClasses, i, allocatorConfig);
68+
heapArenas[i] = arena;
69+
}
70+
71+
this.metrics = new BinaryAllocatorMetrics(this);
72+
73+
if (allocatorConfig.enableBinaryAllocator) {
74+
start();
75+
} else {
76+
state.set(BinaryAllocatorState.CLOSE);
77+
}
78+
}
79+
80+
public synchronized void start() {
81+
if (state.get() == BinaryAllocatorState.OPEN) {
82+
return;
83+
}
84+
85+
state.set(BinaryAllocatorState.OPEN);
86+
MetricService.getInstance().addMetricSet(this.metrics);
87+
sampleEvictor =
88+
new SampleEvictor(
89+
ThreadName.BINARY_ALLOCATOR_SAMPLE_EVICTOR.getName(),
90+
allocatorConfig.durationEvictorShutdownTimeout);
91+
sampleEvictor.startEvictor(allocatorConfig.durationBetweenEvictorRuns);
92+
}
93+
94+
public synchronized void close(boolean forceClose) {
95+
if (forceClose) {
96+
state.set(BinaryAllocatorState.CLOSE);
97+
MetricService.getInstance().removeMetricSet(this.metrics);
98+
} else {
99+
state.set(BinaryAllocatorState.PENDING);
100+
}
101+
102+
sampleEvictor.stopEvictor();
103+
for (Arena arena : heapArenas) {
104+
arena.close();
105+
}
106+
}
107+
108+
public PooledBinary allocateBinary(int reqCapacity) {
109+
if (reqCapacity < allocatorConfig.minAllocateSize
110+
|| reqCapacity > allocatorConfig.maxAllocateSize
111+
|| state.get() != BinaryAllocatorState.OPEN) {
112+
return new PooledBinary(new byte[reqCapacity]);
113+
}
114+
115+
Arena arena = arenaStrategy.choose(heapArenas);
116+
117+
return new PooledBinary(arena.allocate(reqCapacity), reqCapacity, arena.getArenaID());
118+
}
119+
120+
public void deallocateBinary(PooledBinary binary) {
121+
if (binary != null
122+
&& binary.getLength() >= allocatorConfig.minAllocateSize
123+
&& binary.getLength() <= allocatorConfig.maxAllocateSize
124+
&& state.get() == BinaryAllocatorState.OPEN) {
125+
int arenaIndex = binary.getArenaIndex();
126+
if (arenaIndex != -1) {
127+
Arena arena = heapArenas[arenaIndex];
128+
arena.deallocate(binary.getValues());
129+
}
130+
}
131+
}
132+
133+
public long getTotalUsedMemory() {
134+
long totalUsedMemory = 0;
135+
for (Arena arena : heapArenas) {
136+
totalUsedMemory += arena.getTotalUsedMemory();
137+
}
138+
return totalUsedMemory;
139+
}
140+
141+
public long getTotalActiveMemory() {
142+
long totalActiveMemory = 0;
143+
for (Arena arena : heapArenas) {
144+
totalActiveMemory += arena.getActiveMemory();
145+
}
146+
return totalActiveMemory;
147+
}
148+
149+
@TestOnly
150+
public void resetArenaBinding() {
151+
arenaRegistry.get().unbindArena();
152+
}
153+
154+
public BinaryAllocatorMetrics getMetrics() {
155+
return metrics;
156+
}
157+
158+
private void evict(double ratio) {
159+
for (Arena arena : heapArenas) {
160+
arena.evict(ratio);
161+
}
162+
}
163+
164+
public static BinaryAllocator getInstance() {
165+
return BinaryAllocatorHolder.INSTANCE;
166+
}
167+
168+
private static class BinaryAllocatorHolder {
169+
private static final BinaryAllocator INSTANCE =
170+
new BinaryAllocator(AllocatorConfig.DEFAULT_CONFIG);
171+
}
172+
173+
private static class ThreadArenaRegistry {
174+
private Arena threadArenaBinding = null;
175+
176+
public Arena getArena() {
177+
return threadArenaBinding;
178+
}
179+
180+
public void bindArena(Arena arena) {
181+
threadArenaBinding = arena;
182+
arena.incRegisteredThread();
183+
}
184+
185+
public void unbindArena() {
186+
Arena arena = threadArenaBinding;
187+
if (arena != null) {
188+
arena.decRegisteredThread();
189+
threadArenaBinding = null;
190+
}
191+
}
192+
193+
@Override
194+
protected void finalize() {
195+
unbindArena();
196+
}
197+
}
198+
199+
private static class LeastUsedArenaStrategy implements ArenaStrategy {
200+
@Override
201+
public Arena choose(Arena[] arenas) {
202+
Arena boundArena = arenaRegistry.get().getArena();
203+
if (boundArena != null) {
204+
return boundArena;
205+
}
206+
207+
Arena minArena = arenas[0];
208+
209+
for (int i = 1; i < arenas.length; i++) {
210+
Arena arena = arenas[i];
211+
if (arena.getNumRegisteredThread() < minArena.getNumRegisteredThread()) {
212+
minArena = arena;
213+
}
214+
}
215+
216+
arenaRegistry.get().bindArena(minArena);
217+
return minArena;
218+
}
219+
}
220+
221+
public void runGcEviction(long curGcTimePercent) {
222+
if (state.get() == BinaryAllocatorState.CLOSE) {
223+
return;
224+
}
225+
226+
LOGGER.debug("Binary allocator running GC eviction");
227+
if (state.get() == BinaryAllocatorState.PENDING) {
228+
if (curGcTimePercent <= RESTART_GC_TIME_PERCENTAGE) {
229+
start();
230+
}
231+
return;
232+
}
233+
234+
if (curGcTimePercent > SHUTDOWN_GC_TIME_PERCENTAGE) {
235+
LOGGER.info(
236+
"Binary allocator is shutting down because of high GC time percentage {}%.",
237+
curGcTimePercent);
238+
evict(1.0);
239+
close(false);
240+
} else if (curGcTimePercent > HALF_GC_TIME_PERCENTAGE) {
241+
evict(0.5);
242+
} else if (curGcTimePercent > WARNING_GC_TIME_PERCENTAGE) {
243+
evict(0.2);
244+
}
245+
}
246+
247+
public class SampleEvictor extends Evictor {
248+
249+
public SampleEvictor(String name, Duration evictorShutdownTimeoutDuration) {
250+
super(name, evictorShutdownTimeoutDuration);
251+
}
252+
253+
@Override
254+
public void run() {
255+
for (Arena arena : heapArenas) {
256+
arena.runSampleEviction();
257+
}
258+
}
259+
}
260+
}

0 commit comments

Comments
 (0)