Skip to content

Commit 656b4fc

Browse files
authored
Pipe: Fixed the semantics for new regions & realtime / history only pipes (apache#16622)
* fix * complexityu * comp
1 parent dbf5583 commit 656b4fc

File tree

13 files changed

+234
-174
lines changed

13 files changed

+234
-174
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,14 +1003,10 @@ public int generateNextRegionGroupId() {
10031003
}
10041004

10051005
public Optional<TConsensusGroupId> generateTConsensusGroupIdByRegionId(final int regionId) {
1006-
if (configManager
1007-
.getPartitionManager()
1008-
.isRegionGroupExists(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) {
1006+
if (isRegionGroupExists(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId))) {
10091007
return Optional.of(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, regionId));
10101008
}
1011-
if (configManager
1012-
.getPartitionManager()
1013-
.isRegionGroupExists(new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) {
1009+
if (isRegionGroupExists(new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))) {
10141010
return Optional.of(new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId));
10151011
}
10161012
String msg =

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
package org.apache.iotdb.confignode.persistence.pipe;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
2223
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2324
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
2425
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
2526
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
2627
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
28+
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
2729
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
2830
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMetaKeeper;
2931
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
@@ -621,13 +623,21 @@ private TSStatus handleLeaderChangeInternal(final PipeHandleLeaderChangePlan pla
621623
} else {
622624
consensusGroupIdToTaskMetaMap.remove(consensusGroupId.getId());
623625
}
624-
} else {
626+
} else if (!PipeTaskAgent.isHistoryOnlyPipe(
627+
pipeMeta.getStaticMeta().getSourceParameters())
628+
|| !consensusGroupId
629+
.getType()
630+
.equals(TConsensusGroupType.DataRegion)) {
625631
// If CN does not contain the region group, it means the data
626632
// region group is newly added.
633+
// We do not handle history only pipes for new data regions
634+
635+
// Newly added leader
627636
if (newLeader != -1) {
628637
consensusGroupIdToTaskMetaMap.put(
629638
consensusGroupId.getId(),
630-
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader));
639+
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader)
640+
.markAsNewlyAdded());
631641
}
632642
// else:
633643
// "The pipe task meta does not contain the data region group {} or

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
2323
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
2424
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
25+
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
2526
import org.apache.iotdb.confignode.manager.pipe.metric.overview.PipeProcedureMetrics;
2627
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
2728
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -34,6 +35,7 @@
3435
import org.apache.iotdb.confignode.service.ConfigNode;
3536
import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
3637
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
38+
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
3739
import org.apache.iotdb.pipe.api.exception.PipeException;
3840
import org.apache.iotdb.rpc.TSStatusCode;
3941

@@ -46,6 +48,7 @@
4648
import java.nio.ByteBuffer;
4749
import java.nio.charset.StandardCharsets;
4850
import java.util.ArrayList;
51+
import java.util.Collections;
4952
import java.util.List;
5053
import java.util.Map;
5154
import java.util.concurrent.TimeUnit;
@@ -532,6 +535,24 @@ protected Map<Integer, TPushPipeMetaResp> pushSinglePipeMetaToDataNodes(
532535
.serialize());
533536
}
534537

