Skip to content

Commit e99fc44

Browse files
authored
Pipe: Optimize Batch and WAL memory allocation algorithms (apache#15534)
1 parent 9b6b323 commit e99fc44

File tree

11 files changed

+772
-60
lines changed

11 files changed

+772
-60
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2323
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
2424
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
25-
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
25+
import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
26+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
27+
import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
2628
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
2729
import org.apache.iotdb.pipe.api.event.Event;
2830
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -38,14 +40,19 @@
3840
public abstract class PipeTabletEventBatch implements AutoCloseable {
3941

4042
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventBatch.class);
43+
private static final PipeModelFixedMemoryBlock PIPE_MODEL_FIXED_MEMORY_BLOCK =
44+
PipeDataNodeResourceManager.memory()
45+
.forceAllocateForModelFixedMemoryBlock(
46+
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
47+
PipeMemoryBlockType.BATCH);
4148

4249
protected final List<EnrichedEvent> events = new ArrayList<>();
4350

4451
private final int maxDelayInMs;
4552
private long firstEventProcessingTime = Long.MIN_VALUE;
4653

4754
protected long totalBufferSize = 0;
48-
private final PipeMemoryBlock allocatedMemoryBlock;
55+
private final PipeDynamicMemoryBlock allocatedMemoryBlock;
4956

5057
protected volatile boolean isClosed = false;
5158

@@ -54,19 +61,8 @@ protected PipeTabletEventBatch(final int maxDelayInMs, final long requestMaxBatc
5461

5562
// limit in buffer size
5663
this.allocatedMemoryBlock =
57-
PipeDataNodeResourceManager.memory()
58-
.tryAllocate(requestMaxBatchSizeInBytes)
59-
.setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
60-
.setShrinkCallback(
61-
(oldMemory, newMemory) ->
62-
LOGGER.info(
63-
"The batch size limit has shrunk from {} to {}.", oldMemory, newMemory))
64-
.setExpandMethod(
65-
oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, requestMaxBatchSizeInBytes))
66-
.setExpandCallback(
67-
(oldMemory, newMemory) ->
68-
LOGGER.info(
69-
"The batch size limit has expanded from {} to {}.", oldMemory, newMemory));
64+
PIPE_MODEL_FIXED_MEMORY_BLOCK.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
65+
allocatedMemoryBlock.setExpandable(false);
7066

7167
if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
7268
LOGGER.info(
@@ -131,8 +127,12 @@ protected abstract boolean constructBatch(final TabletInsertionEvent event)
131127
throws WALPipeException, IOException;
132128

133129
public boolean shouldEmit() {
134-
return totalBufferSize >= getMaxBatchSizeInBytes()
135-
|| System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs;
130+
final long diff = System.currentTimeMillis() - firstEventProcessingTime;
131+
if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
132+
allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) diff / maxDelayInMs);
133+
return true;
134+
}
135+
return false;
136136
}
137137

