Skip to content

Commit 761f4e7

Browse files
authored
Pipe: Fixed the problem of not being able to write normally due to insufficient memory (apache#15701)
* Pipe: Fixed the problem of not being able to write normally due to insufficient memory * fix * fix * fix
1 parent bec0409 commit 761f4e7

File tree

9 files changed

+30
-133
lines changed

9 files changed

+30
-133
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,6 @@ public class IoTDBConfig {
151151

152152
private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
153153

154-
private long allocateMemoryPerWalCache = 512 * 1024;
155-
156154
/** Flush proportion for system */
157155
private double flushProportion = 0.4;
158156

@@ -2004,14 +2002,6 @@ public void setWriteMemoryVariationReportProportion(double writeMemoryVariationR
20042002
this.writeMemoryVariationReportProportion = writeMemoryVariationReportProportion;
20052003
}
20062004

2007-
public long getAllocateMemoryPerWalCache() {
2008-
return allocateMemoryPerWalCache;
2009-
}
2010-
2011-
public void setAllocateMemoryPerWalCache(final long allocateMemoryForWalCache) {
2012-
this.allocateMemoryPerWalCache = allocateMemoryForWalCache;
2013-
}
2014-
20152005
public boolean isEnablePartialInsert() {
20162006
return enablePartialInsert;
20172007
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2458,12 +2458,6 @@ private void loadPipeProps(TrimProperties properties) {
24582458
conf.setIotConsensusV2DeletionFileDir(
24592459
properties.getProperty(
24602460
"iot_consensus_v2_deletion_file_dir", conf.getIotConsensusV2DeletionFileDir()));
2461-
2462-
conf.setAllocateMemoryPerWalCache(
2463-
Long.parseLong(
2464-
properties.getProperty(
2465-
"allocate_memory_per_wal_cache",
2466-
Long.toString(conf.getAllocateMemoryPerWalCache()))));
24672461
}
24682462

24692463
private void loadCQProps(TrimProperties properties) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeWALInsertNodeCacheMetrics.java

Lines changed: 9 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -27,117 +27,43 @@
2727
import org.apache.iotdb.metrics.utils.MetricLevel;
2828
import org.apache.iotdb.metrics.utils.MetricType;
2929

30-
import com.google.common.collect.ImmutableSet;
31-
import org.checkerframework.checker.nullness.qual.NonNull;
3230
import org.slf4j.Logger;
3331
import org.slf4j.LoggerFactory;
3432

35-
import java.util.Map;
36-
import java.util.Objects;
37-
import java.util.concurrent.ConcurrentHashMap;
38-
3933
public class PipeWALInsertNodeCacheMetrics implements IMetricSet {
4034

4135
private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALInsertNodeCacheMetrics.class);
4236

43-
@SuppressWarnings("java:S3077")
44-
private volatile AbstractMetricService metricService;
45-
46-
private final Map<Integer, WALInsertNodeCache> cacheMap = new ConcurrentHashMap<>();
47-
4837
//////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
4938

5039
@Override
5140
public void bindTo(AbstractMetricService metricService) {
52-
this.metricService = metricService;
53-
ImmutableSet<Integer> dataRegionIds = ImmutableSet.copyOf(cacheMap.keySet());
54-
for (Integer dataRegionId : dataRegionIds) {
55-
createMetrics(dataRegionId);
56-
}
57-
}
58-
59-
private void createMetrics(Integer dataRegionId) {
60-
createAutoGauge(dataRegionId);
61-
}
62-
63-
private void createAutoGauge(Integer dataRegionId) {
6441
metricService.createAutoGauge(
6542
Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
6643
MetricLevel.IMPORTANT,
67-
cacheMap.get(dataRegionId),
68-
WALInsertNodeCache::getCacheHitRate,
69-
Tag.REGION.toString(),
70-
String.valueOf(dataRegionId));
44+
WALInsertNodeCache.getInstance(),
45+
WALInsertNodeCache::getCacheHitRate);
7146
metricService.createAutoGauge(
7247
Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString(),
7348
MetricLevel.IMPORTANT,
74-
cacheMap.get(dataRegionId),
75-
WALInsertNodeCache::getCacheHitCount,
76-
Tag.REGION.toString(),
77-
String.valueOf(dataRegionId));
49+
WALInsertNodeCache.getInstance(),
50+
WALInsertNodeCache::getCacheHitCount);
7851
metricService.createAutoGauge(
7952
Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString(),
8053
MetricLevel.IMPORTANT,
81-
cacheMap.get(dataRegionId),
54+
WALInsertNodeCache.getInstance(),
8255
WALInsertNodeCache::getCacheRequestCount,
83-
Tag.REGION.toString(),
84-
String.valueOf(dataRegionId));
56+
Tag.REGION.toString());
8557
}
8658

