Skip to content

Commit b34b714

Browse files
authored
[IoTConsensusV2 X Region Migration] Start consensus pipe in proper time rather than automatically start it when create it
1 parent 4fddd58 commit b34b714

File tree

8 files changed

+56
-14
lines changed

8 files changed

+56
-14
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,9 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
203203
}
204204

205205
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
206-
pipeRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
206+
if (!createPipeRequest.needManuallyStart) {
207+
pipeRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
208+
}
207209
}
208210

209211
@Override

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -332,9 +332,8 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE
332332
impl.setRemotePeerActive(peer, false);
333333

334334
// step 2: notify all the other Peers to create consensus pipes to newPeer
335-
// NOTE: For this step, coordinator(thisNode) will transfer its full data snapshot to target,
336-
// while other peers will only transmit data(may contain both historical and realtime data)
337-
// after the snapshot progress to target.
335+
// NOTE: For this step, coordinator(thisNode) will transfer its full data snapshot to target
336+
// while other peers record the coordinator's progress.
338337
LOGGER.info("[{}] notify current peers to create consensus pipes...", CLASS_NAME);
339338
impl.notifyPeersToCreateConsensusPipes(peer, impl.getThisNodePeer());
340339
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
@@ -343,7 +342,13 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE
343342
LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME);
344343
impl.waitPeersToTargetPeerTransmissionCompleted(peer);
345344

346-
// step 4: active new Peer to let new Peer receive snapshot
345+
// step 4. start other peers' consensus pipe to target peer to transfer remaining data
346+
// NOTE: For this step, other peers will start to transmit data(may contain both historical
347+
// and realtime data) after the snapshot progress to target.
348+
LOGGER.info("[{}] start transfer remaining data from other peers", CLASS_NAME);
349+
impl.startOtherConsensusPipesToTargetPeer(peer);
350+
351+
// step 5: active new Peer to let new Peer receive client requests
347352
LOGGER.info("[{}] activate new peer...", CLASS_NAME);
348353
impl.setRemotePeerActive(peer, true);
349354
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ public void notifyPeersToCreateConsensusPipes(Peer targetPeer, Peer coordinatorP
425425
try {
426426
// This node which acts as coordinator will transfer complete historical snapshot to new
427427
// target.
428-
createConsensusPipeToTargetPeer(targetPeer, thisNode);
428+
createConsensusPipeToTargetPeer(targetPeer, thisNode, false);
429429
} catch (Exception e) {
430430
LOGGER.warn(
431431
"{} cannot create consensus pipe to {}, may because target peer is unknown currently, please manually check!",
@@ -437,11 +437,11 @@ public void notifyPeersToCreateConsensusPipes(Peer targetPeer, Peer coordinatorP
437437
}
438438

439439
public synchronized void createConsensusPipeToTargetPeer(
440-
Peer targetPeer, Peer regionMigrationCoordinatorPeer)
440+
Peer targetPeer, Peer regionMigrationCoordinatorPeer, boolean needManuallyStart)
441441
throws ConsensusGroupModifyPeerException {
442442
try {
443443
consensusPipeManager.createConsensusPipe(
444-
thisNode, targetPeer, regionMigrationCoordinatorPeer);
444+
thisNode, targetPeer, regionMigrationCoordinatorPeer, needManuallyStart);
445445
peerManager.addAndPersist(targetPeer);
446446
} catch (IOException e) {
447447
LOGGER.warn("{} cannot persist peer {}", thisNode, targetPeer, e);
@@ -510,6 +510,24 @@ public synchronized void dropConsensusPipeToTargetPeer(Peer targetPeer)
510510
}
511511
}
512512

513+
public void startOtherConsensusPipesToTargetPeer(Peer targetPeer)
514+
throws ConsensusGroupModifyPeerException {
515+
final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
516+
for (Peer peer : otherPeers) {
517+
if (peer.equals(targetPeer)) {
518+
continue;
519+
}
520+
try {
521+
consensusPipeManager.updateConsensusPipe(
522+
new ConsensusPipeName(peer, targetPeer), PipeStatus.RUNNING);
523+
} catch (Exception e) {
524+
LOGGER.warn("{} cannot start consensus pipe to {}", peer, targetPeer, e);
525+
throw new ConsensusGroupModifyPeerException(
526+
String.format("%s cannot start consensus pipe to %s", peer, targetPeer), e);
527+
}
528+
}
529+
}
530+
513531
public void waitPeersToTargetPeerTransmissionCompleted(Peer targetPeer)
514532
throws ConsensusGroupModifyPeerException {
515533
boolean isTransmissionCompleted = false;

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeDispatcher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ void createPipe(
2626
String pipeName,
2727
Map<String, String> extractorAttributes,
2828
Map<String, String> processorAttributes,
29-
Map<String, String> connectorAttributes)
29+
Map<String, String> connectorAttributes,
30+
boolean needManuallyStart)
3031
throws Exception;
3132

3233
void startPipe(String pipeName) throws Exception;

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,29 @@ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer) throws Excep
7171
Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, ImmutableMap<String, String>>
7272
params = buildPipeParams(senderPeer, receiverPeer, senderPeer);
7373
dispatcher.createPipe(
74-
consensusPipeName.toString(), params.getLeft(), params.getMiddle(), params.getRight());
74+
consensusPipeName.toString(),
75+
params.getLeft(),
76+
params.getMiddle(),
77+
params.getRight(),
78+
false);
7579
}
7680

