Skip to content

Commit ba26460

Browse files
Pipe: Introduce restart strategy to control resources' memory only used by pipe hardlinked files (apache#14279)
1 parent d101d76 commit ba26460

File tree

5 files changed

+43
-6
lines changed

5 files changed

+43
-6
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,19 @@ private Set<PipeMeta> findAllStuckPipes() {
497497
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
498498
stuckPipes.add(pipeMeta);
499499
}
500+
LOGGER.warn(
501+
"All {} pipe(s) will be restarted because of forced restart policy.", stuckPipes.size());
502+
return stuckPipes;
503+
}
504+
505+
if (3 * PipeDataNodeResourceManager.tsfile().getTotalLinkedButDeletedTsfileResourceRamSize()
506+
>= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()) {
507+
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
508+
stuckPipes.add(pipeMeta);
509+
}
510+
LOGGER.warn(
511+
"All {} pipe(s) will be restarted because linked tsfiles' resource size exceeds memory limit.",
512+
stuckPipes.size());
500513
return stuckPipes;
501514
}
502515

@@ -527,7 +540,7 @@ && mayDeletedTsFileSizeReachDangerousThreshold()) {
527540
continue;
528541
}
529542

530-
// Only restart the stream mode pipes for releasing memTables.
543+
// Try to restart the stream mode pipes for releasing memTables.
531544
if (extractors.get(0).isStreamMode()) {
532545
if (extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
533546
&& (mayMemTablePinnedCountReachDangerousThreshold()
@@ -538,8 +551,7 @@ && mayDeletedTsFileSizeReachDangerousThreshold()) {
538551
pipeMeta.getStaticMeta());
539552
stuckPipes.add(pipeMeta);
540553
} else if (getFloatingMemoryUsageInByte(pipeName)
541-
>= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
542-
- PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
554+
>= PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes()
543555
/ pipeMetaKeeper.getPipeMetaCount()) {
544556
// Extractors of this pipe may have too many insert nodes
545557
LOGGER.warn(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,7 @@ private boolean mayInsertNodeMemoryReachDangerousThreshold() {
250250
return 3
251251
* PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName)
252252
* PipeDataNodeAgent.task().getPipeCount()
253-
>= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
254-
- PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
255-
* 2;
253+
>= 2 * PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes();
256254
}
257255

258256
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,10 @@ public long getUsedMemorySizeInBytes() {
499499
return usedMemorySizeInBytes;
500500
}
501501

502+
public long getFreeMemorySizeInBytes() {
503+
return TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes;
504+
}
505+
502506
public long getTotalMemorySizeInBytes() {
503507
return TOTAL_MEMORY_SIZE_IN_BYTES;
504508
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ public long getFileSize() {
9797
return fileSize;
9898
}
9999

100+
public long getTsFileResourceSize() {
101+
return Objects.nonNull(tsFileResource) ? tsFileResource.calculateRamSize() : 0;
102+
}
103+
100104
///////////////////// Reference Count /////////////////////
101105

102106
public int getReferenceCount() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,4 +355,23 @@ public long getTotalLinkedButDeletedTsfileSize() {
355355
return 0;
356356
}
357357
}
358+
359+
public long getTotalLinkedButDeletedTsfileResourceRamSize() {
360+
long totalLinkedButDeletedTsfileResourceRamSize = 0;
361+
try {
362+
for (final Map.Entry<String, PipeTsFileResource> resourceEntry :
363+
hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet()) {
364+
final PipeTsFileResource pipeTsFileResource = resourceEntry.getValue();
365+
// If the original TsFile is not deleted, the memory of the resource is not counted
366+
// because the memory of the resource is controlled by TsFileResourceManager.
367+
if (pipeTsFileResource.isOriginalTsFileDeleted()) {
368+
totalLinkedButDeletedTsfileResourceRamSize += pipeTsFileResource.getTsFileResourceSize();
369+
}
370+
}
371+
return totalLinkedButDeletedTsfileResourceRamSize;
372+
} catch (final Exception e) {
373+
LOGGER.warn("failed to get total size of linked but deleted TsFiles resource ram size: ", e);
374+
return totalLinkedButDeletedTsfileResourceRamSize;
375+
}
376+
}
358377
}

0 commit comments

Comments
 (0)