Skip to content

Commit adba099

Browse files
authored
Pipe: bind memory block for PipeTransferTsFileHandler and PipeInsertNodeTabletInsertionEvent & unify the memory threshold judgment for tablet and tsfile block & close data container when internally decrease reference count of PipeTsFileInsertionEvent (apache#14873)
1 parent 9e973b7 commit adba099

File tree

10 files changed

+184
-61
lines changed

10 files changed

+184
-61
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtaskWorker.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.Collections;
2828
import java.util.Set;
2929
import java.util.concurrent.ConcurrentHashMap;
30-
import java.util.stream.Collectors;
3130

3231
public class PipeProcessorSubtaskWorker extends WrappedRunnable {
3332

@@ -53,10 +52,7 @@ public void runMayThrow() {
5352
}
5453

5554
private void cleanupClosedSubtasksIfNecessary() {
56-
subtasks.stream()
57-
.filter(PipeProcessorSubtask::isClosed)
58-
.collect(Collectors.toList())
59-
.forEach(subtasks::remove);
55+
subtasks.removeIf(PipeProcessorSubtask::isClosed);
6056
}
6157

6258
private boolean runSubtasks() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
5757
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
5858

59+
import com.google.common.collect.ImmutableSet;
5960
import org.apache.tsfile.exception.write.WriteProcessException;
6061
import org.apache.tsfile.utils.Pair;
6162
import org.slf4j.Logger;
@@ -184,7 +185,7 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
184185

185186
private void transferInBatchWithoutCheck(
186187
final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch)
187-
throws IOException, WriteProcessException {
188+
throws IOException, WriteProcessException, InterruptedException {
188189
final PipeTabletEventBatch batch = endPointAndBatch.getRight();
189190

190191
if (batch instanceof PipeTabletEventPlainBatch) {
@@ -473,7 +474,8 @@ private void transferQueuedEventsIfNecessary() throws Exception {
473474
}
474475

475476
/** Try its best to commit data in order. Flush can also be a trigger to transfer batched data. */
476-
private void transferBatchedEventsIfNecessary() throws IOException, WriteProcessException {
477+
private void transferBatchedEventsIfNecessary()
478+
throws IOException, WriteProcessException, InterruptedException {
477479
if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
478480
return;
479481
}
@@ -565,8 +567,12 @@ public synchronized void close() {
565567

566568
// ensure all on-the-fly handlers have been cleared
567569
if (hasPendingHandlers()) {
568-
pendingHandlers.forEach((handler, _handler) -> handler.clearEventsReferenceCount());
569-
pendingHandlers.clear();
570+
ImmutableSet.copyOf(pendingHandlers.keySet())
571+
.forEach(
572+
handler -> {
573+
handler.clearEventsReferenceCount();
574+
eliminateHandler(handler);
575+
});
570576
}
571577

572578
try {
@@ -621,6 +627,7 @@ public void trackHandler(final PipeTransferTrackableHandler handler) {
621627
}
622628

623629
public void eliminateHandler(final PipeTransferTrackableHandler handler) {
630+
handler.close();
624631
pendingHandlers.remove(handler);
625632
}
626633

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.thrift.async.AsyncMethodCallback;
2929

3030
public abstract class PipeTransferTrackableHandler
31-
implements AsyncMethodCallback<TPipeTransferResp> {
31+
implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
3232

3333
protected final IoTDBDataRegionAsyncConnector connector;
3434

@@ -38,25 +38,30 @@ public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncConnector connecto
3838

3939
@Override
4040
public void onComplete(final TPipeTransferResp response) {
41-
if (!connector.isClosed()) {
42-
if (onCompleteInternal(response)) {
43-
// eliminate handler only when all transmissions corresponding to the handler have been
44-
// completed
45-
connector.eliminateHandler(this);
46-
}
47-
} else {
41+
if (connector.isClosed()) {
4842
clearEventsReferenceCount();
4943
connector.eliminateHandler(this);
44+
return;
45+
}
46+
47+
if (onCompleteInternal(response)) {
48+
// eliminate handler only when all transmissions corresponding to the handler have been
49+
// completed
50+
// NOTE: We should not clear the reference count of events, as this would cause the
51+
// `org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic.IoTDBPipeDataSinkIT#testSinkTsFileFormat3` test to fail.
52+
connector.eliminateHandler(this);
5053
}
5154
}
5255

5356
@Override
5457
public void onError(final Exception exception) {
55-
if (!connector.isClosed()) {
56-
onErrorInternal(exception);
57-
} else {
58+
if (connector.isClosed()) {
5859
clearEventsReferenceCount();
60+
connector.eliminateHandler(this);
61+
return;
5962
}
63+
64+
onErrorInternal(exception);
6065
connector.eliminateHandler(this);
6166
}
6267

@@ -76,6 +81,7 @@ protected boolean tryTransfer(
7681
connector.trackHandler(this);
7782
if (connector.isClosed()) {
7883
clearEventsReferenceCount();
84+
connector.eliminateHandler(this);
7985
return false;
8086
}
8187
doTransfer(client, req);
@@ -95,4 +101,9 @@ protected abstract void doTransfer(
95101
throws TException;
96102

97103
public abstract void clearEventsReferenceCount();
104+
105+
@Override
106+
public void close() {
107+
// do nothing
108+
}
98109
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
3333
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
3434
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
35+
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
36+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
37+
import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock;
38+
import org.apache.iotdb.pipe.api.exception.PipeException;
3539
import org.apache.iotdb.rpc.TSStatusCode;
3640
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
3741
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -76,6 +80,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
7680
private final String dataBaseName;
7781

7882
private final int readFileBufferSize;
83+
private final PipeTsFileMemoryBlock memoryBlock;
7984
private final byte[] readBuffer;
8085
private long position;
8186

@@ -96,7 +101,7 @@ public PipeTransferTsFileHandler(
96101
final File modFile,
97102
final boolean transferMod,
98103
final String dataBaseName)
99-
throws FileNotFoundException {
104+
throws FileNotFoundException, InterruptedException {
100105
super(connector);
101106

102107
this.pipeName2WeightMap = pipeName2WeightMap;
@@ -111,7 +116,14 @@ public PipeTransferTsFileHandler(
111116
this.dataBaseName = dataBaseName;
112117
currentFile = transferMod ? modFile : tsFile;
113118

114-
readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
119+
waitForResourceEnough4Slicing(Integer.MAX_VALUE);
120+
readFileBufferSize =
121+
(int)
122+
Math.min(
123+
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(),
124+
transferMod ? Math.max(tsFile.length(), modFile.length()) : tsFile.length());
125+
memoryBlock =
126+
PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(readFileBufferSize);
115127
readBuffer = new byte[readFileBufferSize];
116128
position = 0;
117129

@@ -275,6 +287,7 @@ protected boolean onCompleteInternal(final TPipeTransferResp response) {
275287
client.returnSelf();
276288
}
277289
}
290+
278291
return true;
279292
}
280293

@@ -379,4 +392,53 @@ protected void doTransfer(
379392
public void clearEventsReferenceCount() {
380393
events.forEach(event -> event.clearReferenceCount(PipeTransferTsFileHandler.class.getName()));
381394
}
395+
396+
@Override
397+
public void close() {
398+
super.close();
399+
memoryBlock.close();
400+
}
401+
402+
private void waitForResourceEnough4Slicing(final long timeoutMs) throws InterruptedException {
403+
final PipeMemoryManager memoryManager = PipeDataNodeResourceManager.memory();
404+
if (memoryManager.isEnough4TsFileSlicing()) {
405+
return;
406+
}
407+
408+
final long startTime = System.currentTimeMillis();
409+
long lastRecordTime = startTime;
410+
411+
final long memoryCheckIntervalMs =
412+
PipeConfig.getInstance().getPipeCheckMemoryEnoughIntervalMs();
413+
while (!memoryManager.isEnough4TsFileSlicing()) {
414+
Thread.sleep(memoryCheckIntervalMs);
415+
416+
final long currentTime = System.currentTimeMillis();
417+
final double elapsedRecordTimeSeconds = (currentTime - lastRecordTime) / 1000.0;
418+
final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
419+
if (elapsedRecordTimeSeconds > 10.0) {
420+
LOGGER.info(
421+
"Wait for resource enough for slicing tsfile {} for {} seconds.",
422+
tsFile,
423+
waitTimeSeconds);
424+
lastRecordTime = currentTime;
425+
} else if (LOGGER.isDebugEnabled()) {
426+
LOGGER.debug(
427+
"Wait for resource enough for slicing tsfile {} for {} seconds.",
428+
tsFile,
429+
waitTimeSeconds);
430+
}
431+
432+
if (waitTimeSeconds * 1000 > timeoutMs) {
433+
// should contain 'TimeoutException' in exception message
434+
throw new PipeException(
435+
String.format("TimeoutException: Waited %s seconds", waitTimeSeconds));
436+
}
437+
}
438+
439+
final long currentTime = System.currentTimeMillis();
440+
final double waitTimeSeconds = (currentTime - startTime) / 1000.0;
441+
LOGGER.info(
442+
"Wait for resource enough for slicing tsfile {} for {} seconds.", tsFile, waitTimeSeconds);
443+
}
382444
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser;
3535
import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
3636
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
37+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
38+
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
3739
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
3840
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
3941
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -82,6 +84,8 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
8284
private final boolean isAligned;
8385
private final boolean isGeneratedByPipe;
8486

87+
private final PipeTabletMemoryBlock allocatedMemoryBlock;
88+
8589
private List<TabletInsertionEventParser> eventParsers;
8690

8791
private final PartialPath devicePath;
@@ -143,6 +147,10 @@ private PipeInsertNodeTabletInsertionEvent(
143147
this.progressIndex = progressIndex;
144148
this.isAligned = isAligned;
145149
this.isGeneratedByPipe = isGeneratedByPipe;
150+
151+
// Allocate empty memory block, will be resized later.
152+
this.allocatedMemoryBlock =
153+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
146154
}
147155

148156
public InsertNode getInsertNode() throws WALPipeException {
@@ -191,11 +199,12 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
191199
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
192200
try {
193201
PipeDataNodeResourceManager.wal().unpin(walEntryHandler);
194-
// Release the parsers' memory.
202+
// release the parsers' memory and close memory block
195203
if (eventParsers != null) {
196204
eventParsers.clear();
197205
eventParsers = null;
198206
}
207+
allocatedMemoryBlock.close();
199208
return true;
200209
} catch (final Exception e) {
201210
LOGGER.warn(
@@ -368,9 +377,18 @@ public boolean isAligned(final int i) {
368377

369378
// TODO: for table model insertion, we need to get the database name
370379
public List<Tablet> convertToTablets() {
371-
return initEventParsers().stream()
372-
.map(TabletInsertionEventParser::convertToTablet)
373-
.collect(Collectors.toList());
380+
final List<Tablet> tablets =
381+
initEventParsers().stream()
382+
.map(TabletInsertionEventParser::convertToTablet)
383+
.collect(Collectors.toList());
384+
PipeDataNodeResourceManager.memory()
385+
.forceResize(
386+
allocatedMemoryBlock,
387+
tablets.stream()
388+
.map(PipeMemoryWeightUtil::calculateTabletSizeInBytes)
389+
.reduce(Long::sum)
390+
.orElse(0L));
391+
return tablets;
374392
}
375393

376394
/////////////////////////// event parser ///////////////////////////
@@ -490,7 +508,7 @@ protected void trackResource() {
490508
@Override
491509
public PipeEventResource eventResourceBuilder() {
492510
return new PipeInsertNodeTabletInsertionEventResource(
493-
this.isReleased, this.referenceCount, this.walEntryHandler);
511+
this.isReleased, this.referenceCount, this.walEntryHandler, this.allocatedMemoryBlock);
494512
}
495513

496514
// Notes:
@@ -508,20 +526,23 @@ public long ramBytesUsed() {
508526
private static class PipeInsertNodeTabletInsertionEventResource extends PipeEventResource {
509527

510528
private final WALEntryHandler walEntryHandler;
529+
private final PipeTabletMemoryBlock allocatedMemoryBlock;
511530

512531
private PipeInsertNodeTabletInsertionEventResource(
513532
final AtomicBoolean isReleased,
514533
final AtomicInteger referenceCount,
515-
final WALEntryHandler walEntryHandler) {
534+
final WALEntryHandler walEntryHandler,
535+
final PipeTabletMemoryBlock allocatedMemoryBlock) {
516536
super(isReleased, referenceCount);
517537
this.walEntryHandler = walEntryHandler;
538+
this.allocatedMemoryBlock = allocatedMemoryBlock;
518539
}
519540

520541
@Override
521542
protected void finalizeResource() {
522543
try {
523544
PipeDataNodeResourceManager.wal().unpin(walEntryHandler);
524-
// no need to release the containers' memory because it has already been GCed
545+
allocatedMemoryBlock.close();
525546
} catch (final Exception e) {
526547
LOGGER.warn(
527548
String.format(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,10 +283,6 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
283283
if (isWithMod) {
284284
modFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, null);
285285
}
286-
if (Objects.nonNull(pipeName)) {
287-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
288-
.increaseTsFileEventCount(pipeName, creationTime);
289-
}
290286
return true;
291287
} catch (final Exception e) {
292288
LOGGER.warn(
@@ -295,6 +291,11 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
295291
tsFile, modFile, holderMessage),
296292
e);
297293
return false;
294+
} finally {
295+
if (Objects.nonNull(pipeName)) {
296+
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
297+
.increaseTsFileEventCount(pipeName, creationTime);
298+
}
298299
}
299300
}
300301

@@ -305,6 +306,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
305306
if (isWithMod) {
306307
PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile);
307308
}
309+
close();
308310
return true;
309311
} catch (final Exception e) {
310312
LOGGER.warn(
@@ -527,7 +529,7 @@ private void waitForResourceEnough4Parsing(final long timeoutMs) throws Interrup
527529
long lastRecordTime = startTime;
528530

529531
final long memoryCheckIntervalMs =
530-
PipeConfig.getInstance().getPipeTsFileParserCheckMemoryEnoughIntervalMs();
532+
PipeConfig.getInstance().getPipeCheckMemoryEnoughIntervalMs();
531533
while (!memoryManager.isEnough4TabletParsing()) {
532534
Thread.sleep(memoryCheckIntervalMs);
533535

0 commit comments

Comments
 (0)