Skip to content

Commit 19810e5

Browse files
authored
Pipe: Fixed the default param of single entry of disruptor queue & Banned memory checks from some missing pipe ITs & Do not check non-user pipes (apache#16069)
* Update CommonConfig.java * refactor * try-fix * test * revert-pom * fix * fix
1 parent a367839 commit 19810e5

File tree

27 files changed

+173
-96
lines changed

27 files changed

+173
-96
lines changed

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,12 @@ public CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode) {
453453
return this;
454454
}
455455

456+
@Override
457+
public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) {
458+
setProperty("pipe_memory_management_enabled", String.valueOf(pipeMemoryManagementEnabled));
459+
return this;
460+
}
461+
456462
@Override
457463
public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) {
458464
setProperty("pipe_enable_memory_checked", String.valueOf(isPipeEnableMemoryCheck));

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,13 @@ public CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode) {
466466
return this;
467467
}
468468

469+
@Override
470+
public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) {
471+
dnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled);
472+
cnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled);
473+
return this;
474+
}
475+
469476
@Override
470477
public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) {
471478
dnConfig.setIsPipeEnableMemoryCheck(isPipeEnableMemoryCheck);

integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,11 @@ public CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode) {
328328
return this;
329329
}
330330

331+
@Override
332+
public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) {
333+
return this;
334+
}
335+
331336
@Override
332337
public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) {
333338
return this;

integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus(
146146

147147
CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode);
148148

149+
CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled);
150+
149151
CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck);
150152

