Skip to content

Commit b098e34

Browse files
authored
Pipe: Fixed the bug that the schema may be sent twice when split-enabled & database may be null for non-first schema pipes (apache#16586)
1 parent a29bf52 commit b098e34

File tree

4 files changed

+29
-15
lines changed

4 files changed

+29
-15
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,14 @@ public void start() throws Exception {
115115
== 1) {
116116
SchemaRegionConsensusImpl.getInstance()
117117
.write(schemaRegionId, new PipeOperateSchemaQueueNode(new PlanNodeId(""), true));
118-
database = SchemaEngine.getInstance().getSchemaRegion(schemaRegionId).getDatabaseFullPath();
118+
} else if (!PipeDataNodeAgent.runtime().schemaListener(schemaRegionId).isOpened()) {
119+
// This may be being concurrently opened, we should not continue to start or else the snapshot
120+
// may not be listened
121+
PipeDataNodeAgent.runtime().decreaseAndGetSchemaListenerReferenceCount(schemaRegionId);
122+
return;
119123
}
120124

125+
database = SchemaEngine.getInstance().getSchemaRegion(schemaRegionId).getDatabaseFullPath();
121126
super.start();
122127
}
123128

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2105,10 +2105,10 @@ public SettableFuture<ConfigTaskResult> createPipe(
21052105

21062106
// Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode, or both realtime
21072107
// and history are true), the pipe is split into history-only and realtime–only modes.
2108-
final PipeParameters extractorPipeParameters =
2108+
final PipeParameters sourcePipeParameters =
21092109
new PipeParameters(createPipeStatement.getExtractorAttributes());
21102110
if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
2111-
&& PipeDataNodeAgent.task().isFullSync(extractorPipeParameters)) {
2111+
&& PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
21122112
try (final ConfigNodeClient configNodeClient =
21132113
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
21142114
// 1. Send request to create the real-time data synchronization pipeline
@@ -2120,7 +2120,7 @@ public SettableFuture<ConfigTaskResult> createPipe(
21202120
.setIfNotExistsCondition(true)
21212121
// Use extractor parameters for real-time data
21222122
.setExtractorAttributes(
2123-
extractorPipeParameters
2123+
sourcePipeParameters
21242124
.addOrReplaceEquivalentAttributesWithClone(
21252125
new PipeParameters(
21262126
ImmutableMap.of(
@@ -2145,16 +2145,23 @@ public SettableFuture<ConfigTaskResult> createPipe(
21452145
// Append suffix to the pipeline name for historical data
21462146
.setPipeName(createPipeStatement.getPipeName() + "_history")
21472147
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
2148-
// Use extractor parameters for historical data
2148+
// Use source parameters for historical data
21492149
.setExtractorAttributes(
2150-
extractorPipeParameters
2150+
sourcePipeParameters
21512151
.addOrReplaceEquivalentAttributesWithClone(
21522152
new PipeParameters(
21532153
ImmutableMap.of(
21542154
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
21552155
Boolean.toString(false),
21562156
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
2157-
Boolean.toString(true))))
2157+
Boolean.toString(true),
2158+
// We force the historical pipe to transfer data only
2159+
// Thus we can transfer schema only once
2160+
// And may drop the historical pipe on successfully transferred
2161+
PipeSourceConstant.SOURCE_INCLUSION_KEY,
2162+
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
2163+
PipeSourceConstant.SOURCE_EXCLUSION_KEY,
2164+
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
21582165
.getAttribute())
21592166
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
21602167
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,12 @@ protected synchronized void tryListen(final List<PipeSnapshotEvent> events) {
7676
}
7777
}
7878

79-
public synchronized Pair<Long, List<PipeSnapshotEvent>> findAvailableSnapshots() {
80-
if (queueTailIndex2SnapshotsCache.getLeft()
81-
< queue.getTailIndex()
82-
- PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) {
79+
public synchronized Pair<Long, List<PipeSnapshotEvent>> findAvailableSnapshots(
80+
final boolean mayClear) {
81+
if (mayClear
82+
&& queueTailIndex2SnapshotsCache.getLeft()
83+
< queue.getTailIndex()
84+
- PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) {
8385
clearSnapshots();
8486
}
8587
return queueTailIndex2SnapshotsCache;

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,10 @@ public void start() throws Exception {
112112
private long getNextIndexAfterSnapshot() {
113113
long nextIndex;
114114
if (needTransferSnapshot()) {
115-
nextIndex = findSnapshot();
115+
nextIndex = findSnapshot(true);
116116
if (nextIndex == Long.MIN_VALUE) {
117117
triggerSnapshot();
118-
nextIndex = findSnapshot();
118+
nextIndex = findSnapshot(false);
119119
if (nextIndex == Long.MIN_VALUE) {
120120
throw new PipeException("Cannot get the newest snapshot after triggering one.");
121121
}
@@ -128,9 +128,9 @@ private long getNextIndexAfterSnapshot() {
128128
return nextIndex;
129129
}
130130

131-
private long findSnapshot() {
131+
private long findSnapshot(final boolean mayClear) {
132132
final Pair<Long, List<PipeSnapshotEvent>> queueTailIndex2Snapshots =
133-
getListeningQueue().findAvailableSnapshots();
133+
getListeningQueue().findAvailableSnapshots(mayClear);
134134
final long nextIndex =
135135
Objects.nonNull(queueTailIndex2Snapshots.getLeft())
136136
&& queueTailIndex2Snapshots.getLeft() != Long.MIN_VALUE

0 commit comments

Comments
 (0)