Skip to content

Commit c17894e

Browse files
luoluoyuyuJackieTien97
authored andcommitted
Pipe: CN adds logic to check if Pipe is out of memory (#16119)
* Pipe: CN adds logic to check if Pipe is out of memory * update AbstractOperatePipeProcedureV2 * update AbstractOperatePipeProcedureV2 * add it (cherry picked from commit d0788a8)
1 parent 065d0d7 commit c17894e

File tree

10 files changed

+173
-9
lines changed

10 files changed

+173
-9
lines changed

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,12 @@ public CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
567567
return this;
568568
}
569569

570+
@Override
571+
public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) {
572+
setProperty("datanode_memory_proportion", datanodeMemoryProportion);
573+
return this;
574+
}
575+
570576
// For part of the log directory
571577
public String getClusterConfigStr() {
572578
return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,4 +587,10 @@ public CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
587587
cnConfig.setDefaultStorageGroupLevel(defaultStorageGroupLevel);
588588
return this;
589589
}
590+
591+
public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) {
592+
dnConfig.setDatanodeMemoryProportion(datanodeMemoryProportion);
593+
cnConfig.setDatanodeMemoryProportion(datanodeMemoryProportion);
594+
return this;
595+
}
590596
}

integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,4 +406,9 @@ public CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
406406
public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
407407
return this;
408408
}
409+
410+
@Override
411+
public CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion) {
412+
return this;
413+
}
409414
}

integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,4 +183,6 @@ CommonConfig setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(
183183
default CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
184184
return this;
185185
}
186+
187+
CommonConfig setDatanodeMemoryProportion(String datanodeMemoryProportion);
186188
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
24+
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
25+
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
26+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
27+
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic;
28+
import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
29+
import org.apache.iotdb.rpc.TSStatusCode;
30+
31+
import org.junit.Assert;
32+
import org.junit.Before;
33+
import org.junit.Test;
34+
import org.junit.experimental.categories.Category;
35+
import org.junit.runner.RunWith;
36+
37+
import java.util.HashMap;
38+
import java.util.Map;
39+
40+
@RunWith(IoTDBTestRunner.class)
41+
@Category({MultiClusterIT2DualTableManualBasic.class})
42+
public class IoTDBPipeMemoryIT extends AbstractPipeTableModelDualManualIT {
43+
44+
@Override
45+
@Before
46+
public void setUp() {
47+
super.setUp();
48+
}
49+
50+
@Override
51+
protected void setupConfig() {
52+
super.setupConfig();
53+
senderEnv
54+
.getConfig()
55+
.getCommonConfig()
56+
.setPipeMemoryManagementEnabled(true)
57+
.setIsPipeEnableMemoryCheck(true)
58+
.setDatanodeMemoryProportion("1000:1000:1000:1000:1:1000");
59+
receiverEnv
60+
.getConfig()
61+
.getCommonConfig()
62+
.setPipeMemoryManagementEnabled(true)
63+
.setIsPipeEnableMemoryCheck(true)
64+
.setDatanodeMemoryProportion("1000:1000:1000:1000:1:1000");
65+
}
66+
67+
@Test
68+
public void testCreatePipeMemoryManage() {
69+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
70+
final String receiverIp = receiverDataNode.getIp();
71+
final int receiverPort = receiverDataNode.getPort();
72+
73+
try (final SyncConfigNodeIServiceClient client =
74+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
75+
final Map<String, String> extractorAttributes = new HashMap<>();
76+
final Map<String, String> processorAttributes = new HashMap<>();
77+
final Map<String, String> connectorAttributes = new HashMap<>();
78+
79+
extractorAttributes.put("capture.table", "true");
80+
extractorAttributes.put("user", "root");
81+
82+
connectorAttributes.put("connector", "iotdb-thrift-connector");
83+
connectorAttributes.put("connector.batch.enable", "false");
84+
connectorAttributes.put("connector.ip", receiverIp);
85+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
86+
87+
final TSStatus status =
88+
client.createPipe(
89+
new TCreatePipeReq("p1", connectorAttributes)
90+
.setExtractorAttributes(extractorAttributes)
91+
.setProcessorAttributes(processorAttributes));
92+
93+
Assert.assertNotEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
94+
Assert.assertNotNull(status.getMessage());
95+
Assert.assertTrue(status.getMessage().contains("Not enough memory for pipe."));
96+
97+
} catch (Exception e) {
98+
Assert.fail(e.getMessage());
99+
}
100+
}
101+
}

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ public enum TSStatusCode {
286286
PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED(1811),
287287
PIPE_TRANSFER_SLICE_OUT_OF_ORDER(1812),
288288
PIPE_PUSH_META_TIMEOUT(1813),
289+
PIPE_PUSH_META_NOT_ENOUGH_MEMORY(1814),
289290

290291
// Subscription
291292
SUBSCRIPTION_VERSION_ERROR(1900),

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,11 +428,33 @@ public static Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(
428428
public static String parsePushPipeMetaExceptionForPipe(
429429
final String pipeName, final Map<Integer, TPushPipeMetaResp> respMap) {
430430
final StringBuilder exceptionMessageBuilder = new StringBuilder();
431+
final StringBuilder enoughMemoryMessageBuilder = new StringBuilder();
431432

432433
for (final Map.Entry<Integer, TPushPipeMetaResp> respEntry : respMap.entrySet()) {
433434
final int dataNodeId = respEntry.getKey();
434435
final TPushPipeMetaResp resp = respEntry.getValue();
435436

437+
if (resp.getStatus().getCode()
438+
== TSStatusCode.PIPE_PUSH_META_NOT_ENOUGH_MEMORY.getStatusCode()) {
439+
exceptionMessageBuilder.append(String.format("DataNodeId: %s,", dataNodeId));
440+
resp.getExceptionMessages()
441+
.forEach(
442+
message -> {
443+
// Ignore the timeStamp for simplicity
444+
if (pipeName == null) {
445+
enoughMemoryMessageBuilder.append(
446+
String.format(
447+
"PipeName: %s, Message: %s",
448+
message.getPipeName(), message.getMessage()));
449+
} else if (pipeName.equals(message.getPipeName())) {
450+
enoughMemoryMessageBuilder.append(
451+
String.format("Message: %s", message.getMessage()));
452+
}
453+
});
454+
enoughMemoryMessageBuilder.append(".");
455+
continue;
456+
}
457+
436458
if (resp.getStatus().getCode() == TSStatusCode.PIPE_PUSH_META_TIMEOUT.getStatusCode()) {
437459
exceptionMessageBuilder.append(
438460
String.format(
@@ -476,6 +498,12 @@ public static String parsePushPipeMetaExceptionForPipe(
476498
}
477499
}
478500
}
501+
502+
final String enoughMemoryMessage = enoughMemoryMessageBuilder.toString();
503+
if (!enoughMemoryMessage.isEmpty()) {
504+
throw new PipeException(enoughMemoryMessage);
505+
}
506+
479507
return exceptionMessageBuilder.toString();
480508
}
481509

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,8 @@ protected void calculateMemoryUsage(
717717
if (freeMemorySizeInBytes < needMemory + reservedMemorySizeInBytes) {
718718
final String message =
719719
String.format(
720-
"Not enough memory for pipe. Need memory: %d bytes, free memory: %d bytes, reserved memory: %d bytes, total memory: %d bytes",
720+
"%s Need memory: %d bytes, free memory: %d bytes, reserved memory: %d bytes, total memory: %d bytes",
721+
MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
721722
needMemory,
722723
freeMemorySizeInBytes,
723724
freeMemorySizeInBytes,
@@ -765,8 +766,10 @@ private void calculateInsertNodeQueueMemory(final PipeParameters sourceParameter
765766
if (remainingMemory < PipeConfig.getInstance().PipeInsertNodeQueueMemory()) {
766767
final String message =
767768
String.format(
768-
"Not enough memory for pipe. Need Floating memory: %d bytes, free Floating memory: %d bytes",
769-
PipeConfig.getInstance().PipeInsertNodeQueueMemory(), remainingMemory);
769+
"%s Need Floating memory: %d bytes, free Floating memory: %d bytes",
770+
MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
771+
PipeConfig.getInstance().PipeInsertNodeQueueMemory(),
772+
remainingMemory);
770773
LOGGER.warn(message);
771774
throw new PipeException(message);
772775
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.iotdb.commons.path.PathDeserializeUtil;
6262
import org.apache.iotdb.commons.path.PathPatternTree;
6363
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
64+
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
6465
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
6566
import org.apache.iotdb.commons.schema.SchemaConstant;
6667
import org.apache.iotdb.commons.schema.cache.CacheClearOptions;
@@ -1203,13 +1204,23 @@ public TPushPipeMetaResp pushSinglePipeMeta(final TPushSinglePipeMetaReq req) {
12031204
} else {
12041205
throw new Exception("Invalid TPushSinglePipeMetaReq");
12051206
}
1206-
return exceptionMessage == null
1207-
? new TPushPipeMetaResp()
1208-
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
1209-
: new TPushPipeMetaResp()
1210-
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
1207+
if (exceptionMessage != null) {
1208+
if (exceptionMessage.message != null
1209+
&& exceptionMessage.message.contains(PipeTaskAgent.MESSAGE_PIPE_NOT_ENOUGH_MEMORY)) {
1210+
return new TPushPipeMetaResp()
1211+
.setStatus(
1212+
new TSStatus(TSStatusCode.PIPE_PUSH_META_NOT_ENOUGH_MEMORY.getStatusCode()))
12111213
.setExceptionMessages(Collections.singletonList(exceptionMessage));
1212-
} catch (final Exception e) {
1214+
}
1215+
1216+
return new TPushPipeMetaResp()
1217+
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
1218+
.setExceptionMessages(Collections.singletonList(exceptionMessage));
1219+
}
1220+
1221+
return new TPushPipeMetaResp()
1222+
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
1223+
} catch (Exception e) {
12131224
LOGGER.error("Error occurred when pushing single pipe meta", e);
12141225
return new TPushPipeMetaResp()
12151226
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()));

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public abstract class PipeTaskAgent {
8181

8282
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskAgent.class);
8383

84+
public static final String MESSAGE_PIPE_NOT_ENOUGH_MEMORY = "Not enough memory for pipe.";
8485
protected static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe status %s for pipe %s";
8586
protected static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected pipe status %s: ";
8687

0 commit comments

Comments
 (0)