Skip to content

Commit 8889255

Browse files
Load: Implement region operations cache for load × region migration detection (apache#15210)
Co-authored-by: Steve Yurong Su <rong@apache.org>
1 parent c746da2 commit 8889255

File tree

8 files changed

+133
-33
lines changed

8 files changed

+133
-33
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@
167167
import java.util.concurrent.ScheduledExecutorService;
168168
import java.util.concurrent.locks.ReentrantLock;
169169
import java.util.stream.Collectors;
170+
import java.util.stream.Stream;
170171

171172
public class ProcedureManager {
172173
private static final Logger LOGGER = LoggerFactory.getLogger(ProcedureManager.class);
@@ -709,16 +710,19 @@ private TSStatus checkMigrateRegion(
709710
TDataNodeLocation originalDataNode,
710711
TDataNodeLocation destDataNode,
711712
TDataNodeLocation coordinatorForAddPeer) {
712-
String failMessage =
713-
regionOperationCommonCheck(
714-
regionGroupId,
715-
destDataNode,
716-
Arrays.asList(
717-
new Pair<>("Original DataNode", originalDataNode),
718-
new Pair<>("Destination DataNode", destDataNode),
719-
new Pair<>("Coordinator for add peer", coordinatorForAddPeer)),
720-
migrateRegionReq.getModel());
721-
if (configManager
713+
String failMessage;
714+
if ((failMessage =
715+
regionOperationCommonCheck(
716+
regionGroupId,
717+
destDataNode,
718+
Arrays.asList(
719+
new Pair<>("Original DataNode", originalDataNode),
720+
new Pair<>("Destination DataNode", destDataNode),
721+
new Pair<>("Coordinator for add peer", coordinatorForAddPeer)),
722+
migrateRegionReq.getModel()))
723+
!= null) {
724+
// do nothing
725+
} else if (configManager
722726
.getPartitionManager()
723727
.getAllReplicaSets(originalDataNode.getDataNodeId())
724728
.stream()
@@ -955,10 +959,7 @@ private String checkRegionOperationWithRemoveDataNode(
955959

956960
private String checkRegionOperationDuplication(TConsensusGroupId regionId) {
957961
List<? extends RegionOperationProcedure<?>> otherRegionMemberChangeProcedures =
958-
getExecutor().getProcedures().values().stream()
959-
.filter(procedure -> !procedure.isFinished())
960-
.filter(procedure -> procedure instanceof RegionOperationProcedure)
961-
.map(procedure -> (RegionOperationProcedure<?>) procedure)
962+
getRegionOperationProcedures()
962963
.filter(
963964
regionMemberChangeProcedure ->
964965
regionId.equals(regionMemberChangeProcedure.getRegionId()))
@@ -971,6 +972,20 @@ private String checkRegionOperationDuplication(TConsensusGroupId regionId) {
971972
return null;
972973
}
973974

975+
/**
 * Collects the region ids of the region operation procedures returned by
 * {@code getRegionOperationProcedures()}.
 *
 * <p>Each procedure is mapped to its region id via {@code getRegionId()}, duplicates are dropped
 * with {@code distinct()}, and the result is gathered into a new list on every call.
 *
 * @return the de-duplicated region ids of those procedures; empty when there are none
 */
public List<TConsensusGroupId> getRegionOperationConsensusIds() {
976+
return getRegionOperationProcedures()
977+
.map(RegionOperationProcedure::getRegionId)
978+
.distinct()
979+
.collect(Collectors.toList());
980+
}
981+
982+
/**
 * Streams the executor's procedures that are not finished and are instances of
 * {@link RegionOperationProcedure}, cast to that type.
 *
 * <p>A new stream over {@code getExecutor().getProcedures().values()} is created on every call;
 * the filters and the cast only run once the caller applies a terminal operation.
 *
 * @return a stream of the unfinished region operation procedures
 */
private Stream<RegionOperationProcedure<?>> getRegionOperationProcedures() {
983+
return getExecutor().getProcedures().values().stream()
984+
// Keep only procedures that have not finished yet.
.filter(procedure -> !procedure.isFinished())
985+
// The instanceof filter makes the cast in the following map step safe.
.filter(procedure -> procedure instanceof RegionOperationProcedure)
986+
.map(procedure -> (RegionOperationProcedure<?>) procedure);
987+
}
988+
974989
private String checkRegionOperationModelCorrectness(TConsensusGroupId regionId, Model model) {
975990
String databaseName = configManager.getPartitionManager().getRegionDatabase(regionId);
976991
boolean isTreeModelDatabase = databaseName.startsWith(SqlConstant.TREE_MODEL_DATABASE_PREFIX);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
4242
import org.apache.iotdb.confignode.manager.node.NodeManager;
4343
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
44+
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
4445
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
4546

4647
import org.apache.tsfile.utils.Pair;
@@ -141,6 +142,11 @@ private TDataNodeHeartbeatReq genHeartbeatReq() {
141142
/* Generate heartbeat request */
142143
TDataNodeHeartbeatReq heartbeatReq = new TDataNodeHeartbeatReq();
143144
heartbeatReq.setHeartbeatTimestamp(System.nanoTime());
145+
heartbeatReq.setLogicalClock(
146+
configManager
147+
.getConsensusManager()
148+
.getConsensusImpl()
149+
.getLogicalClock(ConfigNodeInfo.CONFIG_REGION_ID));
144150
// Always sample RegionGroups' leadership as the Region heartbeat
145151
heartbeatReq.setNeedJudgeLeader(true);
146152
// We sample DataNode's load every 10 heartbeat loops
@@ -169,6 +175,12 @@ private TDataNodeHeartbeatReq genHeartbeatReq() {
169175
heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations());
170176
}
171177

178+
// We broadcast region operations list every 100 heartbeat loops
179+
if (heartbeatCounter.get() % 100 == 0) {
180+
heartbeatReq.setCurrentRegionOperations(
181+
configManager.getProcedureManager().getRegionOperationConsensusIds());
182+
}
183+
172184
/* Update heartbeat counter */
173185
heartbeatCounter.getAndIncrement();
174186

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
6060
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
6161
import org.apache.iotdb.consensus.exception.ConsensusException;
62+
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
6263
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
6364
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
6465
import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
@@ -486,7 +487,15 @@ public List<TSStatus> notifyRegionMigrationToAllDataNodes(
486487
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
487488
configManager.getNodeManager().getRegisteredDataNodeLocations();
488489
final TNotifyRegionMigrationReq request =
489-
new TNotifyRegionMigrationReq(consensusGroupId, isStart);
490+
new TNotifyRegionMigrationReq(
491+
configManager
492+
.getConsensusManager()
493+
.getConsensusImpl()
494+
.getLogicalClock(ConfigNodeInfo.CONFIG_REGION_ID),
495+
System.nanoTime(),
496+
configManager.getProcedureManager().getRegionOperationConsensusIds());
497+
request.setRegionId(consensusGroupId);
498+
request.setIsStart(isStart);
490499

491500
final DataNodeAsyncRequestContext<TNotifyRegionMigrationReq, TSStatus> clientHandler =
492501
new DataNodeAsyncRequestContext<>(

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat
8181
"[pid{}][MigrateRegion] started, {} will be migrated from DataNode {} to {}.",
8282
getProcId(),
8383
regionId,
84-
handler.simplifiedLocation(originalDataNode),
85-
handler.simplifiedLocation(destDataNode));
84+
RegionMaintainHandler.simplifiedLocation(originalDataNode),
85+
RegionMaintainHandler.simplifiedLocation(destDataNode));
8686
addChildProcedure(new NotifyRegionMigrationProcedure(regionId, true));
8787
setNextState(RegionTransitionState.ADD_REGION_PEER);
8888
break;
@@ -120,8 +120,8 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat
120120
getProcId(),
121121
cleanHint,
122122
regionId,
123-
handler.simplifiedLocation(originalDataNode),
124-
handler.simplifiedLocation(destDataNode),
123+
RegionMaintainHandler.simplifiedLocation(originalDataNode),
124+
RegionMaintainHandler.simplifiedLocation(destDataNode),
125125
CommonDateTimeUtils.convertMillisecondToDurationStr(
126126
System.currentTimeMillis() - getSubmittedTime()),
127127
DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1949,6 +1949,15 @@ public TDataNodeHeartbeatResp getDataNodeHeartBeat(TDataNodeHeartbeatReq req) th
19491949
clusterTopology.updateTopology(req.getDataNodes(), req.getTopology());
19501950
}
19511951

1952+
if (req.isSetCurrentRegionOperations()) {
1953+
RegionMigrateService.getInstance()
1954+
.notifyRegionMigration(
1955+
new TNotifyRegionMigrationReq(
1956+
req.getLogicalClock(),
1957+
req.getHeartbeatTimestamp(),
1958+
req.getCurrentRegionOperations()));
1959+
}
1960+
19521961
return resp;
19531962
}
19541963

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,9 +240,10 @@ public void start() {
240240
}
241241
}
242242

243-
if (RegionMigrateService.getInstance().getLastNotifyTime() > startTimeMs) {
243+
if (RegionMigrateService.getInstance().getLastNotifyMigratingTime() > startTimeMs
244+
|| RegionMigrateService.getInstance().mayHaveMigratingRegions()) {
244245
LOGGER.warn(
245-
"LoadTsFileScheduler: Region migration started or ended during loading TsFile {}, will convert to insertion to avoid data loss",
246+
"LoadTsFileScheduler: Region migration was detected during loading TsFile {}, will convert to insertion to avoid data loss",
246247
filePath);
247248
isLoadSingleTsFileSuccess = false;
248249
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,12 @@
4949
import org.slf4j.Logger;
5050
import org.slf4j.LoggerFactory;
5151

52+
import java.util.Collections;
5253
import java.util.HashMap;
5354
import java.util.List;
5455
import java.util.Map;
5556
import java.util.concurrent.ConcurrentHashMap;
5657
import java.util.concurrent.ExecutorService;
57-
import java.util.concurrent.atomic.AtomicLong;
5858
import java.util.stream.Collectors;
5959

6060
public class RegionMigrateService implements IService {
@@ -75,27 +75,76 @@ public class RegionMigrateService implements IService {
7575
private static final ConcurrentHashMap<Long, TRegionMigrateResult> taskResultMap =
7676
new ConcurrentHashMap<>();
7777

78-
private static final AtomicLong lastNotifyTime = new AtomicLong(Long.MIN_VALUE);
79-
8078
private static final TRegionMigrateResult unfinishedResult = new TRegionMigrateResult();
8179

80+
/**
 * Holds the most recent list of region operations received by this service, together with the
 * (logicalClock, timestamp) version it was received with and the wall-clock time of the last
 * notification that indicated a migration.
 *
 * <p>Every method is {@code synchronized} on the cache instance.
 */
private static class RegionMigrationStatusCache {
81+
82+
// Version of the snapshot currently stored; -1 until the first update is accepted.
private long logicalClock = -1;
83+
private long timestamp = -1;
84+
85+
// Region ids from the last accepted update; may become null because update() stores its
// argument as-is.
private List<TConsensusGroupId> currentMigratingConsensusGroupIds = Collections.emptyList();
86+
87+
// System.currentTimeMillis() of the last migration notification; Long.MIN_VALUE means none has
// been recorded, so a "> startTime" comparison is false until one arrives.
private long lastNotifyMigratingTime = Long.MIN_VALUE;
88+
89+
/**
 * Replaces the cached snapshot if the given version is strictly newer than the stored one.
 *
 * <p>An update is ignored when its logical clock is smaller than the stored one, or when the
 * logical clocks are equal and its timestamp is not greater than the stored timestamp.
 *
 * <p>NOTE(review): the senders visible in this change pass {@code System.nanoTime()} as the
 * timestamp; such values are only comparable within one JVM - confirm that updates sharing a
 * logical clock always originate from the same process.
 *
 * <p>NOTE(review): the list reference is stored without a defensive copy - confirm the caller
 * never mutates it afterwards.
 *
 * @param newLogicalClock logical clock attached to the incoming snapshot
 * @param newTimestamp timestamp attached to the incoming snapshot
 * @param currentRegionOperations region ids carried by the incoming snapshot; null and empty are
 *     both treated as "no migrating regions"
 */
public synchronized void update(
90+
long newLogicalClock, long newTimestamp, List<TConsensusGroupId> currentRegionOperations) {
91+
// Drop stale or duplicate snapshots: order by logical clock first, then by timestamp.
if (newLogicalClock < logicalClock
92+
|| (newLogicalClock == logicalClock && newTimestamp <= timestamp)) {
93+
return;
94+
}
95+
logicalClock = newLogicalClock;
96+
timestamp = newTimestamp;
97+
98+
currentMigratingConsensusGroupIds = currentRegionOperations;
99+
100+
// A non-empty list counts as a migration notification, so refresh the notify time.
if (currentRegionOperations != null && !currentRegionOperations.isEmpty()) {
101+
lastNotifyMigratingTime = System.currentTimeMillis();
102+
}
103+
}
104+
105+
/** Records the current wall-clock time as the time of the last migration notification. */
public synchronized void notifyMigrating() {
106+
lastNotifyMigratingTime = System.currentTimeMillis();
107+
}
108+
109+
/**
 * @return true if the last accepted snapshot carried a non-null, non-empty list of region ids
 */
public synchronized boolean hasMigratingRegions() {
110+
return currentMigratingConsensusGroupIds != null
111+
&& !currentMigratingConsensusGroupIds.isEmpty();
112+
}
113+
114+
/**
 * @return the {@code System.currentTimeMillis()} value recorded by the last migration
 *     notification, or {@code Long.MIN_VALUE} if none has been recorded
 */
public synchronized long getLastNotifyMigratingTime() {
115+
return lastNotifyMigratingTime;
116+
}
117+
}
118+
119+
private final RegionMigrationStatusCache regionMigrationStatusCache =
120+
new RegionMigrationStatusCache();
121+
82122
private RegionMigrateService() {}
83123

84124
public static RegionMigrateService getInstance() {
85125
return Holder.INSTANCE;
86126
}
87127

88128
public void notifyRegionMigration(TNotifyRegionMigrationReq req) {
89-
lastNotifyTime.set(System.currentTimeMillis());
90-
if (req.isIsStart()) {
91-
LOGGER.info("Region {} is notified to begin migrating", req.getRegionId());
92-
} else {
93-
LOGGER.info("Region {} is notified to finish migrating", req.getRegionId());
129+
regionMigrationStatusCache.update(
130+
req.getLogicalClock(), req.getTimestamp(), req.getCurrentRegionOperations());
131+
132+
if (req.isSetIsStart() && req.isSetRegionId()) {
133+
regionMigrationStatusCache.notifyMigrating();
134+
if (req.isIsStart()) {
135+
LOGGER.info("Region {} is notified to begin migrating", req.getRegionId());
136+
} else {
137+
LOGGER.info("Region {} is notified to finish migrating", req.getRegionId());
138+
}
94139
}
95140
}
96141

97-
public long getLastNotifyTime() {
98-
return lastNotifyTime.get();
142+
public long getLastNotifyMigratingTime() {
143+
return regionMigrationStatusCache.getLastNotifyMigratingTime();
144+
}
145+
146+
public boolean mayHaveMigratingRegions() {
147+
return regionMigrationStatusCache.hasMigratingRegions();
99148
}
100149

101150
/**

iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,11 @@ struct TRegionMigrateResult {
5353
}
5454

5555
struct TNotifyRegionMigrationReq {
56-
1: required common.TConsensusGroupId regionId
57-
2: required bool isStart
56+
1: required i64 logicalClock
57+
2: required i64 timestamp
58+
3: optional common.TConsensusGroupId regionId
59+
4: optional bool isStart
60+
5: required list<common.TConsensusGroupId> currentRegionOperations
5861
}
5962

6063
struct TCreatePeerReq {
@@ -279,6 +282,8 @@ struct TDataNodeHeartbeatReq {
279282
11: optional set<common.TEndPoint> configNodeEndPoints
280283
12: optional map<i32, common.TDataNodeLocation> dataNodes
281284
13: optional map<i32, set<i32>> topology
285+
14: required i64 logicalClock
286+
15: optional list<common.TConsensusGroupId> currentRegionOperations
282287
}
283288

284289
struct TDataNodeActivation {

0 commit comments

Comments
 (0)