Skip to content

Commit a1fe241

Browse files
authored
Pipe: Avoided the OOM risks by replacing the progressive cheating factor with policy change (apache#16398)
1 parent 1d7e82d commit a1fe241

File tree

3 files changed

+86
-106
lines changed

3 files changed

+86
-106
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/PartialPathLastObjectCache.java

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.github.benmanes.caffeine.cache.Cache;
2727
import com.github.benmanes.caffeine.cache.Caffeine;
2828
import com.github.benmanes.caffeine.cache.Weigher;
29-
import com.google.common.util.concurrent.AtomicDouble;
3029
import org.slf4j.Logger;
3130
import org.slf4j.LoggerFactory;
3231

@@ -35,35 +34,11 @@ public abstract class PartialPathLastObjectCache<T> implements AutoCloseable {
3534
private static final Logger LOGGER = LoggerFactory.getLogger(PartialPathLastObjectCache.class);
3635

3736
private final PipeMemoryBlock allocatedMemoryBlock;
38-
// Used to adjust the memory usage of the cache
39-
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
4037

4138
private final Cache<String, T> partialPath2ObjectCache;
4239

4340
protected PartialPathLastObjectCache(final long memoryLimitInBytes) {
44-
allocatedMemoryBlock =
45-
PipeDataNodeResourceManager.memory()
46-
.tryAllocate(memoryLimitInBytes)
47-
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
48-
.setShrinkCallback(
49-
(oldMemory, newMemory) -> {
50-
memoryUsageCheatFactor.updateAndGet(
51-
factor -> factor * ((double) oldMemory / newMemory));
52-
LOGGER.info(
53-
"PartialPathLastObjectCache.allocatedMemoryBlock has shrunk from {} to {}.",
54-
oldMemory,
55-
newMemory);
56-
})
57-
.setExpandMethod(oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, memoryLimitInBytes))
58-
.setExpandCallback(
59-
(oldMemory, newMemory) -> {
60-
memoryUsageCheatFactor.updateAndGet(
61-
factor -> factor / ((double) newMemory / oldMemory));
62-
LOGGER.info(
63-
"PartialPathLastObjectCache.allocatedMemoryBlock has expanded from {} to {}.",
64-
oldMemory,
65-
newMemory);
66-
});
41+
allocatedMemoryBlock = PipeDataNodeResourceManager.memory().tryAllocate(memoryLimitInBytes);
6742

6843
// Currently disable the metric here because it's not a constant cache and the number may
6944
// fluctuate. In the future all the "processorCache"s may be recorded in single metric entry
@@ -75,16 +50,37 @@ protected PartialPathLastObjectCache(final long memoryLimitInBytes) {
7550
(Weigher<String, T>)
7651
(partialPath, object) -> {
7752
final long weightInLong =
78-
(long)
79-
((MemUtils.getStringMem(partialPath) + calculateMemoryUsage(object))
80-
* memoryUsageCheatFactor.get());
81-
if (weightInLong <= 0) {
82-
return Integer.MAX_VALUE;
83-
}
53+
MemUtils.getStringMem(partialPath) + calculateMemoryUsage(object);
8454
final int weightInInt = (int) weightInLong;
8555
return weightInInt != weightInLong ? Integer.MAX_VALUE : weightInInt;
8656
})
8757
.build();
58+
59+
allocatedMemoryBlock
60+
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
61+
.setShrinkCallback(
62+
(oldMemory, newMemory) -> {
63+
partialPath2ObjectCache
64+
.policy()
65+
.eviction()
66+
.ifPresent(eviction -> eviction.setMaximum(newMemory));
67+
LOGGER.info(
68+
"PartialPathLastObjectCache.allocatedMemoryBlock has shrunk from {} to {}.",
69+
oldMemory,
70+
newMemory);
71+
})
72+
.setExpandMethod(oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, memoryLimitInBytes))
73+
.setExpandCallback(
74+
(oldMemory, newMemory) -> {
75+
partialPath2ObjectCache
76+
.policy()
77+
.eviction()
78+
.ifPresent(eviction -> eviction.setMaximum(newMemory));
79+
LOGGER.info(
80+
"PartialPathLastObjectCache.allocatedMemoryBlock has expanded from {} to {}.",
81+
oldMemory,
82+
newMemory);
83+
});
8884
}
8985

