Skip to content

Commit 7b588c8

Browse files
authored
Pipe: fix threshold judgment for tablet and tsfile memory block & Subscription: close data container for current PipeTsFileInsertionEvent in batch (apache#14901)
1 parent 4aadc09 commit 7b588c8

File tree

3 files changed

+74
-22
lines changed

3 files changed

+74
-22
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ public class PipeResourceMetrics implements IMetricSet {
3535

3636
private static final String PIPE_USED_MEMORY = "PipeUsedMemory";
3737

38+
private static final String PIPE_TABLET_USED_MEMORY = "PipeTabletUsedMemory";
39+
40+
private static final String PIPE_TS_FILE_USED_MEMORY = "PipeTsFileUsedMemory";
41+
3842
private static final String PIPE_TOTAL_MEMORY = "PipeTotalMemory";
3943

4044
//////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
@@ -49,6 +53,20 @@ public void bindTo(final AbstractMetricService metricService) {
4953
PipeMemoryManager::getUsedMemorySizeInBytes,
5054
Tag.NAME.toString(),
5155
PIPE_USED_MEMORY);
56+
metricService.createAutoGauge(
57+
Metric.PIPE_MEM.toString(),
58+
MetricLevel.IMPORTANT,
59+
PipeDataNodeResourceManager.memory(),
60+
PipeMemoryManager::getUsedMemorySizeInBytesOfTablets,
61+
Tag.NAME.toString(),
62+
PIPE_TABLET_USED_MEMORY);
63+
metricService.createAutoGauge(
64+
Metric.PIPE_MEM.toString(),
65+
MetricLevel.IMPORTANT,
66+
PipeDataNodeResourceManager.memory(),
67+
PipeMemoryManager::getUsedMemorySizeInBytesOfTsFiles,
68+
Tag.NAME.toString(),
69+
PIPE_TS_FILE_USED_MEMORY);
5270
metricService.createAutoGauge(
5371
Metric.PIPE_MEM.toString(),
5472
MetricLevel.IMPORTANT,
@@ -85,6 +103,16 @@ public void unbindFrom(final AbstractMetricService metricService) {
85103
// pipe memory related
86104
metricService.remove(
87105
MetricType.AUTO_GAUGE, Metric.PIPE_MEM.toString(), Tag.NAME.toString(), PIPE_USED_MEMORY);
106+
metricService.remove(
107+
MetricType.AUTO_GAUGE,
108+
Metric.PIPE_MEM.toString(),
109+
Tag.NAME.toString(),
110+
PIPE_TABLET_USED_MEMORY);
111+
metricService.remove(
112+
MetricType.AUTO_GAUGE,
113+
Metric.PIPE_MEM.toString(),
114+
Tag.NAME.toString(),
115+
PIPE_TS_FILE_USED_MEMORY);
88116
metricService.remove(
89117
MetricType.AUTO_GAUGE, Metric.PIPE_MEM.toString(), Tag.NAME.toString(), PIPE_TOTAL_MEMORY);
90118
// resource reference count

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public class PipeMemoryManager {
5353

5454
private long usedMemorySizeInBytes;
5555

56+
private static final double EXCEED_PROTECT_THRESHOLD = 0.95;
57+
5658
// To avoid too much parsed events causing OOM. If total tablet memory size exceeds this
5759
// threshold, allocations of memory block for tablets will be rejected.
5860
private static final double TABLET_MEMORY_REJECT_THRESHOLD =
@@ -76,43 +78,53 @@ public PipeMemoryManager() {
7678

7779
// NOTE: Here we unify the memory threshold judgment for tablet and tsfile memory block, because
7880
// introducing too many heuristic rules not conducive to flexible dynamic adjustment of memory
79-
// configuration.
80-
81-
// Proportion of Memory Occupied by Tablet Memory Block: [TABLET_MEMORY_REJECT_THRESHOLD / 2,
81+
// configuration:
82+
// 1. Proportion of memory occupied by tablet memory block: [TABLET_MEMORY_REJECT_THRESHOLD / 2,
8283
// TABLET_MEMORY_REJECT_THRESHOLD + TS_FILE_MEMORY_REJECT_THRESHOLD / 2]
83-
// Proportion of Memory Occupied by TsFile Memory Block: [TS_FILE_MEMORY_REJECT_THRESHOLD / 2,
84+
// 2. Proportion of memory occupied by tsfile memory block: [TS_FILE_MEMORY_REJECT_THRESHOLD / 2,
8485
// TS_FILE_MEMORY_REJECT_THRESHOLD + TABLET_MEMORY_REJECT_THRESHOLD / 2]
86+
// 3. The sum of the memory proportion occupied by the tablet memory block and the tsfile memory
87+
// block does not exceed TABLET_MEMORY_REJECT_THRESHOLD + TS_FILE_MEMORY_REJECT_THRESHOLD
88+
89+
private static double allowedMaxMemorySizeInBytesOfTabletsAndTsFiles() {
90+
return (TABLET_MEMORY_REJECT_THRESHOLD + TS_FILE_MEMORY_REJECT_THRESHOLD)
91+
* TOTAL_MEMORY_SIZE_IN_BYTES;
92+
}
93+
94+
private static double allowedMaxMemorySizeInBytesOfTablets() {
95+
return (TABLET_MEMORY_REJECT_THRESHOLD + TS_FILE_MEMORY_REJECT_THRESHOLD / 2)
96+
* TOTAL_MEMORY_SIZE_IN_BYTES;
97+
}
98+
99+
private static double allowedMaxMemorySizeInBytesOfTsTiles() {
100+
return (TS_FILE_MEMORY_REJECT_THRESHOLD + TABLET_MEMORY_REJECT_THRESHOLD / 2)
101+
* TOTAL_MEMORY_SIZE_IN_BYTES;
102+
}
85103

86104
public boolean isEnough4TabletParsing() {
87105
return (double) usedMemorySizeInBytesOfTablets + (double) usedMemorySizeInBytesOfTsFiles
88-
< 0.95 * TABLET_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES
89-
+ 0.95 * TS_FILE_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES
106+
< EXCEED_PROTECT_THRESHOLD * allowedMaxMemorySizeInBytesOfTabletsAndTsFiles()
90107
&& (double) usedMemorySizeInBytesOfTablets
91-
< 0.95 * TABLET_MEMORY_REJECT_THRESHOLD / 2 * TOTAL_MEMORY_SIZE_IN_BYTES;
108+
< EXCEED_PROTECT_THRESHOLD * allowedMaxMemorySizeInBytesOfTablets();
92109
}
93110

94111
private boolean isHardEnough4TabletParsing() {
95112
return (double) usedMemorySizeInBytesOfTablets + (double) usedMemorySizeInBytesOfTsFiles
96-
< TABLET_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES
97-
+ TS_FILE_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES
98-
&& (double) usedMemorySizeInBytesOfTablets
99-
< TABLET_MEMORY_REJECT_THRESHOLD / 2 * TOTAL_MEMORY_SIZE_IN_BYTES;
113+
< allowedMaxMemorySizeInBytesOfTabletsAndTsFiles()
114+
&& (double) usedMemorySizeInBytesOfTablets < allowedMaxMemorySizeInBytesOfTablets();
100115
}
101116

102117
public boolean isEnough4TsFileSlicing() {
103118
return (double) usedMemorySizeInBytesOfTablets + (double) usedMemorySizeInBytesOfTsFiles
104-
< 0.95 * TABLET_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES
105-
+ 0.95 * TS_FILE_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES
119+
< EXCEED_PROTECT_THRESHOLD * allowedMaxMemorySizeInBytesOfTabletsAndTsFiles()
106120
&& (double) usedMemorySizeInBytesOfTsFiles
107-
< 0.95 * TS_FILE_MEMORY_REJECT_THRESHOLD / 2 * TOTAL_MEMORY_SIZE_IN_BYTES;
121+
< EXCEED_PROTECT_THRESHOLD * allowedMaxMemorySizeInBytesOfTsTiles();
108122
}
109123

110124
private boolean isHardEnough4TsFileSlicing() {
111125
return (double) usedMemorySizeInBytesOfTablets + (double) usedMemorySizeInBytesOfTsFiles
112-
< TABLET_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES
113-
+ TS_FILE_MEMORY_REJECT_THRESHOLD * TOTAL_MEMORY_SIZE_IN_BYTES
114-
&& (double) usedMemorySizeInBytesOfTsFiles
115-
< TS_FILE_MEMORY_REJECT_THRESHOLD / 2 * TOTAL_MEMORY_SIZE_IN_BYTES;
126+
< allowedMaxMemorySizeInBytesOfTabletsAndTsFiles()
127+
&& (double) usedMemorySizeInBytesOfTsFiles < allowedMaxMemorySizeInBytesOfTsTiles();
116128
}
117129

118130
public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes)
@@ -144,8 +156,8 @@ public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long tabletSizeInBy
144156
throw new PipeRuntimeOutOfMemoryCriticalException(
145157
String.format(
146158
"forceAllocateForTablet: failed to allocate because there's too much memory for tablets, "
147-
+ "total memory size %d bytes, used memory for tablet size %d bytes",
148-
TOTAL_MEMORY_SIZE_IN_BYTES, usedMemorySizeInBytesOfTablets));
159+
+ "total memory size %d bytes, used memory for tablet size %d bytes, requested memory size %d bytes",
160+
TOTAL_MEMORY_SIZE_IN_BYTES, usedMemorySizeInBytesOfTablets, tabletSizeInBytes));
149161
}
150162

151163
synchronized (this) {
@@ -179,8 +191,8 @@ public PipeTsFileMemoryBlock forceAllocateForTsFileWithRetry(long tsFileSizeInBy
179191
throw new PipeRuntimeOutOfMemoryCriticalException(
180192
String.format(
181193
"forceAllocateForTsFile: failed to allocate because there's too much memory for tsfiles, "
182-
+ "total memory size %d bytes, used memory for tsfile size %d bytes",
183-
TOTAL_MEMORY_SIZE_IN_BYTES, usedMemorySizeInBytesOfTsFiles));
194+
+ "total memory size %d bytes, used memory for tsfile size %d bytes, requested memory size %d bytes",
195+
TOTAL_MEMORY_SIZE_IN_BYTES, usedMemorySizeInBytesOfTsFiles, tsFileSizeInBytes));
184196
}
185197

186198
synchronized (this) {
@@ -526,6 +538,14 @@ public long getUsedMemorySizeInBytes() {
526538
return usedMemorySizeInBytes;
527539
}
528540

541+
public long getUsedMemorySizeInBytesOfTablets() {
542+
return usedMemorySizeInBytesOfTablets;
543+
}
544+
545+
public long getUsedMemorySizeInBytesOfTsFiles() {
546+
return usedMemorySizeInBytesOfTsFiles;
547+
}
548+
529549
public long getFreeMemorySizeInBytes() {
530550
return TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes;
531551
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,10 @@ public synchronized SubscriptionPipeTabletIterationSnapshot sendIterationSnapsho
219219
public synchronized void resetForIteration() {
220220
currentEnrichedEventsIterator = enrichedEvents.iterator();
221221
currentTabletInsertionEventsIterator = null;
222+
if (Objects.nonNull(currentTsFileInsertionEvent)
223+
&& currentTsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
224+
((PipeTsFileInsertionEvent) currentTsFileInsertionEvent).close();
225+
}
222226
currentTsFileInsertionEvent = null;
223227

224228
if (Objects.nonNull(iterationSnapshot)) {

0 commit comments

Comments
 (0)