Skip to content

Commit 4862e7a

Browse files
authored
Shuffle data partition allocation strategy (#16260)
1 parent 54ca825 commit 4862e7a

File tree

9 files changed

+319
-52
lines changed

9 files changed

+319
-52
lines changed

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,12 @@ public CommonConfig setSeriesSlotNum(int seriesSlotNum) {
393393
return this;
394394
}
395395

396+
@Override
397+
public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
398+
setProperty("data_partition_allocation_strategy", dataPartitionAllocationStrategy);
399+
return this;
400+
}
401+
396402
@Override
397403
public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) {
398404
setProperty("series_partition_executor_class", seriesPartitionExecutorClass);

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,12 @@ public CommonConfig setSeriesSlotNum(int seriesSlotNum) {
396396
return this;
397397
}
398398

399+
@Override
400+
public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
401+
cnConfig.setDataPartitionAllocationStrategy(dataPartitionAllocationStrategy);
402+
return this;
403+
}
404+
399405
@Override
400406
public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) {
401407
cnConfig.setSeriesPartitionExecutorClass(seriesPartitionExecutorClass);

integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,11 @@ public CommonConfig setSeriesSlotNum(int seriesSlotNum) {
279279
return this;
280280
}
281281

282+
@Override
283+
public CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
284+
return this;
285+
}
286+
282287
@Override
283288
public CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) {
284289
return this;

integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus(
126126

127127
CommonConfig setSeriesSlotNum(int seriesSlotNum);
128128

129+
CommonConfig setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy);
130+
129131
CommonConfig setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass);
130132

131133
CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate);

integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java renamed to integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritStrategyIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949