8759
@Override
8860
public void unbindFrom(AbstractMetricService metricService) {
89-
ImmutableSet<Integer> dataRegionIds = ImmutableSet.copyOf(cacheMap.keySet());
90-
for (Integer dataRegionId : dataRegionIds) {
91-
deregister(dataRegionId);
92-
}
93-
if (!cacheMap.isEmpty()) {
94-
LOGGER.warn("Failed to unbind from wal insert node cache metrics, cache map not empty");
95-
}
96-
}
97-
98-
private void removeMetrics(Integer dataRegionId) {
99-
removeAutoGauge(dataRegionId);
100-
}
101-
102-
private void removeAutoGauge(Integer dataRegionId) {
10361
metricService.remove(
104-
MetricType.AUTO_GAUGE,
105-
Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
106-
Tag.REGION.toString(),
107-
String.valueOf(dataRegionId));
62+
MetricType.AUTO_GAUGE, Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString());
10863
metricService.remove(
109-
MetricType.AUTO_GAUGE,
110-
Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString(),
111-
Tag.REGION.toString(),
112-
String.valueOf(dataRegionId));
64+
MetricType.AUTO_GAUGE, Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_COUNT.toString());
11365
metricService.remove(
114-
MetricType.AUTO_GAUGE,
115-
Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString(),
116-
Tag.REGION.toString(),
117-
String.valueOf(dataRegionId));
118-
}
119-
120-
//////////////////////////// register & deregister (pipe integration) ////////////////////////////
121-
122-
public void register(@NonNull WALInsertNodeCache walInsertNodeCache, Integer dataRegionId) {
123-
cacheMap.putIfAbsent(dataRegionId, walInsertNodeCache);
124-
if (Objects.nonNull(metricService)) {
125-
createMetrics(dataRegionId);
126-
}
127-
}
128-
129-
public void deregister(Integer dataRegionId) {
130-
// TODO: waiting called by WALInsertNodeCache
131-
if (!cacheMap.containsKey(dataRegionId)) {
132-
LOGGER.warn(
133-
"Failed to deregister wal insert node cache metrics, WALInsertNodeCache({}) does not exist",
134-
dataRegionId);
135-
return;
136-
}
137-
if (Objects.nonNull(metricService)) {
138-
removeMetrics(dataRegionId);
139-
}
140-
cacheMap.remove(dataRegionId);
66+
MetricType.AUTO_GAUGE, Metric.PIPE_WAL_INSERT_NODE_CACHE_REQUEST_COUNT.toString());
14167
}
14268

