Skip to content

Commit 174b2cb

Browse files
authored
IoTConsensusV2: Transfer table deletion without any parse or filter (apache#14988)
* transfer table deletion for iotv2 * minor fix
1 parent 82e9361 commit 174b2cb

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public class PipeEventCollector implements EventCollector {
5757

5858
private final boolean skipParseTsFile;
5959

60+
private final boolean isUsedForConsensusPipe;
61+
6062
private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
6163
private boolean hasNoGeneratedEvent = true;
6264
private boolean isFailedToIncreaseReferenceCount = false;
@@ -66,12 +68,14 @@ public PipeEventCollector(
6668
final long creationTime,
6769
final int regionId,
6870
final boolean forceTabletFormat,
69-
final boolean skipParseTsFile) {
71+
final boolean skipParseTsFile,
72+
final boolean isUsedInConsensusPipe) {
7073
this.pendingQueue = pendingQueue;
7174
this.creationTime = creationTime;
7275
this.regionId = regionId;
7376
this.forceTabletFormat = forceTabletFormat;
7477
this.skipParseTsFile = skipParseTsFile;
78+
this.isUsedForConsensusPipe = isUsedInConsensusPipe;
7579
}
7680

7781
@Override
@@ -153,6 +157,13 @@ private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent parsed
153157
}
154158

155159
private void parseAndCollectEvent(final PipeDeleteDataNodeEvent deleteDataEvent) {
160+
// For IoTConsensusV2, there is no need to parse. So we can directly transfer deleteDataEvent
161+
if (isUsedForConsensusPipe) {
162+
hasNoGeneratedEvent = false;
163+
collectEvent(deleteDataEvent);
164+
return;
165+
}
166+
156167
// Only used by events containing delete data node, no need to bind progress index here since
157168
// delete data event does not have progress index currently
158169
(deleteDataEvent.getDeleteDataNode() instanceof DeleteDataNode

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2424
import org.apache.iotdb.commons.pipe.agent.task.connection.EventSupplier;
2525
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
26+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
2627
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2728
import org.apache.iotdb.commons.pipe.agent.task.stage.PipeTaskStage;
2829
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
@@ -99,13 +100,15 @@ public PipeTaskProcessorStage(
99100
// removed, the new subtask will have the same pipeName and regionId as the
100101
// old one, so we need creationTime to make their hash code different in the map.
101102
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
103+
final boolean isUsedForConsensusPipe = pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX);
102104
final PipeEventCollector pipeConnectorOutputEventCollector =
103105
new PipeEventCollector(
104106
pipeConnectorOutputPendingQueue,
105107
creationTime,
106108
regionId,
107109
forceTabletFormat,
108-
skipParseTsFile);
110+
skipParseTsFile,
111+
isUsedForConsensusPipe);
109112
this.pipeProcessorSubtask =
110113
new PipeProcessorSubtask(
111114
taskId,

0 commit comments

Comments
 (0)