Skip to content

Commit 23068b4

Browse files
authored
Add memory metrics and refact memory allocation (apache#14710)
* fix memory concurrency problem * add memory related metric name and tag * add memory threshold code * add total * add part of write metrics * remove hard string code * fix error code(set storage engine size to schema engine * degrade metric level * fix error level * fix error type and add datanode device path cache * finish write part * fix format * fix name bug * move buffer pool to memtable * fix comment * optimize the code * add Query Memory Threshold(level=2) * add Schema Memory Threshold(level=2) * add Schema Memory Threshold(update comment) * move Consensus memory threshold * move Consensus memory threshold * split class to optimize code * Add Off Heap Memory Total * change metric name * add Metric memory_actual_size * init first version of memory manager and memory block * add IIoTDBMemoryBlock * fix template * rollback * rollback 2 * Rename IoTDBRuntimeOutOfMemoryException to IoTDBMemoryException * Rename IoTDBRuntimeOutOfMemoryException to IoTDBMemoryException * add version2 memory * add name of MemoryManager and MemoryBlock * add global memory manager * add search memory manager * add default method to get memory manager * modify storage engine * fix * update * update * fix init problem * fix init problem * TimePartitionInfo * Add TimePartitionInfo * Add Consensus Memory Manager * Add Pipe Memory Manager * Add Schema Memory Manager * add query memory manager * fix operators memory * move now into memory manager * add wal buffer queue size metrics * remove TestOnly Annotation * add comment and optimize code * split off heap * bug fix * move compaction into block * add metric of actual size * fix tag name * fix level * add actural size of memory of memtable * have a try * test * test * return * Revert "return" This reverts commit 16fd24e. * Revert "test" This reverts commit 3642591. * Revert "test" This reverts commit 1a54125. * Revert "have a try" This reverts commit f1d3e35. * Revert "add actural size of memory of memtable" This reverts commit 9f974e9. * add storage level=4 * fix wait-notify * fix wait-notify and statistic * spotless * init IT test * avoid notify * Fix bug in Memory Manager and Memory Block * Add License * Empty-Commit * rollback message in PipeMemoryManager * fix * Add some log * fix reclaim bug * avoid multi-create * remove debug log * fix typo * Modify TimePartitionMemoryController * Modify BloomFilterCache Memory Block * Modify ChunkCache Memory Block * Modify TimeSeriesMetadataCache Memory Block * Modify Operators Memory Block * Modify DataExchange Memory Block * spotless * use allocate method * Modify TimeIndex Memory Block * Update StorageEngineMemoryMetrics * Modify Gauge to AutoGauge * Fix error metric name * Update StreamEngineMetrics * Add Consensus Memory Block * Add Schema Memory Block * Fix NPE in consensus test * avoid same name memory block * Fix return bug in clearAll * Upgrade Metric Level * Fix info log * Fix info log * Fix concurrent problem * Fix conflict * Spotless * Fix method name * Fix bug around remove of memory block * solve all conflict * add simple auto * fix format * Fix start * Add Log * Add Memory Service * Change higher and lower * Add Config * Add test log * Add test log * Add test log * Fix auto * Change higher and lower * Change higher and lower * Change higher and lower and add check * Optimize judge * Fix pipe bug when merge * move some manager to memory config * move some manager to memory config(finish) * update format * Fix Agent * Fix IT * Fix Symbol * Fix Format * Add MemoryBlock UT Test * Add MemoryBlock UT Test * Update Auto Logic * Add Score Method * Move Logic * Remove to DataNodeMemoryConfig * Add Config * Move into IoTDBDescriptor * Add RAT * Change Name * Add Import * Add Import * Add Import * Fix Format * Fix Format * Fix Format * Fix IT/UT * Fix Some * Fix Before * Add ratio check * Reset * Reset and remove * Reset and remove * Reset and remove * Remove * Rename forceAllocate to exactAllocate * Rename registerMemoryBlock to getOrCreate * Update getOrCreateMemoryManager * Update getOrCreateMemoryManager * Update Memory Manager * Fix Log
1 parent 809ecd4 commit 23068b4

File tree

74 files changed

+3177
-1439
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+3177
-1439
lines changed

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.apache.iotdb.consensus.config;
2121

2222
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
23+
import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
24+
import org.apache.iotdb.commons.memory.IMemoryBlock;
2325

2426
import java.util.Optional;
2527
import java.util.concurrent.TimeUnit;
@@ -246,8 +248,8 @@ public static class Replication {
246248
private final long walThrottleThreshold;
247249
private final long throttleTimeOutMs;
248250
private final long checkpointGap;
249-
private final long allocateMemoryForConsensus;
250-
private final long allocateMemoryForQueue;
251+
private final IMemoryBlock consensusMemoryBlock;
252+
private final double maxMemoryRatioForQueue;
251253
private final long regionMigrationSpeedLimitBytesPerSecond;
252254

253255
private Replication(
@@ -262,7 +264,7 @@ private Replication(
262264
long walThrottleThreshold,
263265
long throttleTimeOutMs,
264266
long checkpointGap,
265-
long allocateMemoryForConsensus,
267+
IMemoryBlock consensusMemoryBlock,
266268
double maxMemoryRatioForQueue,
267269
long regionMigrationSpeedLimitBytesPerSecond) {
268270
this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
@@ -276,8 +278,8 @@ private Replication(
276278
this.walThrottleThreshold = walThrottleThreshold;
277279
this.throttleTimeOutMs = throttleTimeOutMs;
278280
this.checkpointGap = checkpointGap;
279-
this.allocateMemoryForConsensus = allocateMemoryForConsensus;
280-
this.allocateMemoryForQueue = (long) (allocateMemoryForConsensus * maxMemoryRatioForQueue);
281+
this.consensusMemoryBlock = consensusMemoryBlock;
282+
this.maxMemoryRatioForQueue = maxMemoryRatioForQueue;
281283
this.regionMigrationSpeedLimitBytesPerSecond = regionMigrationSpeedLimitBytesPerSecond;
282284
}
283285

@@ -325,12 +327,12 @@ public long getCheckpointGap() {
325327
return checkpointGap;
326328
}
327329

328-
public Long getAllocateMemoryForConsensus() {
329-
return allocateMemoryForConsensus;
330+
public IMemoryBlock getConsensusMemoryBlock() {
331+
return consensusMemoryBlock;
330332
}
331333

332-
public long getAllocateMemoryForQueue() {
333-
return allocateMemoryForQueue;
334+
public double getMaxMemoryRatioForQueue() {
335+
return maxMemoryRatioForQueue;
334336
}
335337

336338
public long getRegionMigrationSpeedLimitBytesPerSecond() {
@@ -355,7 +357,9 @@ public static class Builder {
355357
private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
356358
private long throttleTimeOutMs = TimeUnit.SECONDS.toMillis(30);
357359
private long checkpointGap = 500;
358-
private long allocateMemoryForConsensus = Runtime.getRuntime().maxMemory() / 10;
360+
private IMemoryBlock consensusMemoryBlock =
361+
new AtomicLongMemoryBlock(
362+
"Consensus-Default", null, Runtime.getRuntime().maxMemory() / 10);
359363
private double maxMemoryRatioForQueue = 0.6;
360364
private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L;
361365

@@ -416,8 +420,8 @@ public Builder setCheckpointGap(long checkpointGap) {
416420
return this;
417421
}
418422

419-
public Replication.Builder setAllocateMemoryForConsensus(long allocateMemoryForConsensus) {
420-
this.allocateMemoryForConsensus = allocateMemoryForConsensus;
423+
public Replication.Builder setConsensusMemoryBlock(IMemoryBlock consensusMemoryBlock) {
424+
this.consensusMemoryBlock = consensusMemoryBlock;
421425
return this;
422426
}
423427

@@ -445,7 +449,7 @@ public Replication build() {
445449
walThrottleThreshold,
446450
throttleTimeOutMs,
447451
checkpointGap,
448-
allocateMemoryForConsensus,
452+
consensusMemoryBlock,
449453
maxMemoryRatioForQueue,
450454
regionMigrationSpeedLimitBytesPerSecond);
451455
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ public IoTConsensus(ConsensusConfig config, Registry registry) {
125125
// init IoTConsensus memory manager
126126
IoTConsensusMemoryManager.getInstance()
127127
.init(
128-
config.getIotConsensusConfig().getReplication().getAllocateMemoryForConsensus(),
129-
config.getIotConsensusConfig().getReplication().getAllocateMemoryForQueue());
128+
config.getIotConsensusConfig().getReplication().getConsensusMemoryBlock(),
129+
config.getIotConsensusConfig().getReplication().getMaxMemoryRatioForQueue());
130130
// init IoTConsensus Rate Limiter
131131
IoTConsensusRateLimiter.getInstance()
132132
.init(

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,62 +19,44 @@
1919

2020
package org.apache.iotdb.consensus.iot.logdispatcher;
2121

22+
import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
23+
import org.apache.iotdb.commons.memory.IMemoryBlock;
2224
import org.apache.iotdb.commons.service.metric.MetricService;
2325

2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
2628

27-
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.atomic.AtomicLong;
2930

3031
public class IoTConsensusMemoryManager {
3132
private static final Logger logger = LoggerFactory.getLogger(IoTConsensusMemoryManager.class);
32-
private final AtomicLong memorySizeInByte = new AtomicLong(0);
3333
private final AtomicLong queueMemorySizeInByte = new AtomicLong(0);
3434
private final AtomicLong syncMemorySizeInByte = new AtomicLong(0);
35-
private Long maxMemorySizeInByte = Runtime.getRuntime().maxMemory() / 10;
36-
private Long maxMemorySizeForQueueInByte = Runtime.getRuntime().maxMemory() / 100 * 6;
35+
private IMemoryBlock memoryBlock =
36+
new AtomicLongMemoryBlock("Consensus-Default", null, Runtime.getRuntime().maxMemory() / 10);
37+
private Double maxMemoryRatioForQueue = 0.6;
3738

3839
private IoTConsensusMemoryManager() {
3940
MetricService.getInstance().addMetricSet(new IoTConsensusMemoryManagerMetrics(this));
4041
}
4142

4243
public boolean reserve(long size, boolean fromQueue) {
43-
AtomicBoolean result = new AtomicBoolean(false);
44-
memorySizeInByte.updateAndGet(
45-
memorySize -> {
46-
long remainSize =
47-
(fromQueue ? maxMemorySizeForQueueInByte : maxMemorySizeInByte) - memorySize;
48-
if (size > remainSize) {
49-
logger.debug(
50-
"consensus memory limited. required: {}, used: {}, total: {}",
51-
size,
52-
memorySize,
53-
maxMemorySizeInByte);
54-
result.set(false);
55-
return memorySize;
56-
} else {
57-
logger.debug(
58-
"{} add {} bytes, total memory size: {} bytes.",
59-
Thread.currentThread().getName(),
60-
size,
61-
memorySize + size);
62-
result.set(true);
63-
return memorySize + size;
64-
}
65-
});
66-
if (result.get()) {
44+
boolean result =
45+
fromQueue
46+
? memoryBlock.allocateIfSufficient(size, maxMemoryRatioForQueue)
47+
: memoryBlock.allocate(size);
48+
if (result) {
6749
if (fromQueue) {
6850
queueMemorySizeInByte.addAndGet(size);
6951
} else {
7052
syncMemorySizeInByte.addAndGet(size);
7153
}
7254
}
73-
return result.get();
55+
return result;
7456
}
7557

7658
public void free(long size, boolean fromQueue) {
77-
long currentUsedMemory = memorySizeInByte.addAndGet(-size);
59+
long currentUsedMemory = memoryBlock.release(size);
7860
if (fromQueue) {
7961
queueMemorySizeInByte.addAndGet(-size);
8062
} else {
@@ -87,13 +69,13 @@ public void free(long size, boolean fromQueue) {
8769
currentUsedMemory);
8870
}
8971

90-
public void init(long maxMemorySize, long maxMemorySizeForQueue) {
91-
this.maxMemorySizeInByte = maxMemorySize;
92-
this.maxMemorySizeForQueueInByte = maxMemorySizeForQueue;
72+
public void init(IMemoryBlock memoryBlock, double maxMemoryRatioForQueue) {
73+
this.memoryBlock = memoryBlock;
74+
this.maxMemoryRatioForQueue = maxMemoryRatioForQueue;
9375
}
9476

9577
long getMemorySizeInByte() {
96-
return memorySizeInByte.get();
78+
return memoryBlock.getUsedMemoryInBytes();
9779
}
9880

9981
long getQueueMemorySizeInByte() {

0 commit comments

Comments
 (0)