538+
protected Map<Integer, TPushPipeMetaResp> pushSinglePipeMetaToDataNodes4Realtime(
539+
String pipeName, ConfigNodeProcedureEnv env) throws IOException {
540+
final PipeMeta pipeMeta = pipeTaskInfo.get().getPipeMetaByPipeName(pipeName);
541+
// Note that although the altered pipe has progress in it,
542+
// if we alter it to realtime we should ignore the previous data
543+
if (!pipeMeta.getStaticMeta().isSourceExternal()) {
544+
pipeMeta
545+
.getStaticMeta()
546+
.getSourceParameters()
547+
.addOrReplaceEquivalentAttributes(
548+
new PipeParameters(
549+
Collections.singletonMap(
550+
SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, Boolean.FALSE.toString())));
551+
}
552+
return env.pushSinglePipeMetaToDataNodes(
553+
copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
554+
}
555+
535556
/**
536557
* Drop a pipe on all the dataNodes.
537558
*

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
2223
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24+
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
2325
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
2426
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
2527
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
@@ -164,11 +166,11 @@ public void executeFromCalculateInfoForTask(final ConfigNodeProcedureEnv env) {
164166
new ConcurrentHashMap<>();
165167
if (currentPipeStaticMeta.isSourceExternal()) {
166168
currentConsensusGroupId2PipeTaskMeta.forEach(
167-
(taskId, pipeTaskMeta) -> {
168-
updatedConsensusGroupIdToTaskMetaMap.put(
169-
taskId,
170-
new PipeTaskMeta(pipeTaskMeta.getProgressIndex(), pipeTaskMeta.getLeaderNodeId()));
171-
});
169+
(taskId, pipeTaskMeta) ->
170+
updatedConsensusGroupIdToTaskMetaMap.put(
171+
taskId,
172+
new PipeTaskMeta(
173+
pipeTaskMeta.getProgressIndex(), pipeTaskMeta.getLeaderNodeId())));
172174
} else {
173175
// data regions & schema regions
174176
env.getConfigManager()
@@ -185,12 +187,32 @@ public void executeFromCalculateInfoForTask(final ConfigNodeProcedureEnv env) {
185187
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")
186188
&& !databaseName.equals(SchemaConstant.AUDIT_DATABASE)
187189
&& !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE + ".")
188-
&& currentPipeTaskMeta != null
189-
&& currentPipeTaskMeta.getLeaderNodeId() == regionLeaderNodeId) {
190+
&& !(PipeTaskAgent.isHistoryOnlyPipe(
191+
currentPipeStaticMeta.getSourceParameters())
192+
&& PipeTaskAgent.isHistoryOnlyPipe(
193+
updatedPipeStaticMeta.getSourceParameters())
194+
&& regionGroupId.getType() == TConsensusGroupType.DataRegion
195+
&& currentPipeTaskMeta.isNewlyAdded())) {
190196
// Pipe only collect user's data, filter metric database here.
197+
// If it is altered to "pure historical", then the regionIds are always new here,
198+
// then it will extract all existing data now, not existing data since the
199+
// original pipe was created
200+
// Similar for "pure realtime"
191201
updatedConsensusGroupIdToTaskMetaMap.put(
192202
regionGroupId.getId(),
193-
new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(), regionLeaderNodeId));
203+
new PipeTaskMeta(
204+
currentPipeTaskMeta.getProgressIndex(),
205+
PipeTaskMeta.isNewlyAdded(currentPipeTaskMeta.getLeaderNodeId())
206+
&& !(!PipeTaskAgent.isHistoryOnlyPipe(
207+
currentPipeStaticMeta.getSourceParameters())
208+
&& PipeTaskAgent.isHistoryOnlyPipe(
209+
updatedPipeStaticMeta.getSourceParameters()))
210+
&& !(!PipeTaskAgent.isRealtimeOnlyPipe(
211+
currentPipeStaticMeta.getSourceParameters())
212+
&& PipeTaskAgent.isRealtimeOnlyPipe(
213+
updatedPipeStaticMeta.getSourceParameters()))
214+
? PipeTaskMeta.getRevertedLeader(regionLeaderNodeId)
215+
: regionLeaderNodeId));
194216
}
195217
});
196218

@@ -248,7 +270,12 @@ public void executeFromOperateOnDataNodes(final ConfigNodeProcedureEnv env) thro
248270
LOGGER.info("AlterPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName);
249271

250272
final String exceptionMessage =
251-
parsePushPipeMetaExceptionForPipe(pipeName, pushSinglePipeMetaToDataNodes(pipeName, env));
273+
parsePushPipeMetaExceptionForPipe(
274+
pipeName,
275+
!PipeTaskAgent.isRealtimeOnlyPipe(currentPipeStaticMeta.getSourceParameters())
276+
&& PipeTaskAgent.isRealtimeOnlyPipe(updatedPipeStaticMeta.getSourceParameters())
277+
? pushSinglePipeMetaToDataNodes4Realtime(pipeName, env)
278+
: pushSinglePipeMetaToDataNodes(pipeName, env));
252279
if (!exceptionMessage.isEmpty()) {
253280
LOGGER.warn(
254281
"Failed to alter pipe {}, details: {}, metadata will be synchronized later.",

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,6 @@
109109
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
110110
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
111111
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
112-
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
113-
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
114-
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
115-
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
116-
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
117-
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
118112
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY;
119113
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY;
120114
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
@@ -124,8 +118,6 @@
124118
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY;
125119
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
126120
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
127-
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
128-
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY;
129121
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
130122
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY;
131123
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
@@ -607,24 +599,6 @@ public boolean isFullSync(final PipeParameters parameters) {
607599
return isHistoryEnable && isRealtimeEnable;
608600
}
609601

610-
private boolean isSnapshotMode(final PipeParameters parameters) {
611-
final boolean isSnapshotMode;
612-
if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) {
613-
isSnapshotMode =
614-
parameters.getBooleanOrDefault(
615-
Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY),
616-
EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE);
617-
} else {
618-
final String sourceModeValue =
619-
parameters.getStringOrDefault(
620-
Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE);
621-
isSnapshotMode =
622-
sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
623-
|| sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
624-
}
625-
return isSnapshotMode;
626-
}
627-
628602
@Override
629603
public void runPipeTasks(
630604
final Collection<PipeTask> pipeTasks, final Consumer<PipeTask> runSingle) {
@@ -804,7 +778,7 @@ private long calculateTsFileParserMemory(
804778
// If the source is not history, we do not need to allocate memory
805779
boolean isExtractorHistory =
806780
sourceParameters.getBooleanOrDefault(
807-
SystemConstant.RESTART_KEY, SystemConstant.RESTART_DEFAULT_VALUE)
781+
SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, SystemConstant.RESTART_DEFAULT_VALUE)
808782
|| sourceParameters.getBooleanOrDefault(
809783
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY),
810784
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
@@ -888,7 +862,7 @@ private long calculateSendTsFileReadBufferMemory(
888862
// If the source is history enable, we need to transfer tsfile
889863
boolean needTransferTsFile =
890864
sourceParameters.getBooleanOrDefault(
891-
SystemConstant.RESTART_KEY, SystemConstant.RESTART_DEFAULT_VALUE)
865+
SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, SystemConstant.RESTART_DEFAULT_VALUE)
892866
|| sourceParameters.getBooleanOrDefault(
893867
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY),
894868
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);

0 commit comments

Comments
 (0)