151153
CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled);

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ protected void setupConfig() {
5050
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
5151
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
5252
.setEnforceStrongPassword(false)
53+
.setPipeMemoryManagementEnabled(false)
5354
.setIsPipeEnableMemoryCheck(false);
5455
receiverEnv
5556
.getConfig()
@@ -58,6 +59,7 @@ protected void setupConfig() {
5859
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
5960
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
6061
.setEnforceStrongPassword(false)
62+
.setPipeMemoryManagementEnabled(false)
6163
.setIsPipeEnableMemoryCheck(false);
6264

6365
// 10 min, assert that the operations will not time out

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public void setUp() {
6969
.setTimestampPrecision("ms")
7070
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
7171
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
72+
.setPipeMemoryManagementEnabled(false)
7273
.setIsPipeEnableMemoryCheck(false);
7374
receiverEnv
7475
.getConfig()
@@ -78,6 +79,7 @@ public void setUp() {
7879
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
7980
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
8081
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
82+
.setPipeMemoryManagementEnabled(false)
8183
.setIsPipeEnableMemoryCheck(false)
8284
.setSchemaReplicationFactor(3)
8385
.setDataReplicationFactor(2);

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,10 @@ private void innerSetUp(
7676
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
7777
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
7878
.setSchemaReplicationFactor(schemaRegionReplicationFactor)
79-
.setDataReplicationFactor(dataRegionReplicationFactor);
79+
.setDataReplicationFactor(dataRegionReplicationFactor)
80+
.setDnConnectionTimeoutMs(600000)
81+
.setPipeMemoryManagementEnabled(false)
82+
.setIsPipeEnableMemoryCheck(false);
8083
receiverEnv
8184
.getConfig()
8285
.getCommonConfig()
@@ -85,11 +88,10 @@ private void innerSetUp(
8588
.setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
8689
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
8790
.setSchemaReplicationFactor(schemaRegionReplicationFactor)
88-
.setDataReplicationFactor(dataRegionReplicationFactor);
89-
90-
// 10 min, assert that the operations will not time out
91-
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
92-
receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
91+
.setDataReplicationFactor(dataRegionReplicationFactor)
92+
.setDnConnectionTimeoutMs(600000)
93+
.setPipeMemoryManagementEnabled(false)
94+
.setIsPipeEnableMemoryCheck(false);
9395

9496
senderEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
9597
receiverEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
@@ -170,7 +172,10 @@ public void testPipeOnBothSenderAndReceiver() throws Exception {
170172
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
171173
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
172174
.setSchemaReplicationFactor(3)
173-
.setDataReplicationFactor(2);
175+
.setDataReplicationFactor(2)
176+
.setDnConnectionTimeoutMs(600000)
177+
.setPipeMemoryManagementEnabled(false)
178+
.setIsPipeEnableMemoryCheck(false);
174179
receiverEnv
175180
.getConfig()
176181
.getCommonConfig()
@@ -179,11 +184,10 @@ public void testPipeOnBothSenderAndReceiver() throws Exception {
179184
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
180185
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
181186
.setSchemaReplicationFactor(1)
182-
.setDataReplicationFactor(1);
183-
184-
// 10 min, assert that the operations will not time out
185-
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
186-
receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
187+
.setDataReplicationFactor(1)
188+
.setDnConnectionTimeoutMs(600000)
189+
.setPipeMemoryManagementEnabled(false)
190+
.setIsPipeEnableMemoryCheck(false);
187191

188192
senderEnv.initClusterEnvironment(3, 3);
189193
receiverEnv.initClusterEnvironment(1, 1);
@@ -379,7 +383,10 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
379383
.setDataReplicationFactor(1)
380384
.setEnableSeqSpaceCompaction(false)
381385
.setEnableUnseqSpaceCompaction(false)
382-
.setEnableCrossSpaceCompaction(false);
386+
.setEnableCrossSpaceCompaction(false)
387+
.setDnConnectionTimeoutMs(600000)
388+
.setPipeMemoryManagementEnabled(false)
389+
.setIsPipeEnableMemoryCheck(false);
383390
receiverEnv
384391
.getConfig()
385392
.getCommonConfig()
@@ -389,11 +396,10 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
389396
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
390397
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
391398
.setSchemaReplicationFactor(3)
392-
.setDataReplicationFactor(2);
393-
394-
// 10 min, assert that the operations will not time out
395-
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
396-
receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
399+
.setDataReplicationFactor(2)
400+
.setDnConnectionTimeoutMs(600000)
401+
.setPipeMemoryManagementEnabled(false)
402+
.setIsPipeEnableMemoryCheck(false);
397403

398404
senderEnv.initClusterEnvironment(1, 1);
399405
receiverEnv.initClusterEnvironment(1, 3);

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,21 @@ public void setUp() {
7171
// Disable sender compaction for tsfile determination in loose range test
7272
.setEnableSeqSpaceCompaction(false)
7373
.setEnableUnseqSpaceCompaction(false)
74-
.setEnableCrossSpaceCompaction(false);
74+
.setEnableCrossSpaceCompaction(false)
75+
.setDnConnectionTimeoutMs(600000)
76+
.setPipeMemoryManagementEnabled(false)
77+
.setIsPipeEnableMemoryCheck(false);
7578
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
7679

7780
receiverEnv
7881
.getConfig()
7982
.getCommonConfig()
8083
.setAutoCreateSchemaEnabled(true)
8184
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
82-
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
83-
84-
// 10 min, assert that the operations will not time out
85-
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
86-
receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
85+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
86+
.setDnConnectionTimeoutMs(600000)
87+
.setPipeMemoryManagementEnabled(false)
88+
.setIsPipeEnableMemoryCheck(false);
8789

8890
senderEnv.initClusterEnvironment();
8991
receiverEnv.initClusterEnvironment();

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ public void setUp() {
7474
.setAutoCreateSchemaEnabled(true)
7575
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
7676
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
77-
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
77+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
78+
.setDnConnectionTimeoutMs(600000)
79+
.setPipeMemoryManagementEnabled(false)
80+
.setIsPipeEnableMemoryCheck(false);
7881

7982
receiverEnv
8083
.getConfig()
@@ -84,11 +87,10 @@ public void setUp() {
8487
.setSchemaReplicationFactor(3)
8588
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
8689
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
87-
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
88-
89-
// 10 min, assert that the operations will not time out
90-
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
91-
receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
90+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
91+
.setDnConnectionTimeoutMs(600000)
92+
.setPipeMemoryManagementEnabled(false)
93+
.setIsPipeEnableMemoryCheck(false);
9294

9395
senderEnv.initClusterEnvironment(3, 3, 180);
9496
receiverEnv.initClusterEnvironment(3, 3, 180);

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,15 @@ public void setUp() {
6262
.setTimestampPrecision("ms")
6363
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
6464
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
65+
.setPipeMemoryManagementEnabled(false)
6566
.setIsPipeEnableMemoryCheck(false);
6667
receiverEnv
6768
.getConfig()
6869
.getCommonConfig()
6970
.setAutoCreateSchemaEnabled(true)
7071
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
7172
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
73+
.setPipeMemoryManagementEnabled(false)
7274
.setIsPipeEnableMemoryCheck(false);
7375

7476
// 10 min, assert that the operations will not time out

0 commit comments

Comments
 (0)