Skip to content

Commit 0aed7f8

Browse files
Region operation "extend" and "remove" support multi regions in one SQL (apache#16196)
1 parent edb1719 commit 0aed7f8

File tree

15 files changed

+434
-57
lines changed

15 files changed

+434
-57
lines changed

integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDBRegionGroupExpandAndShrinkForIoTV1IT.java

Lines changed: 328 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,14 @@
3737

3838
import java.sql.Connection;
3939
import java.sql.Statement;
40+
import java.util.ArrayList;
41+
import java.util.List;
4042
import java.util.Map;
4143
import java.util.Optional;
4244
import java.util.Set;
4345
import java.util.concurrent.TimeUnit;
4446
import java.util.function.Predicate;
47+
import java.util.stream.Collectors;
4548

4649
import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
4750

@@ -51,6 +54,8 @@ public class IoTDBRegionGroupExpandAndShrinkForIoTV1IT
5154
extends IoTDBRegionOperationReliabilityITFramework {
5255
private static final String EXPAND_FORMAT = "extend region %d to %d";
5356
private static final String SHRINK_FORMAT = "remove region %d from %d";
57+
private static final String MULTI_EXPAND_FORMAT = "extend region %s to %d";
58+
private static final String MULTI_SHRINK_FORMAT = "remove region %s from %d";
5459

5560
private static Logger LOGGER =
5661
LoggerFactory.getLogger(IoTDBRegionGroupExpandAndShrinkForIoTV1IT.class);
@@ -65,7 +70,7 @@ public class IoTDBRegionGroupExpandAndShrinkForIoTV1IT
6570
* <p>4. Check
6671
*/
6772
@Test
68-
public void normal1C5DTest() throws Exception {
73+
public void singleRegionTest() throws Exception {
6974
EnvFactory.getEnv()
7075
.getConfig()
7176
.getCommonConfig()
@@ -181,4 +186,326 @@ private void regionGroupShrink(
181186

182187
LOGGER.info("Region {} has shrunk from DataNode {}", selectedRegion, targetDataNode);
183188
}
189+
190+
/**
191+
* Test multi-region expand and shrink operations with normal flow: 1. Multi-expand: expand
192+
* multiple regions to target DataNode 2. Multi-shrink: shrink multiple regions from target
193+
* DataNode
194+
*/
195+
@Test
196+
public void multiRegionNormalTest() throws Exception {
197+
EnvFactory.getEnv()
198+
.getConfig()
199+
.getCommonConfig()
200+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
201+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
202+
.setDataReplicationFactor(1)
203+
.setSchemaReplicationFactor(1);
204+
205+
EnvFactory.getEnv().initClusterEnvironment(1, 5);
206+
207+
try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
208+
final Statement statement = makeItCloseQuietly(connection.createStatement());
209+
SyncConfigNodeIServiceClient client =
210+
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
211+
// prepare data
212+
statement.execute(INSERTION1);
213+
statement.execute(FLUSH_COMMAND);
214+
215+
// collect necessary information
216+
Map<Integer, Set<Integer>> regionMap = getAllRegionMap(statement);
217+
Set<Integer> allDataNodeId = getAllDataNodes(statement);
218+
219+
// expect one data region, one schema region
220+
// plus one system data region, one system schema region
221+
Assert.assertEquals(4, regionMap.size());
222+
223+
// select multiple regions for testing
224+
List<Integer> selectedRegions = new ArrayList<>(regionMap.keySet());
225+
selectedRegions = selectedRegions.subList(0, Math.min(3, selectedRegions.size()));
226+
227+
// find target DataNode that doesn't contain any of the selected regions
228+
int targetDataNode =
229+
findDataNodeNotContainsAnyRegion(allDataNodeId, regionMap, selectedRegions);
230+
231+
LOGGER.info("Selected regions for multi-region test: {}", selectedRegions);
232+
LOGGER.info("Target DataNode: {}", targetDataNode);
233+
234+
// multi-expand: expand all selected regions to target DataNode
235+
multiRegionGroupExpand(statement, client, selectedRegions, targetDataNode);
236+
237+
// verify expand result
238+
regionMap = getAllRegionMap(statement);
239+
for (int regionId : selectedRegions) {
240+
Assert.assertTrue(
241+
"Region " + regionId + " should contain target DataNode " + targetDataNode,
242+
regionMap.get(regionId).contains(targetDataNode));
243+
}
244+
LOGGER.info("Multi-region expand test passed");
245+
246+
// multi-shrink: shrink all selected regions from target DataNode
247+
multiRegionGroupShrink(statement, client, selectedRegions, targetDataNode);
248+
249+
// verify shrink result
250+
regionMap = getAllRegionMap(statement);
251+
for (int regionId : selectedRegions) {
252+
Assert.assertFalse(
253+
"Region " + regionId + " should not contain target DataNode " + targetDataNode,
254+
regionMap.get(regionId).contains(targetDataNode));
255+
}
256+
LOGGER.info("Multi-region shrink test passed");
257+
}
258+
}
259+
260+
/** Test multi-region expand with partial regions already in target DataNode */
261+
@Test
262+
public void multiRegionExpandPartialExistTest() throws Exception {
263+
EnvFactory.getEnv()
264+
.getConfig()
265+
.getCommonConfig()
266+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
267+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
268+
.setDataReplicationFactor(1)
269+
.setSchemaReplicationFactor(1);
270+
271+
EnvFactory.getEnv().initClusterEnvironment(1, 5);
272+
273+
try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
274+
final Statement statement = makeItCloseQuietly(connection.createStatement());
275+
SyncConfigNodeIServiceClient client =
276+
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
277+
// prepare data
278+
statement.execute(INSERTION1);
279+
statement.execute(FLUSH_COMMAND);
280+
281+
Map<Integer, Set<Integer>> regionMap = getAllRegionMap(statement);
282+
Set<Integer> allDataNodeId = getAllDataNodes(statement);
283+
284+
List<Integer> allRegions = new ArrayList<>(regionMap.keySet());
285+
List<Integer> selectedRegions = allRegions.subList(0, Math.min(3, allRegions.size()));
286+
287+
int targetDataNode =
288+
findDataNodeNotContainsAnyRegion(allDataNodeId, regionMap, selectedRegions);
289+
290+
// first expand some regions individually
291+
List<Integer> preExpandRegions =
292+
selectedRegions.subList(0, Math.min(2, selectedRegions.size()));
293+
for (int regionId : preExpandRegions) {
294+
regionGroupExpand(statement, client, regionId, targetDataNode);
295+
}
296+
297+
// now try to expand all regions (including already expanded ones)
298+
LOGGER.info(
299+
"Testing multi-expand with regions {} to DataNode {}, where {} already exist",
300+
selectedRegions,
301+
targetDataNode,
302+
preExpandRegions);
303+
304+
multiRegionGroupExpand(statement, client, selectedRegions, targetDataNode);
305+
306+
// verify all regions are in target DataNode
307+
regionMap = getAllRegionMap(statement);
308+
for (int regionId : selectedRegions) {
309+
Assert.assertTrue(
310+
"Region " + regionId + " should contain target DataNode " + targetDataNode,
311+
regionMap.get(regionId).contains(targetDataNode));
312+
}
313+
LOGGER.info("Multi-region expand partial exist test passed");
314+
}
315+
}
316+
317+
/** Test multi-region shrink with partial regions not in target DataNode */
318+
@Test
319+
public void multiRegionShrinkPartialNotExistTest() throws Exception {
320+
EnvFactory.getEnv()
321+
.getConfig()
322+
.getCommonConfig()
323+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
324+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
325+
.setDataReplicationFactor(1)
326+
.setSchemaReplicationFactor(1);
327+
328+
EnvFactory.getEnv().initClusterEnvironment(1, 5);
329+
330+
try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
331+
final Statement statement = makeItCloseQuietly(connection.createStatement());
332+
SyncConfigNodeIServiceClient client =
333+
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
334+
// prepare data
335+
statement.execute(INSERTION1);
336+
statement.execute(FLUSH_COMMAND);
337+
338+
Map<Integer, Set<Integer>> regionMap = getAllRegionMap(statement);
339+
Set<Integer> allDataNodeId = getAllDataNodes(statement);
340+
341+
List<Integer> allRegions = new ArrayList<>(regionMap.keySet());
342+
List<Integer> selectedRegions = allRegions.subList(0, Math.min(3, allRegions.size()));
343+
344+
int targetDataNode =
345+
findDataNodeNotContainsAnyRegion(allDataNodeId, regionMap, selectedRegions);
346+
347+
// first expand all regions to target DataNode
348+
multiRegionGroupExpand(statement, client, selectedRegions, targetDataNode);
349+
350+
// then shrink some regions individually
351+
List<Integer> preShrinkRegions =
352+
selectedRegions.subList(0, Math.min(2, selectedRegions.size()));
353+
for (int regionId : preShrinkRegions) {
354+
regionGroupShrink(statement, client, regionId, targetDataNode);
355+
}
356+
357+
// now try to shrink all regions (including already shrunk ones)
358+
LOGGER.info(
359+
"Testing multi-shrink with regions {} from DataNode {}, where {} already removed",
360+
selectedRegions,
361+
targetDataNode,
362+
preShrinkRegions);
363+
364+
multiRegionGroupShrink(statement, client, selectedRegions, targetDataNode);
365+
366+
// verify all regions are not in target DataNode
367+
regionMap = getAllRegionMap(statement);
368+
for (int regionId : selectedRegions) {
369+
Assert.assertFalse(
370+
"Region " + regionId + " should not contain target DataNode " + targetDataNode,
371+
regionMap.get(regionId).contains(targetDataNode));
372+
}
373+
LOGGER.info("Multi-region shrink partial not exist test passed");
374+
}
375+
}
376+
377+
private void multiRegionGroupExpand(
378+
Statement statement,
379+
SyncConfigNodeIServiceClient client,
380+
List<Integer> regionIds,
381+
int targetDataNode)
382+
throws Exception {
383+
String command = buildMultiRegionCommand(MULTI_EXPAND_FORMAT, regionIds, targetDataNode);
384+
385+
Predicate<TShowRegionResp> expandPredicate =
386+
tShowRegionResp -> {
387+
Map<Integer, Set<Integer>> newRegionMap =
388+
getRunningRegionMap(tShowRegionResp.getRegionInfoList());
389+
return regionIds.stream()
390+
.allMatch(
391+
regionId -> {
392+
Set<Integer> dataNodes = newRegionMap.get(regionId);
393+
return dataNodes != null && dataNodes.contains(targetDataNode);
394+
});
395+
};
396+
397+
executeMultiRegionOperation(
398+
statement,
399+
client,
400+
command,
401+
regionIds,
402+
expandPredicate,
403+
Optional.of(targetDataNode),
404+
Optional.empty(),
405+
"expand");
406+
}
407+
408+
private void multiRegionGroupShrink(
409+
Statement statement,
410+
SyncConfigNodeIServiceClient client,
411+
List<Integer> regionIds,
412+
int targetDataNode)
413+
throws Exception {
414+
String command = buildMultiRegionCommand(MULTI_SHRINK_FORMAT, regionIds, targetDataNode);
415+
416+
Predicate<TShowRegionResp> shrinkPredicate =
417+
tShowRegionResp -> {
418+
Map<Integer, Set<Integer>> newRegionMap =
419+
getRegionMap(tShowRegionResp.getRegionInfoList());
420+
return regionIds.stream()
421+
.allMatch(
422+
regionId -> {
423+
Set<Integer> dataNodes = newRegionMap.get(regionId);
424+
return dataNodes == null || !dataNodes.contains(targetDataNode);
425+
});
426+
};
427+
428+
executeMultiRegionOperation(
429+
statement,
430+
client,
431+
command,
432+
regionIds,
433+
shrinkPredicate,
434+
Optional.empty(),
435+
Optional.of(targetDataNode),
436+
"shrink");
437+
}
438+
439+
private String buildMultiRegionCommand(
440+
String format, List<Integer> regionIds, int targetDataNode) {
441+
String regionIdStr = regionIds.stream().map(String::valueOf).collect(Collectors.joining(","));
442+
return String.format(format, regionIdStr, targetDataNode);
443+
}
444+
445+
private void executeMultiRegionOperation(
446+
Statement statement,
447+
SyncConfigNodeIServiceClient client,
448+
String command,
449+
List<Integer> regionIds,
450+
Predicate<TShowRegionResp> predicate,
451+
Optional<Integer> expectedDataNode,
452+
Optional<Integer> notExpectedDataNode,
453+
String operationType) {
454+
455+
LOGGER.info("Executing multi-region {} command: {}", operationType, command);
456+
457+
Awaitility.await()
458+
.atMost(30, TimeUnit.SECONDS)
459+
.pollInterval(2, TimeUnit.SECONDS)
460+
.until(
461+
() -> {
462+
try {
463+
statement.execute(command);
464+
return true;
465+
} catch (Exception e) {
466+
String errorMessage = e.getMessage();
467+
// If error message contains both "successfully submitted" and "failed to submit",
468+
// consider it as partial success and continue
469+
if (errorMessage != null
470+
&& errorMessage.contains("successfully submitted")
471+
&& errorMessage.contains("failed to submit")) {
472+
LOGGER.warn(
473+
"Multi-region {} partially succeeded: {}", operationType, errorMessage);
474+
return true;
475+
}
476+
LOGGER.warn(
477+
"Multi-region {} command execution failed, retrying: {}",
478+
operationType,
479+
errorMessage);
480+
return false;
481+
}
482+
});
483+
484+
// Use the first region for awaitUntilSuccess (framework limitation)
485+
awaitUntilSuccess(client, regionIds.get(0), predicate, expectedDataNode, notExpectedDataNode);
486+
487+
String targetDescription =
488+
expectedDataNode.isPresent()
489+
? "to DataNode " + expectedDataNode.get()
490+
: "from DataNode " + notExpectedDataNode.get();
491+
LOGGER.info(
492+
"Regions {} have {} {}",
493+
regionIds,
494+
operationType.equals("expand") ? "expanded" : "shrunk",
495+
targetDescription);
496+
}
497+
498+
private int findDataNodeNotContainsAnyRegion(
499+
Set<Integer> allDataNodeId, Map<Integer, Set<Integer>> regionMap, List<Integer> regionIds) {
500+
return allDataNodeId.stream()
501+
.filter(
502+
dataNodeId ->
503+
regionIds.stream()
504+
.noneMatch(regionId -> regionMap.get(regionId).contains(dataNodeId)))
505+
.findFirst()
506+
.orElseThrow(
507+
() ->
508+
new RuntimeException(
509+
"Cannot find DataNode that doesn't contain any of the regions"));
510+
}
184511
}

iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -541,11 +541,11 @@ reconstructRegion
541541
;
542542

543543
// extendRegion: the added alternative accepts one or more comma-separated region ids
// (collected in regionIds) before TO and a single target DataNode id; the removed
// alternative accepted exactly one regionId.
extendRegion
544-
: EXTEND REGION regionId=INTEGER_LITERAL TO targetDataNodeId=INTEGER_LITERAL
544+
: EXTEND REGION regionIds+=INTEGER_LITERAL (COMMA regionIds+=INTEGER_LITERAL)* TO targetDataNodeId=INTEGER_LITERAL
545545
;
546546

547547
// removeRegion: the added alternative accepts one or more comma-separated region ids
// (collected in regionIds) before FROM and a single target DataNode id; the removed
// alternative accepted exactly one regionId.
removeRegion
548-
: REMOVE REGION regionId=INTEGER_LITERAL FROM targetDataNodeId=INTEGER_LITERAL
548+
: REMOVE REGION regionIds+=INTEGER_LITERAL (COMMA regionIds+=INTEGER_LITERAL)* FROM targetDataNodeId=INTEGER_LITERAL
549549
;
550550

551551
verifyConnection

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2489,15 +2489,15 @@ public TSStatus reconstructRegion(TReconstructRegionReq req) {
24892489
// Handles an extend-region request: the request is forwarded to procedureManager only when
// confirmLeader() reports SUCCESS_STATUS; otherwise the status from confirmLeader() is
// returned unchanged. The diff switches the delegate from extendRegion to extendRegions.
public TSStatus extendRegion(TExtendRegionReq req) {
24902490
TSStatus status = confirmLeader();
24912491
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
2492-
? procedureManager.extendRegion(req)
2492+
? procedureManager.extendRegions(req)
24932493
: status;
24942494
}
24952495

24962496
// Handles a remove-region request: the request is forwarded to procedureManager only when
// confirmLeader() reports SUCCESS_STATUS; otherwise the status from confirmLeader() is
// returned unchanged. The diff switches the delegate from removeRegion to removeRegions.
@Override
24972497
public TSStatus removeRegion(TRemoveRegionReq req) {
24982498
TSStatus status = confirmLeader();
24992499
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
2500-
? procedureManager.removeRegion(req)
2500+
? procedureManager.removeRegions(req)
25012501
: status;
25022502
}
25032503

0 commit comments

Comments
 (0)