Skip to content

Commit b8fd089

Browse files
authored
IoTV2: Refactor replicate index so that it is shared at the pipe task level & Add some log for delete local peer (apache#15815)
* use custom replicate index for each consensus pipe * test conf * fix * Revert "test conf" This reverts commit f0f13af. * fix review
1 parent 88abd3e commit b8fd089

File tree

12 files changed

+55
-102
lines changed

12 files changed

+55
-102
lines changed

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,13 +375,14 @@ public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException
375375
if (!stateMachineMap.containsKey(groupId)) {
376376
throw new ConsensusGroupNotExistException(groupId);
377377
}
378-
378+
LOGGER.info("[{}] start to delete local peer for group {}", CLASS_NAME, groupId);
379379
final PipeConsensusServerImpl consensus = stateMachineMap.get(groupId);
380380
consensus.clear();
381381
stateMachineMap.remove(groupId);
382382

383383
FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
384384
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
385+
LOGGER.info("[{}] finish deleting local peer for group {}", CLASS_NAME, groupId);
385386
} finally {
386387
stateMachineMapLock.readLock().unlock();
387388
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.iotdb.consensus.pipe.consensuspipe;
2020

2121
public interface ConsensusPipeConnector {
22-
long getConsensusPipeCommitProgress();
22+
long getLeaderReplicateProgress();
2323

24-
long getConsensusPipeReplicateProgress();
24+
long getFollowerApplyProgress();
2525
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@ public interface ReplicateProgressManager {
3232
long getSyncLagForSpecificConsensusPipe(
3333
ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName);
3434

35-
void pinCommitIndexForMigration(
35+
void pinReplicateIndexForRegionMigration(
3636
ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName);
3737
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@ public long getSyncLagForRegionMigration(
4848
return Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
4949
.map(
5050
consensusPipeConnector ->
51-
Math.max(
52-
pinnedCommitIndex - consensusPipeConnector.getConsensusPipeReplicateProgress(),
53-
0L))
51+
Math.max(pinnedCommitIndex - consensusPipeConnector.getFollowerApplyProgress(), 0L))
5452
.orElse(0L);
5553
}
5654

@@ -62,16 +60,16 @@ public long getSyncLagForSpecificConsensusPipe(ConsensusPipeName consensusPipeNa
6260
return Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
6361
.map(
6462
consensusPipeConnector -> {
65-
long userWriteProgress = consensusPipeConnector.getConsensusPipeCommitProgress();
66-
long replicateProgress = consensusPipeConnector.getConsensusPipeReplicateProgress();
63+
long userWriteProgress = consensusPipeConnector.getLeaderReplicateProgress();
64+
long replicateProgress = consensusPipeConnector.getFollowerApplyProgress();
6765
return Math.max(userWriteProgress - replicateProgress, 0L);
6866
})
6967
.orElse(0L);
7068
}
7169

72-
public long getCurrentCommitIndex(ConsensusPipeName consensusPipeName) {
70+
public long getCurrentLeaderReplicateIndex(ConsensusPipeName consensusPipeName) {
7371
return Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
74-
.map(ConsensusPipeConnector::getConsensusPipeCommitProgress)
72+
.map(ConsensusPipeConnector::getLeaderReplicateProgress)
7573
.orElse(0L);
7674
}
7775

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
3232
import org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecutor;
3333
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
34+
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
3435
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
3536
import org.apache.iotdb.db.storageengine.StorageEngine;
3637
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -201,6 +202,9 @@ public synchronized void deregister(
201202
}
202203

203204
PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, regionId);
205+
// Reset IoTV2 replicate index to prevent index jumps. Do this when a consensus pipe no longer
206+
// replicates data, since extractor and processor are already dropped now.
207+
ReplicateProgressDataNodeManager.resetReplicateIndexForIoTV2(pipeName);
204208
}
205209

206210
public synchronized void start(final String attributeSortedString) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
2929
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
3030
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
31-
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
3231
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
3332
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
3433
import org.apache.iotdb.commons.service.metric.MetricService;
@@ -39,7 +38,6 @@
3938
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
4039
import org.apache.iotdb.db.conf.IoTDBConfig;
4140
import org.apache.iotdb.db.conf.IoTDBDescriptor;
42-
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
4341
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusDeleteEventHandler;
4442
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletBatchEventHandler;
4543
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletInsertNodeEventHandler;
@@ -48,6 +46,7 @@
4846
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusDeleteNodeReq;
4947
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
5048
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
49+
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
5150
import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusConnectorMetrics;
5251
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
5352
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -726,16 +725,12 @@ public int getRetryBufferSize() {
726725
}
727726

728727
@Override
729-
public long getConsensusPipeCommitProgress() {
730-
return PipeEventCommitManager.getInstance()
731-
.getGivenConsensusPipeCommitId(
732-
consensusPipeName,
733-
PipeDataNodeAgent.task().getPipeCreationTime(consensusPipeName),
734-
consensusGroupId);
728+
public long getLeaderReplicateProgress() {
729+
return ReplicateProgressDataNodeManager.getReplicateIndexForIoTV2(consensusPipeName);
735730
}
736731

737732
@Override
738-
public long getConsensusPipeReplicateProgress() {
733+
public long getFollowerApplyProgress() {
739734
return currentReplicateProgress;
740735
}
741736

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444

4545
public class ReplicateProgressDataNodeManager implements ReplicateProgressManager {
4646
private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
47-
private static final Map<String, AtomicLong> groupId2ReplicateIndex = new ConcurrentHashMap<>();
47+
private static final Map<String, AtomicLong> consensusPipe2ReplicateIndex =
48+
new ConcurrentHashMap<>();
4849
private final Map<ConsensusGroupId, ProgressIndex> groupId2MaxProgressIndex;
4950
private final Map<ConsensusPipeName, Long> consensusPipe2pinnedCommitIndexForMigration;
5051

@@ -55,12 +56,20 @@ public ReplicateProgressDataNodeManager() {
5556
recoverMaxProgressIndexFromDataRegion();
5657
}
5758

58-
public static long assignReplicateIndexForIoTV2(String groupId) {
59-
return groupId2ReplicateIndex
60-
.compute(groupId, (k, v) -> v == null ? new AtomicLong(0) : v)
59+
public static long assignReplicateIndexForIoTV2(String consensusPipeName) {
60+
return consensusPipe2ReplicateIndex
61+
.compute(consensusPipeName, (k, v) -> v == null ? new AtomicLong(0) : v)
6162
.incrementAndGet();
6263
}
6364

65+
public static void resetReplicateIndexForIoTV2(String consensusPipeName) {
66+
consensusPipe2ReplicateIndex.put(consensusPipeName, new AtomicLong(0));
67+
}
68+
69+
public static long getReplicateIndexForIoTV2(String consensusPipeName) {
70+
return consensusPipe2ReplicateIndex.getOrDefault(consensusPipeName, new AtomicLong(0)).get();
71+
}
72+
6473
public static ProgressIndex extractLocalSimpleProgressIndex(ProgressIndex progressIndex) {
6574
if (progressIndex instanceof RecoverProgressIndex) {
6675
final Map<Integer, SimpleProgressIndex> dataNodeId2LocalIndex =
@@ -151,11 +160,11 @@ public long getSyncLagForSpecificConsensusPipe(
151160
}
152161

153162
@Override
154-
public void pinCommitIndexForMigration(
163+
public void pinReplicateIndexForRegionMigration(
155164
ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName) {
156165
this.consensusPipe2pinnedCommitIndexForMigration.put(
157166
consensusPipeName,
158167
PipeConsensusSyncLagManager.getInstance(consensusGroupId.toString())
159-
.getCurrentCommitIndex(consensusPipeName));
168+
.getCurrentLeaderReplicateIndex(consensusPipeName));
160169
}
161170
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java

Lines changed: 1 addition & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -20,34 +20,24 @@
2020
package org.apache.iotdb.db.pipe.event.realtime;
2121

2222
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
23-
import org.apache.iotdb.consensus.pipe.PipeConsensus;
24-
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
25-
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
2623
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
2724
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
2825
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
2926
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
3027
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpochManager;
31-
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
3228
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
3329
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
3430
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
3531
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
3632
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3733
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
3834

39-
import org.slf4j.Logger;
40-
import org.slf4j.LoggerFactory;
41-
4235
import java.util.stream.Collectors;
4336

4437
public class PipeRealtimeEventFactory {
45-
46-
private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeEventFactory.class);
4738
private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new TsFileEpochManager();
4839

4940
public static PipeRealtimeEvent createRealtimeEvent(
50-
final String dataRegionId,
5141
final Boolean isTableModel,
5242
final String databaseNameFromDataRegion,
5343
final TsFileResource resource,
@@ -56,23 +46,10 @@ public static PipeRealtimeEvent createRealtimeEvent(
5646
new PipeTsFileInsertionEvent(
5747
isTableModel, databaseNameFromDataRegion, resource, isLoaded, false);
5848

59-
// if using IoTV2, assign a replicateIndex for this event
60-
if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
61-
&& PipeConsensusProcessor.isShouldReplicate(tsFileInsertionEvent)) {
62-
tsFileInsertionEvent.setReplicateIndexForIoTV2(
63-
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
64-
LOGGER.info(
65-
"[Region{}]Set {} for event {}",
66-
dataRegionId,
67-
tsFileInsertionEvent.getReplicateIndexForIoTV2(),
68-
tsFileInsertionEvent);
69-
}
70-
7149
return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(tsFileInsertionEvent, resource);
7250
}
7351

7452
public static PipeRealtimeEvent createRealtimeEvent(
75-
final String dataRegionId,
7653
final Boolean isTableModel,
7754
final String databaseNameFromDataRegion,
7855
final WALEntryHandler walEntryHandler,
@@ -98,18 +75,6 @@ public static PipeRealtimeEvent createRealtimeEvent(
9875
insertNode.isAligned(),
9976
insertNode.isGeneratedByPipe());
10077

101-
// if using IoTV2, assign a replicateIndex for this event
102-
if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
103-
&& PipeConsensusProcessor.isShouldReplicate(insertionEvent)) {
104-
insertionEvent.setReplicateIndexForIoTV2(
105-
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
106-
LOGGER.info(
107-
"[Region{}]Set {} for event {}",
108-
dataRegionId,
109-
insertionEvent.getReplicateIndexForIoTV2(),
110-
insertionEvent);
111-
}
112-
11378
return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
11479
insertionEvent, insertNode, resource);
11580
}
@@ -120,23 +85,10 @@ public static PipeRealtimeEvent createRealtimeEvent(
12085
new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null);
12186
}
12287

123-
public static PipeRealtimeEvent createRealtimeEvent(
124-
final String dataRegionId, final AbstractDeleteDataNode node) {
88+
public static PipeRealtimeEvent createRealtimeEvent(final AbstractDeleteDataNode node) {
12589
PipeDeleteDataNodeEvent deleteDataNodeEvent =
12690
new PipeDeleteDataNodeEvent(node, node.isGeneratedByPipe());
12791

128-
// if using IoTV2, assign a replicateIndex for this event
129-
if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
130-
&& PipeConsensusProcessor.isShouldReplicate(deleteDataNodeEvent)) {
131-
deleteDataNodeEvent.setReplicateIndexForIoTV2(
132-
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
133-
LOGGER.info(
134-
"[Region{}]Set {} for event {}",
135-
dataRegionId,
136-
deleteDataNodeEvent.getReplicateIndexForIoTV2(),
137-
deleteDataNodeEvent);
138-
}
139-
14092
return new PipeRealtimeEvent(deleteDataNodeEvent, null, null);
14193
}
14294

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -877,13 +877,9 @@ private Event supplyTsFileEvent(final TsFileResource resource) {
877877
if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
878878
&& PipeConsensusProcessor.isShouldReplicate(event)) {
879879
event.setReplicateIndexForIoTV2(
880-
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
881-
resource.getDataRegionId()));
880+
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
882881
LOGGER.info(
883-
"[Region{}]Set {} for event {}",
884-
resource.getDataRegionId(),
885-
event.getReplicateIndexForIoTV2(),
886-
event);
882+
"[{}]Set {} for historical event {}", pipeName, event.getReplicateIndexForIoTV2(), event);
887883
}
888884

889885
if (sloppyPattern || isDbNameCoveredByPattern) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
2828
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
2929
import org.apache.iotdb.commons.utils.PathUtils;
30+
import org.apache.iotdb.consensus.pipe.PipeConsensus;
31+
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
32+
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
3033
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
3134
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
3235
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -40,6 +43,7 @@
4043
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.PipeDataRegionMatcher;
4144
import org.apache.iotdb.db.pipe.metric.source.PipeAssignerMetrics;
4245
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
46+
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
4347
import org.apache.iotdb.db.storageengine.StorageEngine;
4448
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
4549
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -191,8 +195,18 @@ private void assignToExtractor(
191195
extractor.getRealtimeDataExtractionStartTime(),
192196
extractor.getRealtimeDataExtractionEndTime());
193197
final EnrichedEvent innerEvent = copiedEvent.getEvent();
194-
// Bind replicateIndex for IoTV2
195-
innerEvent.setReplicateIndexForIoTV2(event.getEvent().getReplicateIndexForIoTV2());
198+
// if using IoTV2, assign a replicateIndex for this realtime event
199+
if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
200+
&& PipeConsensusProcessor.isShouldReplicate(innerEvent)) {
201+
innerEvent.setReplicateIndexForIoTV2(
202+
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
203+
extractor.getPipeName()));
204+
LOGGER.info(
205+
"[{}]Set {} for realtime event {}",
206+
extractor.getPipeName(),
207+
innerEvent.getReplicateIndexForIoTV2(),
208+
innerEvent);
209+
}
196210

197211
if (innerEvent instanceof PipeTsFileInsertionEvent) {
198212
final PipeTsFileInsertionEvent tsFileInsertionEvent =

0 commit comments

Comments
 (0)