File tree Expand file tree Collapse file tree 1 file changed +6
-0
lines changed
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch Expand file tree Collapse file tree 1 file changed +6
-0
lines changed Original file line number Diff line number Diff line change 2020package org .apache .iotdb .db .pipe .sink .payload .evolvable .batch ;
2121
2222import org .apache .iotdb .commons .pipe .event .EnrichedEvent ;
23+ import org .apache .iotdb .db .pipe .resource .PipeDataNodeResourceManager ;
24+ import org .apache .iotdb .db .pipe .resource .memory .PipeMemoryBlock ;
2325import org .apache .iotdb .db .pipe .sink .protocol .thrift .async .IoTDBDataRegionAsyncSink ;
2426import org .apache .iotdb .db .storageengine .dataregion .wal .exception .WALPipeException ;
2527import org .apache .iotdb .pipe .api .event .Event ;
@@ -45,6 +47,7 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
4547 private long firstEventProcessingTime = Long .MIN_VALUE ;
4648
4749 protected long totalBufferSize = 0 ;
50+ private final PipeMemoryBlock allocatedMemoryBlock ;
4851
4952 protected volatile boolean isClosed = false ;
5053
@@ -56,6 +59,8 @@ protected PipeTabletEventBatch(
5659
5760 // limit in buffer size
5861 this .maxBatchSizeInBytes = requestMaxBatchSizeInBytes ;
62+ this .allocatedMemoryBlock =
63+ PipeDataNodeResourceManager .memory ().forceAllocate (requestMaxBatchSizeInBytes );
5964 if (recordMetric != null ) {
6065 this .recordMetric = recordMetric ;
6166 } else {
@@ -142,6 +147,7 @@ public synchronized void close() {
142147
143148 clearEventsReferenceCount (PipeTabletEventBatch .class .getName ());
144149 events .clear ();
150+ allocatedMemoryBlock .close ();
145151 }
146152
147153 /**
You can’t perform that action at this time.
0 commit comments