14369
//////////////////////////// singleton ////////////////////////////

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public void pinMemTable(long memTableId) throws MemTablePinException {
279279
}
280280
MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
281281
if (!memTableInfo.isPinned()) {
282-
WALInsertNodeCache.getInstance(memTableInfo.getDataRegionId()).addMemTable(memTableId);
282+
WALInsertNodeCache.getInstance().addMemTable(memTableId);
283283
}
284284
memTableInfo.pin();
285285
} finally {
@@ -309,7 +309,7 @@ public void unpinMemTable(long memTableId) throws MemTablePinException {
309309
MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
310310
memTableInfo.unpin();
311311
if (!memTableInfo.isPinned()) {
312-
WALInsertNodeCache.getInstance(memTableInfo.getDataRegionId()).removeMemTable(memTableId);
312+
WALInsertNodeCache.getInstance().removeMemTable(memTableId);
313313
if (memTableInfo.isFlushed()) {
314314
memTableId2Info.remove(memTableId);
315315
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public boolean isInSealedFile() {
190190
public void setWalNode(WALNode walNode, long memTableId) {
191191
this.walNode = walNode;
192192
identifier = walNode.getIdentifier();
193-
cache = WALInsertNodeCache.getInstance(walNode.getRegionId(memTableId));
193+
cache = WALInsertNodeCache.getInstance();
194194
}
195195

196196
public String getIdentifier() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.iotdb.db.conf.DataNodeMemoryConfig;
2525
import org.apache.iotdb.db.conf.IoTDBConfig;
2626
import org.apache.iotdb.db.conf.IoTDBDescriptor;
27-
import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
2827
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
2928
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
3029
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
@@ -39,7 +38,6 @@
3938
import com.github.benmanes.caffeine.cache.Caffeine;
4039
import com.github.benmanes.caffeine.cache.LoadingCache;
4140
import com.github.benmanes.caffeine.cache.Weigher;
42-
import com.google.common.util.concurrent.AtomicDouble;
4341
import org.apache.tsfile.utils.Pair;
4442
import org.checkerframework.checker.nullness.qual.NonNull;
4543
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -65,10 +63,6 @@ public class WALInsertNodeCache {
6563

6664
private static PipeModelFixedMemoryBlock walModelFixedMemory = null;
6765

68-
private final PipeModelFixedMemoryBlock memoryBlock;
69-
70-
// Used to adjust the memory usage of the cache
71-
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
7266
// LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
7367
private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>> lruCache;
7468

@@ -77,16 +71,15 @@ public class WALInsertNodeCache {
7771

7872
private volatile boolean hasPipeRunning = false;
7973

80-
private WALInsertNodeCache(final Integer dataRegionId) {
74+
private WALInsertNodeCache() {
8175
if (walModelFixedMemory == null) {
8276
init();
8377
}
8478

85-
final long requestedAllocateSize = CONFIG.getAllocateMemoryPerWalCache();
86-
87-
memoryBlock =
88-
PipeDataNodeResourceManager.memory()
89-
.forceAllocateForModelFixedMemoryBlock(requestedAllocateSize, PipeMemoryBlockType.WAL);
79+
final long requestedAllocateSize =
80+
(long)
81+
(PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
82+
* PIPE_CONFIG.getPipeDataStructureWalMemoryProportion());
9083

9184
lruCache =
9285
Caffeine.newBuilder()
@@ -96,12 +89,9 @@ private WALInsertNodeCache(final Integer dataRegionId) {
9689
(position, pair) -> {
9790
long weightInLong = 0L;
9891
if (pair.right != null) {
99-
weightInLong =
100-
(long)
101-
(InsertNodeMemoryEstimator.sizeOf(pair.right)
102-
* memoryUsageCheatFactor.get());
92+
weightInLong = InsertNodeMemoryEstimator.sizeOf(pair.right);
10393
} else {
104-
weightInLong = (long) (position.getSize() * memoryUsageCheatFactor.get());
94+
weightInLong = position.getSize();
10595
}
10696
if (weightInLong <= 0) {
10797
return Integer.MAX_VALUE;
@@ -111,8 +101,6 @@ private WALInsertNodeCache(final Integer dataRegionId) {
111101
})
112102
.recordStats()
113103
.build(new WALInsertNodeCacheLoader());
114-
115-
PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
116104
}
117105

118106
// please call this method at PipeLauncher
@@ -124,7 +112,11 @@ public static void init() {
124112
// Allocate memory for the fixed memory block of WAL
125113
walModelFixedMemory =
126114
PipeDataNodeResourceManager.memory()
127-
.forceAllocateForModelFixedMemoryBlock(0L, PipeMemoryBlockType.WAL);
115+
.forceAllocateForModelFixedMemoryBlock(
116+
(long)
117+
(PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
118+
* PIPE_CONFIG.getPipeDataStructureWalMemoryProportion()),
119+
PipeMemoryBlockType.WAL);
128120
} catch (Exception e) {
129121
LOGGER.error("Failed to initialize WAL model fixed memory block", e);
130122
walModelFixedMemory =
@@ -318,17 +310,13 @@ class WALInsertNodeCacheLoader
318310

319311
/////////////////////////// Singleton ///////////////////////////
320312

321-
public static WALInsertNodeCache getInstance(final Integer regionId) {
322-
return InstanceHolder.getOrCreateInstance(regionId);
313+
public static WALInsertNodeCache getInstance() {
314+
return InstanceHolder.INSTANCE;
323315
}
324316

325317
private static class InstanceHolder {
326318

327-
private static final Map<Integer, WALInsertNodeCache> INSTANCE_MAP = new ConcurrentHashMap<>();
328-
329-
public static WALInsertNodeCache getOrCreateInstance(final Integer key) {
330-
return INSTANCE_MAP.computeIfAbsent(key, k -> new WALInsertNodeCache(key));
331-
}
319+
public static final WALInsertNodeCache INSTANCE = new WALInsertNodeCache();
332320

333321
private InstanceHolder() {
334322
// forbidding instantiation
@@ -345,7 +333,6 @@ boolean contains(WALEntryPosition position) {
345333
@TestOnly
346334
public void clear() {
347335
lruCache.invalidateAll();
348-
memoryBlock.close();
349336
memTablesNeedSearch.clear();
350337
}
351338
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void tearDown() throws Exception {
9494
config.setWalMode(prevMode);
9595
EnvironmentUtils.cleanDir(logDirectory1);
9696
EnvironmentUtils.cleanDir(logDirectory2);
97-
WALInsertNodeCache.getInstance(1).clear();
97+
WALInsertNodeCache.getInstance().clear();
9898
}
9999

100100
@Test(expected = MemTablePinException.class)

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void tearDown() throws Exception {
8888
config.setDataRegionConsensusProtocolClass(prevConsensus);
8989
EnvironmentUtils.cleanDir(logDirectory1);
9090
StorageEngine.getInstance().reset();
91-
WALInsertNodeCache.getInstance(1).clear();
91+
WALInsertNodeCache.getInstance().clear();
9292
}
9393

9494
/**

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class WALInsertNodeCacheTest {
5454
private static final String databasePath = "root.test_sg";
5555
private static final String devicePath = databasePath + ".test_d";
5656
private static final String dataRegionId = "1";
57-
private static final WALInsertNodeCache cache = WALInsertNodeCache.getInstance(1);
57+
private static final WALInsertNodeCache cache = WALInsertNodeCache.getInstance();
5858
private WALMode prevMode;
5959
private WALNode walNode;
6060

0 commit comments

Comments
 (0)