Skip to content

Commit b2103e8

Browse files
committed
Merge branch 'dev/1.3' of https://github.com/apache/iotdb into cp-bishop
2 parents 4a54421 + 9be174b commit b2103e8

File tree

237 files changed

+7480
-3504
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

237 files changed

+7480
-3504
lines changed

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
551551

552552
@Override
553553
public CommonConfig setDefaultStorageGroupLevel(int defaultStorageGroupLevel) {
554-
setProperty("default_storage_group_level", String.valueOf(defaultStorageGroupLevel));
554+
setProperty("default_database_level", String.valueOf(defaultStorageGroupLevel));
555555
return this;
556556
}
557557

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,16 @@ public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad)
100100
setProperty("cache_last_values_for_load", String.valueOf(cacheLastValuesForLoad));
101101
return this;
102102
}
103+
104+
@Override
105+
public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
106+
setProperty("wal_throttle_threshold_in_byte", String.valueOf(walThrottleSize));
107+
return this;
108+
}
109+
110+
@Override
111+
public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
112+
setProperty("delete_wal_files_period_in_ms", String.valueOf(deleteWalFilesPeriodInMs));
113+
return this;
114+
}
103115
}

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,4 +758,8 @@ public long getPid() {
758758
return -1;
759759
}
760760
}
761+
762+
public Process getInstance() {
763+
return instance;
764+
}
761765
}

integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,14 @@ public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
6363
public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
6464
return this;
6565
}
66+
67+
@Override
68+
public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
69+
return this;
70+
}
71+
72+
@Override
73+
public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
74+
return this;
75+
}
6676
}

integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,8 @@ DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
3939
DataNodeConfig setLoadLastCacheStrategy(String strategyName);
4040

4141
DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
42+
43+
DataNodeConfig setWalThrottleSize(long walThrottleSize);
44+
45+
DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs);
4246
}

integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
import org.apache.thrift.TException;
4646
import org.apache.tsfile.read.common.Field;
47+
import org.apache.tsfile.utils.Pair;
4748
import org.awaitility.Awaitility;
4849
import org.awaitility.core.ConditionTimeoutException;
4950
import org.junit.After;
@@ -307,7 +308,7 @@ public void generalTestWithAllOptions(
307308
}
308309
}
309310