5050
@RunWith(IoTDBTestRunner.class)
5151
@Category({ClusterIT.class})
52-
public class IoTDBPartitionInheritPolicyIT {
52+
public class IoTDBPartitionInheritStrategyIT {
5353

5454
private static final String testDataRegionConsensusProtocolClass =
5555
ConsensusFactory.RATIS_CONSENSUS;
@@ -91,7 +91,7 @@ public static void tearDown() {
9191
}
9292

9393
@Test
94-
public void testDataPartitionInheritPolicy() throws Exception {
94+
public void testDataPartitionInheritStrategy() throws Exception {
9595
final long baseStartTime = 1000;
9696
Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable1 = new ConcurrentHashMap<>();
9797

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.it.partition;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24+
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
25+
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
26+
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
27+
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
28+
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
29+
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
30+
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
31+
import org.apache.iotdb.consensus.ConsensusFactory;
32+
import org.apache.iotdb.it.env.EnvFactory;
33+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
34+
import org.apache.iotdb.itbase.category.ClusterIT;
35+
import org.apache.iotdb.rpc.TSStatusCode;
36+
37+
import org.junit.AfterClass;
38+
import org.junit.Assert;
39+
import org.junit.BeforeClass;
40+
import org.junit.Test;
41+
import org.junit.experimental.categories.Category;
42+
import org.junit.runner.RunWith;
43+
44+
import java.util.ArrayList;
45+
import java.util.Collections;
46+
import java.util.List;
47+
import java.util.Map;
48+
49+
@RunWith(IoTDBTestRunner.class)
50+
@Category({ClusterIT.class})
51+
public class IoTDBPartitionShuffleStrategyIT {
52+
53+
private static final String testDataRegionConsensusProtocolClass =
54+
ConsensusFactory.RATIS_CONSENSUS;
55+
private static final int testReplicationFactor = 1;
56+
private static final String testDataPartitionAllocationStrategy = "SHUFFLE";
57+
private static final int testSeriesSlotNum = 1000;
58+
private static final long testTimePartitionInterval = 604800000;
59+
private static final double testDataRegionPerDataNode = 5.0;
60+
61+
private static final String database = "root.database";
62+
private static final int testTimePartitionSlotsNum = 100;
63+
64+
@BeforeClass
65+
public static void setUp() throws Exception {
66+
EnvFactory.getEnv()
67+
.getConfig()
68+
.getCommonConfig()
69+
.setDataRegionConsensusProtocolClass(testDataRegionConsensusProtocolClass)
70+
.setDataReplicationFactor(testReplicationFactor)
71+
.setTimePartitionInterval(testTimePartitionInterval)
72+
.setSeriesSlotNum(testSeriesSlotNum)
73+
.setDataPartitionAllocationStrategy(testDataPartitionAllocationStrategy)
74+
.setDataRegionPerDataNode(testDataRegionPerDataNode);
75+
76+
// Init 1C1D environment
77+
EnvFactory.getEnv().initClusterEnvironment(1, 1);
78+
79+
// Set Database
80+
try (SyncConfigNodeIServiceClient client =
81+
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
82+
TSStatus status = client.setDatabase(new TDatabaseSchema(database));
83+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
84+
}
85+
}
86+
87+
@AfterClass
88+
public static void tearDown() {
89+
EnvFactory.getEnv().cleanClusterEnvironment();
90+
}
91+
92+
@Test
93+
public void testDataPartitionShuffleStrategy() throws Exception {
94+
List<Integer> randomTimeSlotList = new ArrayList<>();
95+
for (int i = 0; i < testTimePartitionSlotsNum; i++) {
96+
randomTimeSlotList.add(i);
97+
}
98+
Collections.shuffle(randomTimeSlotList);
99+
for (int timeSlotId : randomTimeSlotList) {
100+
// To test the shuffle strategy, we merely need to use a random time slot order
101+
ConfigNodeTestUtils.getOrCreateDataPartitionWithRetry(
102+
database, 0, testSeriesSlotNum, timeSlotId, timeSlotId + 1, testTimePartitionInterval);
103+
}
104+
TDataPartitionTableResp dataPartitionTableResp;
105+
try (SyncConfigNodeIServiceClient client =
106+
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
107+
dataPartitionTableResp =
108+
client.getDataPartitionTable(
109+
new TDataPartitionReq(
110+
ConfigNodeTestUtils.constructPartitionSlotsMap(
111+
database,
112+
0,
113+
testSeriesSlotNum,
114+
0,
115+
testTimePartitionSlotsNum,
116+
testTimePartitionInterval)));
117+
}
118+
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
119+
partitionTable = dataPartitionTableResp.getDataPartitionTable();
120+
for (long currentStartTime = testTimePartitionInterval;
121+
currentStartTime < testTimePartitionInterval * testTimePartitionSlotsNum;
122+
currentStartTime += testTimePartitionInterval) {
123+
TTimePartitionSlot precedingTimeSlot =
124+
new TTimePartitionSlot(currentStartTime - testTimePartitionInterval);
125+
TTimePartitionSlot currentTimeSlot = new TTimePartitionSlot(currentStartTime);
126+
for (int seriesSlotId = 0; seriesSlotId < testSeriesSlotNum; seriesSlotId++) {
127+
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(seriesSlotId);
128+
List<TConsensusGroupId> precedingRegionGroupIds =
129+
partitionTable.get(database).get(seriesPartitionSlot).get(precedingTimeSlot);
130+
List<TConsensusGroupId> currentRegionGroupIds =
131+
partitionTable.get(database).get(seriesPartitionSlot).get(currentTimeSlot);
132+
Assert.assertEquals(precedingRegionGroupIds.size(), currentRegionGroupIds.size());
133+
for (int i = 0; i < precedingRegionGroupIds.size(); i++) {
134+
// Ensure that the RegionGroupId is different in two adjacent TimePartitionSlots
135+
Assert.assertNotEquals(precedingRegionGroupIds.get(i), currentRegionGroupIds.get(i));
136+
}
137+
}
138+
}
139+
}
140+
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ public class ConfigNodeConfig {
8282
private String seriesPartitionExecutorClass =
8383
"org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
8484

85+
private String dataPartitionAllocationStrategy = "INHERIT";
86+
8587
/** The policy of extension SchemaRegionGroup for each Database. */
8688
private RegionGroupExtensionPolicy schemaRegionGroupExtensionPolicy =
8789
RegionGroupExtensionPolicy.AUTO;
@@ -423,6 +425,14 @@ public void setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass)
423425
this.seriesPartitionExecutorClass = seriesPartitionExecutorClass;
424426
}
425427

428+
public String getDataPartitionAllocationStrategy() {
429+
return dataPartitionAllocationStrategy;
430+
}
431+
432+
public void setDataPartitionAllocationStrategy(String dataPartitionAllocationStrategy) {
433+
this.dataPartitionAllocationStrategy = dataPartitionAllocationStrategy;
434+
}
435+
426436
public int getCnRpcMaxConcurrentClientNum() {
427437
return rpcMaxConcurrentClientNum;
428438
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio
182182
properties.getProperty(
183183
"series_partition_executor_class", conf.getSeriesPartitionExecutorClass()));
184184

185+
conf.setDataPartitionAllocationStrategy(
186+
properties.getProperty(
187+
"data_partition_allocation_strategy", conf.getDataPartitionAllocationStrategy()));
188+
185189
conf.setConfigNodeConsensusProtocolClass(
186190
properties.getProperty(
187191
"config_node_consensus_protocol_class", conf.getConfigNodeConsensusProtocolClass()));

0 commit comments

Comments
 (0)