138138
private long getMaxBatchSizeInBytes() {
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.resource.memory;
21+
22+
import org.apache.tsfile.utils.Pair;
23+
24+
import javax.validation.constraints.NotNull;
25+
26+
import java.util.function.Consumer;
27+
import java.util.stream.Stream;
28+
29+
public class PipeDynamicMemoryBlock {
30+
31+
private final PipeModelFixedMemoryBlock fixedMemoryBlock;
32+
33+
private boolean isExpandable = true;
34+
35+
private Consumer<PipeDynamicMemoryBlock> expand = null;
36+
37+
private volatile boolean released = false;
38+
39+
private volatile long memoryUsageInBytes;
40+
41+
private volatile double historyMemoryEfficiency;
42+
43+
private volatile double currentMemoryEfficiency;
44+
45+
PipeDynamicMemoryBlock(
46+
final @NotNull PipeModelFixedMemoryBlock fixedMemoryBlock, final long memoryUsageInBytes) {
47+
this.memoryUsageInBytes = Math.min(memoryUsageInBytes, 0);
48+
this.fixedMemoryBlock = fixedMemoryBlock;
49+
}
50+
51+
public long getMemoryUsageInBytes() {
52+
return memoryUsageInBytes;
53+
}
54+
55+
public void setMemoryUsageInBytes(final long memoryUsageInBytes) {
56+
this.memoryUsageInBytes = memoryUsageInBytes;
57+
}
58+
59+
public Pair<Double, Double> getMemoryEfficiency() {
60+
synchronized (fixedMemoryBlock) {
61+
return new Pair<>(historyMemoryEfficiency, currentMemoryEfficiency);
62+
}
63+
}
64+
65+
public void setExpandable(boolean expandable) {
66+
isExpandable = expandable;
67+
}
68+
69+
public void setExpand(Consumer<PipeDynamicMemoryBlock> expand) {
70+
this.expand = expand;
71+
}
72+
73+
public double getMemoryBlockUsageRatio() {
74+
return (double) memoryUsageInBytes / fixedMemoryBlock.getMemoryUsageInBytes();
75+
}
76+
77+
public double getFixedMemoryBlockUsageRatio() {
78+
return (double) fixedMemoryBlock.getMemoryAllocatedInBytes()
79+
/ fixedMemoryBlock.getMemoryUsageInBytes();
80+
}
81+
82+
public long canAllocateMemorySize() {
83+
return fixedMemoryBlock.getMemoryUsageInBytes() - fixedMemoryBlock.getMemoryAllocatedInBytes();
84+
}
85+
86+
public synchronized long getExpectedAverageAllocatedMemorySize() {
87+
return fixedMemoryBlock.getMemoryUsageInBytes() / fixedMemoryBlock.getMemoryBlocks().size();
88+
}
89+
90+
public void updateCurrentMemoryEfficiencyAdjustMem(double currentMemoryEfficiency) {
91+
synchronized (fixedMemoryBlock) {
92+
this.historyMemoryEfficiency = this.currentMemoryEfficiency;
93+
if (Double.isNaN(currentMemoryEfficiency)
94+
|| Double.isInfinite(currentMemoryEfficiency)
95+
|| currentMemoryEfficiency < 0.0) {
96+
currentMemoryEfficiency = 0.0;
97+
}
98+
this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0);
99+
fixedMemoryBlock.dynamicallyAdjustMemory(this);
100+
}
101+
}
102+
103+
public long getFixedMemoryCapacity() {
104+
return fixedMemoryBlock.getMemoryUsageInBytes();
105+
}
106+
107+
public void updateMemoryEfficiency(
108+
double currentMemoryEfficiency, double historyMemoryEfficiency) {
109+
synchronized (fixedMemoryBlock) {
110+
if (Double.isNaN(currentMemoryEfficiency)
111+
|| Double.isInfinite(currentMemoryEfficiency)
112+
|| currentMemoryEfficiency < 0.0) {
113+
currentMemoryEfficiency = 0.0;
114+
}
115+
116+
if (Double.isNaN(historyMemoryEfficiency)
117+
|| Double.isInfinite(historyMemoryEfficiency)
118+
|| historyMemoryEfficiency < 0.0) {
119+
currentMemoryEfficiency = 0.0;
120+
}
121+
122+
this.historyMemoryEfficiency = Math.min(historyMemoryEfficiency, 1.0);
123+
this.currentMemoryEfficiency = Math.min(currentMemoryEfficiency, 1.0);
124+
}
125+
}
126+
127+
public Stream<PipeDynamicMemoryBlock> getMemoryBlocks() {
128+
return fixedMemoryBlock.getMemoryBlocksStream();
129+
}
130+
131+
public void applyForDynamicMemory(final long memoryUsageInBytes) {
132+
fixedMemoryBlock.resetMemoryBlockSize(this, memoryUsageInBytes);
133+
}
134+
135+
public boolean isReleased() {
136+
return released;
137+
}
138+
139+
public void close() {
140+
if (released) {
141+
return;
142+
}
143+
synchronized (fixedMemoryBlock) {
144+
if (!released) {
145+
fixedMemoryBlock.releaseMemory(this);
146+
released = true;
147+
}
148+
}
149+
}
150+
151+
void doExpand() {
152+
if (isExpandable && expand != null) {
153+
expand.accept(this);
154+
}
155+
}
156+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlockType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@ public enum PipeMemoryBlockType {
2323
NORMAL,
2424
TABLET,
2525
TS_FILE,
26+
BATCH,
27+
WAL
2628
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2626
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2727
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
28+
import org.apache.iotdb.db.pipe.resource.memory.strategy.ThresholdAllocationStrategy;
2829

2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
@@ -100,6 +101,18 @@ private double allowedMaxMemorySizeInBytesOfTsTiles() {
100101
* getTotalNonFloatingMemorySizeInBytes();
101102
}
102103

104+
public long getAllocatedMemorySizeInBytesOfWAL() {
105+
return (long)
106+
(PIPE_CONFIG.getPipeDataStructureWalMemoryProportion()
107+
* getTotalNonFloatingMemorySizeInBytes());
108+
}
109+
110+
public long getAllocatedMemorySizeInBytesOfBatch() {
111+
return (long)
112+
(PIPE_CONFIG.getPipeDataStructureBatchMemoryProportion()
113+
* getTotalNonFloatingMemorySizeInBytes());
114+
}
115+
103116
public boolean isEnough4TabletParsing() {
104117
return (double) usedMemorySizeInBytesOfTablets + (double) usedMemorySizeInBytesOfTsFiles
105118
< EXCEED_PROTECT_THRESHOLD * allowedMaxMemorySizeInBytesOfTabletsAndTsFiles()
@@ -225,6 +238,39 @@ public PipeTsFileMemoryBlock forceAllocateForTsFileWithRetry(long tsFileSizeInBy
225238
}
226239
}
227240

241+
public PipeModelFixedMemoryBlock forceAllocateForModelFixedMemoryBlock(
242+
long fixedSizeInBytes, PipeMemoryBlockType type)
243+
throws PipeRuntimeOutOfMemoryCriticalException {
244+
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
245+
return new PipeModelFixedMemoryBlock(Long.MAX_VALUE, new ThresholdAllocationStrategy());
246+
}
247+
248+
if (fixedSizeInBytes == 0) {
249+
return (PipeModelFixedMemoryBlock) registerMemoryBlock(0, type);
250+
}
251+
252+
for (int i = 1, size = PIPE_CONFIG.getPipeMemoryAllocateMaxRetries(); i <= size; i++) {
253+
if (getFreeMemorySizeInBytes() >= fixedSizeInBytes) {
254+
break;
255+
}
256+
257+
try {
258+
Thread.sleep(PIPE_CONFIG.getPipeMemoryAllocateRetryIntervalInMs());
259+
} catch (InterruptedException ex) {
260+
Thread.currentThread().interrupt();
261+
LOGGER.warn("forceAllocateWithRetry: interrupted while waiting for available memory", ex);
262+
}
263+
}
264+
265+
if (getFreeMemorySizeInBytes() < fixedSizeInBytes) {
266+
return (PipeModelFixedMemoryBlock) forceAllocateWithRetry(getFreeMemorySizeInBytes(), type);
267+
}
268+
269+
synchronized (this) {
270+
return (PipeModelFixedMemoryBlock) forceAllocateWithRetry(fixedSizeInBytes, type);
271+
}
272+
}
273+
228274
private PipeMemoryBlock forceAllocateWithRetry(long sizeInBytes, PipeMemoryBlockType type)
229275
throws PipeRuntimeOutOfMemoryCriticalException {
230276
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
@@ -233,6 +279,9 @@ private PipeMemoryBlock forceAllocateWithRetry(long sizeInBytes, PipeMemoryBlock
233279
return new PipeTabletMemoryBlock(sizeInBytes);
234280
case TS_FILE:
235281
return new PipeTsFileMemoryBlock(sizeInBytes);
282+
case BATCH:
283+
case WAL:
284+
return new PipeModelFixedMemoryBlock(sizeInBytes, new ThresholdAllocationStrategy());
236285
default:
237286
return new PipeMemoryBlock(sizeInBytes);
238287
}
@@ -466,6 +515,11 @@ private PipeMemoryBlock registerMemoryBlock(long sizeInBytes, PipeMemoryBlockTyp
466515
case TS_FILE:
467516
returnedMemoryBlock = new PipeTsFileMemoryBlock(sizeInBytes);
468517
break;
518+
case BATCH:
519+
case WAL:
520+
returnedMemoryBlock =
521+
new PipeModelFixedMemoryBlock(sizeInBytes, new ThresholdAllocationStrategy());
522+
break;
469523
default:
470524
returnedMemoryBlock = new PipeMemoryBlock(sizeInBytes);
471525
break;

0 commit comments

Comments
 (0)