310-
protected Set<Integer> getAllDataNodes(Statement statement) throws Exception {
311+
public static Set<Integer> getAllDataNodes(Statement statement) throws Exception {
311312
ResultSet result = statement.executeQuery(SHOW_DATANODES);
312313
Set<Integer> allDataNodeId = new HashSet<>();
313314
while (result.next()) {
@@ -444,6 +445,26 @@ public static Map<Integer, Set<Integer>> getDataRegionMap(Statement statement) t
444445
return regionMap;
445446
}
446447

448+
public static Map<Integer, Pair<Integer, Set<Integer>>> getDataRegionMapWithLeader(
449+
Statement statement) throws Exception {
450+
ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
451+
Map<Integer, Pair<Integer, Set<Integer>>> regionMap = new HashMap<>();
452+
while (showRegionsResult.next()) {
453+
if (String.valueOf(TConsensusGroupType.DataRegion)
454+
.equals(showRegionsResult.getString(ColumnHeaderConstant.TYPE))) {
455+
int regionId = showRegionsResult.getInt(ColumnHeaderConstant.REGION_ID);
456+
int dataNodeId = showRegionsResult.getInt(ColumnHeaderConstant.DATA_NODE_ID);
457+
Pair<Integer, Set<Integer>> leaderNodesPair =
458+
regionMap.computeIfAbsent(regionId, id -> new Pair<>(-1, new HashSet<>()));
459+
leaderNodesPair.getRight().add(dataNodeId);
460+
if (showRegionsResult.getString(ColumnHeaderConstant.ROLE).equals("Leader")) {
461+
leaderNodesPair.setLeft(dataNodeId);
462+
}
463+
}
464+
}
465+
return regionMap;
466+
}
467+
447468
public static Map<Integer, Set<Integer>> getAllRegionMap(Statement statement) throws Exception {
448469
ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
449470
Map<Integer, Set<Integer>> regionMap = new HashMap<>();
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.it;
21+
22+
import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
23+
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
24+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
25+
import org.apache.iotdb.itbase.category.ClusterIT;
26+
import org.apache.iotdb.rpc.IoTDBConnectionException;
27+
import org.apache.iotdb.rpc.StatementExecutionException;
28+
import org.apache.iotdb.session.Session;
29+
30+
import org.junit.Test;
31+
import org.junit.experimental.categories.Category;
32+
import org.junit.runner.RunWith;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.sql.Connection;
37+
import java.sql.ResultSet;
38+
import java.sql.SQLException;
39+
import java.sql.Statement;
40+
import java.util.Date;
41+
42+
import static org.junit.Assert.fail;
43+
44+
/** Tests that may not be satisfied with the default cluster settings. */
45+
@RunWith(IoTDBTestRunner.class)
46+
@Category({ClusterIT.class})
47+
public class IoTDBCustomizedClusterIT {
48+
49+
private final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class);
50+
51+
/**
52+
* When the wal size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread will try
53+
* deleting wal forever, which will block the DataNode from exiting, because task of deleting wal
54+
* submitted by the ShutdownHook cannot be executed. This test ensures that this blocking is
55+
* fixed.
56+
*/
57+
@Test
58+
public void testWalThrottleStuck()
59+
throws SQLException,
60+
IoTDBConnectionException,
61+
StatementExecutionException,
62+
InterruptedException {
63+
SimpleEnv simpleEnv = new SimpleEnv();
64+
simpleEnv
65+
.getConfig()
66+
.getDataNodeConfig()
67+
.setWalThrottleSize(1)
68+
.setDeleteWalFilesPeriodInMs(100);
69+
simpleEnv
70+
.getConfig()
71+
.getCommonConfig()
72+
.setDataReplicationFactor(3)
73+
.setSchemaReplicationFactor(3)
74+
.setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
75+
.setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
76+
try {
77+
simpleEnv.initClusterEnvironment(1, 3);
78+
79+
int leaderIndex = -1;
80+
try (Connection connection = simpleEnv.getConnection();
81+
Statement statement = connection.createStatement()) {
82+
// write the first data
83+
statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
84+
// find the leader of the data region
85+
int port = -1;
86+
87+
ResultSet resultSet = statement.executeQuery("SHOW REGIONS");
88+
while (resultSet.next()) {
89+
String regionType = resultSet.getString("Type");
90+
if (regionType.equals("DataRegion")) {
91+
String role = resultSet.getString("Role");
92+
if (role.equals("Leader")) {
93+
port = resultSet.getInt("RpcPort");
94+
break;
95+
}
96+
}
97+
}
98+
99+
if (port == -1) {
100+
fail("Leader not found");
101+
}
102+
103+
for (int i = 0; i < simpleEnv.getDataNodeWrapperList().size(); i++) {
104+
if (simpleEnv.getDataNodeWrapperList().get(i).getPort() == port) {
105+
leaderIndex = i;
106+
break;
107+
}
108+
}
109+
}
110+
111+
// stop a follower
112+
int followerIndex = (leaderIndex + 1) % simpleEnv.getDataNodeWrapperList().size();
113+
simpleEnv.getDataNodeWrapperList().get(followerIndex).stop();
114+
System.out.println(
115+
new Date()
116+
+ ":Stopping data node "
117+
+ simpleEnv.getDataNodeWrapperList().get(followerIndex).getIpAndPortString());
118+
119+
DataNodeWrapper leader = simpleEnv.getDataNodeWrapperList().get(leaderIndex);
120+
// write to the leader to generate wal that cannot be synced
121+
try (Session session = new Session(leader.getIp(), leader.getPort())) {
122+
session.open();
123+
124+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
125+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
126+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
127+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
128+
session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
129+
}
130+
131+
// wait for wal-delete thread to be scheduled
132+
Thread.sleep(1000);
133+
134+
// stop the leader
135+
leader.getInstance().destroy();
136+
System.out.println(new Date() + ":Stopping data node " + leader.getIpAndPortString());
137+
// confirm the death of the leader
138+
long startTime = System.currentTimeMillis();
139+
while (leader.isAlive()) {
140+
if (System.currentTimeMillis() - startTime > 30000) {
141+
fail("Leader does not exit after 30s");
142+
}
143+
}
144+
} finally {
145+
simpleEnv.cleanClusterEnvironment();
146+
}
147+
}
148+
}

0 commit comments

Comments
 (0)