7781
/** This method is used when executing region migration */
7882
public void createConsensusPipe(
79-
Peer senderPeer, Peer receiverPeer, Peer regionMigrationCoordinatorPeer) throws Exception {
83+
Peer senderPeer,
84+
Peer receiverPeer,
85+
Peer regionMigrationCoordinatorPeer,
86+
boolean needManuallyStart)
87+
throws Exception {
8088
ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer);
8189
Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, ImmutableMap<String, String>>
8290
params = buildPipeParams(senderPeer, receiverPeer, regionMigrationCoordinatorPeer);
8391
dispatcher.createPipe(
84-
consensusPipeName.toString(), params.getLeft(), params.getMiddle(), params.getRight());
92+
consensusPipeName.toString(),
93+
params.getLeft(),
94+
params.getMiddle(),
95+
params.getRight(),
96+
needManuallyStart);
8597
}
8698

8799
public Triple<

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ public TNotifyPeerToCreateConsensusPipeResp notifyPeerToCreateConsensusPipe(
117117
new Peer(
118118
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId),
119119
req.coordinatorPeerNodeId,
120-
req.coordinatorPeerEndPoint));
120+
req.coordinatorPeerEndPoint),
121+
true);
121122
responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
122123
} catch (ConsensusGroupModifyPeerException e) {
123124
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,15 @@ public void createPipe(
4949
String pipeName,
5050
Map<String, String> extractorAttributes,
5151
Map<String, String> processorAttributes,
52-
Map<String, String> connectorAttributes)
52+
Map<String, String> connectorAttributes,
53+
boolean needManuallyStart)
5354
throws Exception {
5455
try (ConfigNodeClient configNodeClient =
5556
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
5657
TCreatePipeReq req =
5758
new TCreatePipeReq()
5859
.setPipeName(pipeName)
60+
.setNeedManuallyStart(needManuallyStart)
5961
.setExtractorAttributes(extractorAttributes)
6062
.setProcessorAttributes(processorAttributes)
6163
.setConnectorAttributes(connectorAttributes);

iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,7 @@ struct TCreatePipeReq {
737737
3: optional map<string, string> processorAttributes
738738
4: required map<string, string> connectorAttributes
739739
5: optional bool ifNotExistsCondition
740+
6: optional bool needManuallyStart
740741
}
741742

742743
struct TAlterPipeReq {

0 commit comments

Comments
 (0)