9086
protected abstract long calculateMemoryUsage(final T object);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeCacheLeaderClientManager.java

Lines changed: 35 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.github.benmanes.caffeine.cache.Cache;
2828
import com.github.benmanes.caffeine.cache.Caffeine;
2929
import com.github.benmanes.caffeine.cache.Weigher;
30-
import com.google.common.util.concurrent.AtomicDouble;
3130
import org.slf4j.Logger;
3231
import org.slf4j.LoggerFactory;
3332

@@ -42,8 +41,6 @@ class LeaderCacheManager {
4241
private static final Logger LOGGER = LoggerFactory.getLogger(LeaderCacheManager.class);
4342
private static final PipeConfig CONFIG = PipeConfig.getInstance();
4443

45-
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
46-
4744
// leader cache built by LRU
4845
private final Cache<String, TEndPoint> device2endpoint;
4946
// a hashmap to reuse the created endpoint
@@ -55,52 +52,47 @@ public LeaderCacheManager() {
5552

5653
// properties required by pipe memory control framework
5754
final PipeMemoryBlock allocatedMemoryBlock =
58-
PipeDataNodeResourceManager.memory()
59-
.tryAllocate(initMemorySizeInBytes)
60-
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
61-
.setShrinkCallback(
62-
(oldMemory, newMemory) -> {
63-
memoryUsageCheatFactor.updateAndGet(
64-
factor -> factor * ((double) oldMemory / newMemory));
65-
LOGGER.info(
66-
"LeaderCacheManager.allocatedMemoryBlock has shrunk from {} to {}.",
67-
oldMemory,
68-
newMemory);
69-
})
70-
.setExpandMethod(
71-
oldMemory ->
72-
Math.min(
73-
Math.max(oldMemory, 1) * 2,
74-
(long)
75-
(PipeDataNodeResourceManager.memory()
76-
.getTotalNonFloatingMemorySizeInBytes()
77-
* CONFIG.getPipeLeaderCacheMemoryUsagePercentage())))
78-
.setExpandCallback(
79-
(oldMemory, newMemory) -> {
80-
memoryUsageCheatFactor.updateAndGet(
81-
factor -> factor / ((double) newMemory / oldMemory));
82-
LOGGER.info(
83-
"LeaderCacheManager.allocatedMemoryBlock has expanded from {} to {}.",
84-
oldMemory,
85-
newMemory);
86-
});
55+
PipeDataNodeResourceManager.memory().tryAllocate(initMemorySizeInBytes);
8756

8857
device2endpoint =
8958
Caffeine.newBuilder()
9059
.maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
91-
.weigher(
92-
(Weigher<String, TEndPoint>)
93-
(device, endPoint) -> {
94-
final long weightInLong =
95-
(long) (device.getBytes().length * memoryUsageCheatFactor.get());
96-
if (weightInLong <= 0) {
97-
return Integer.MAX_VALUE;
98-
}
99-
final int weightInInt = (int) weightInLong;
100-
return weightInInt != weightInLong ? Integer.MAX_VALUE : weightInInt;
101-
})
60+
.weigher((Weigher<String, TEndPoint>) (device, endPoint) -> device.getBytes().length)
10261
.recordStats()
10362
.build();
63+
64+
allocatedMemoryBlock
65+
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
66+
.setShrinkCallback(
67+
(oldMemory, newMemory) -> {
68+
device2endpoint
69+
.policy()
70+
.eviction()
71+
.ifPresent(eviction -> eviction.setMaximum(newMemory));
72+
LOGGER.info(
73+
"LeaderCacheManager.allocatedMemoryBlock has shrunk from {} to {}.",
74+
oldMemory,
75+
newMemory);
76+
})
77+
.setExpandMethod(
78+
oldMemory ->
79+
Math.min(
80+
Math.max(oldMemory, 1) * 2,
81+
(long)
82+
(PipeDataNodeResourceManager.memory()
83+
.getTotalNonFloatingMemorySizeInBytes()
84+
* CONFIG.getPipeLeaderCacheMemoryUsagePercentage())))
85+
.setExpandCallback(
86+
(oldMemory, newMemory) -> {
87+
device2endpoint
88+
.policy()
89+
.eviction()
90+
.ifPresent(eviction -> eviction.setMaximum(newMemory));
91+
LOGGER.info(
92+
"LeaderCacheManager.allocatedMemoryBlock has expanded from {} to {}.",
93+
oldMemory,
94+
newMemory);
95+
});
10496
}
10597

10698
public TEndPoint getLeaderEndPoint(final String deviceId) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/cache/SubscriptionPollResponseCache.java

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.github.benmanes.caffeine.cache.Caffeine;
2828
import com.github.benmanes.caffeine.cache.LoadingCache;
2929
import com.github.benmanes.caffeine.cache.Weigher;
30-
import com.google.common.util.concurrent.AtomicDouble;
3130
import org.slf4j.Logger;
3231
import org.slf4j.LoggerFactory;
3332

@@ -44,8 +43,6 @@ public class SubscriptionPollResponseCache {
4443

4544
private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPollResponseCache.class);
4645

47-
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
48-
4946
private final LoadingCache<CachedSubscriptionPollResponse, ByteBuffer> cache;
5047

5148
public ByteBuffer serialize(final CachedSubscriptionPollResponse response) throws IOException {
@@ -113,41 +110,36 @@ private SubscriptionPollResponseCache() {
113110

114111
// properties required by pipe memory control framework
115112
final PipeMemoryBlock allocatedMemoryBlock =
116-
PipeDataNodeResourceManager.memory()
117-
.tryAllocate(initMemorySizeInBytes)
118-
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
119-
.setShrinkCallback(
120-
(oldMemory, newMemory) -> {
121-
memoryUsageCheatFactor.updateAndGet(
122-
factor -> factor * ((double) oldMemory / newMemory));
123-
LOGGER.info(
124-
"SubscriptionEventBinaryCache.allocatedMemoryBlock has shrunk from {} to {}.",
125-
oldMemory,
126-
newMemory);
127-
})
128-
.setExpandMethod(
129-
oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, maxMemorySizeInBytes))
130-
.setExpandCallback(
131-
(oldMemory, newMemory) -> {
132-
memoryUsageCheatFactor.updateAndGet(
133-
factor -> factor / ((double) newMemory / oldMemory));
134-
LOGGER.info(
135-
"SubscriptionEventBinaryCache.allocatedMemoryBlock has expanded from {} to {}.",
136-
oldMemory,
137-
newMemory);
138-
});
113+
PipeDataNodeResourceManager.memory().tryAllocate(initMemorySizeInBytes);
139114

140115
this.cache =
141116
Caffeine.newBuilder()
142117
.maximumWeight(allocatedMemoryBlock.getMemoryUsageInBytes())
143118
.weigher(
144119
(Weigher<CachedSubscriptionPollResponse, ByteBuffer>)
145-
(message, buffer) -> {
146-
// TODO: overflow
147-
return (int) (buffer.capacity() * memoryUsageCheatFactor.get());
148-
})
120+
(message, buffer) -> buffer.capacity())
149121
.recordStats() // TODO: metrics
150122
// NOTE: lambda CAN NOT be replaced with method reference
151-
.build(response -> CachedSubscriptionPollResponse.serialize(response));
123+
.build(CachedSubscriptionPollResponse::serialize);
124+
125+
allocatedMemoryBlock
126+
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 1))
127+
.setShrinkCallback(
128+
(oldMemory, newMemory) -> {
129+
cache.policy().eviction().ifPresent(eviction -> eviction.setMaximum(newMemory));
130+
LOGGER.info(
131+
"SubscriptionEventBinaryCache.allocatedMemoryBlock has shrunk from {} to {}.",
132+
oldMemory,
133+
newMemory);
134+
})
135+
.setExpandMethod(oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, maxMemorySizeInBytes))
136+
.setExpandCallback(
137+
(oldMemory, newMemory) -> {
138+
cache.policy().eviction().ifPresent(eviction -> eviction.setMaximum(newMemory));
139+
LOGGER.info(
140+
"SubscriptionEventBinaryCache.allocatedMemoryBlock has expanded from {} to {}.",
141+
oldMemory,
142+
newMemory);
143+
});
152144
}
153145
}

0 commit comments

Comments
 (0)