Skip to content

Commit 5c6da1b

Browse files
authored
Pipe: Improved the total performance by disable some useless logic
1 parent a90d3e5 commit 5c6da1b

File tree

10 files changed

+30
-616
lines changed

10 files changed

+30
-616
lines changed

.github/workflows/pipe-it-2cluster.yml

Lines changed: 0 additions & 466 deletions
This file was deleted.

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -344,11 +344,6 @@ public void testAsyncConnectorUseNodeUrls() throws Exception {
344344
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
345345
}
346346

347-
@Test
348-
public void testAirGapConnectorUseNodeUrls() throws Exception {
349-
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
350-
}
351-
352347
private void doTestUseNodeUrls(String connectorName) throws Exception {
353348
senderEnv
354349
.getConfig()
@@ -383,16 +378,7 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
383378

384379
final StringBuilder nodeUrlsBuilder = new StringBuilder();
385380
for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) {
386-
if (connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
387-
// Use default port for convenience
388-
nodeUrlsBuilder
389-
.append(wrapper.getIp())
390-
.append(":")
391-
.append(wrapper.getPipeAirGapReceiverPort())
392-
.append(",");
393-
} else {
394-
nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
395-
}
381+
nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
396382
}
397383

398384
try (final SyncConfigNodeIServiceClient client =

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionConnectorConstructor.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector;
2424
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConstructor;
25-
import org.apache.iotdb.confignode.manager.pipe.connector.protocol.IoTDBConfigRegionAirGapConnector;
2625
import org.apache.iotdb.confignode.manager.pipe.connector.protocol.IoTDBConfigRegionConnector;
2726
import org.apache.iotdb.pipe.api.PipeConnector;
2827

@@ -42,9 +41,6 @@ protected void initConstructors() {
4241
pluginConstructors.put(
4342
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
4443
IoTDBConfigRegionConnector::new);
45-
pluginConstructors.put(
46-
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
47-
IoTDBConfigRegionAirGapConnector::new);
4844
pluginConstructors.put(
4945
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new);
5046

@@ -59,9 +55,6 @@ protected void initConstructors() {
5955
pluginConstructors.put(
6056
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(),
6157
IoTDBConfigRegionConnector::new);
62-
pluginConstructors.put(
63-
BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(),
64-
IoTDBConfigRegionAirGapConnector::new);
6558
pluginConstructors.put(
6659
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new);
6760
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector;
2424
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConstructor;
2525
import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
26-
import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBDataRegionAirGapConnector;
2726
import org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
28-
import org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaConnector;
29-
import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
3027
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
3128
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
3229
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
@@ -59,15 +56,8 @@ protected void initConstructors() {
5956
pluginConstructors.put(
6057
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
6158
IoTDBLegacyPipeConnector::new);
62-
pluginConstructors.put(
63-
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
64-
IoTDBDataRegionAirGapConnector::new);
6559
pluginConstructors.put(
6660
BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketConnector::new);
67-
pluginConstructors.put(
68-
BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaConnector::new);
69-
pluginConstructors.put(
70-
BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(), OpcDaConnector::new);
7161
pluginConstructors.put(
7262
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new);
7363
pluginConstructors.put(
@@ -88,13 +78,8 @@ protected void initConstructors() {
8878
pluginConstructors.put(
8979
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(),
9080
IoTDBLegacyPipeConnector::new);
91-
pluginConstructors.put(
92-
BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(),
93-
IoTDBDataRegionAirGapConnector::new);
9481
pluginConstructors.put(
9582
BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), WebSocketConnector::new);
96-
pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), OpcUaConnector::new);
97-
pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), OpcDaConnector::new);
9883
pluginConstructors.put(
9984
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new);
10085
pluginConstructors.put(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,9 @@
2121

2222
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor;
24-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
2524
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor;
2625
import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
27-
import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor;
28-
import org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor;
29-
import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor;
30-
import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor;
31-
import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
32-
import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
3326
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
34-
import org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
3527

3628
class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
3729

@@ -43,28 +35,6 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
4335
protected void initConstructors() {
4436
pluginConstructors.put(
4537
BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(), DoNothingProcessor::new);
46-
pluginConstructors.put(
47-
BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(),
48-
TumblingTimeSamplingProcessor::new);
49-
pluginConstructors.put(
50-
BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(),
51-
SwingingDoorTrendingSamplingProcessor::new);
52-
pluginConstructors.put(
53-
BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(),
54-
ChangingValueSamplingProcessor::new);
55-
pluginConstructors.put(
56-
BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(),
57-
ThrowingExceptionProcessor::new);
58-
pluginConstructors.put(
59-
BuiltinPipePlugin.AGGREGATE_PROCESSOR.getPipePluginName(), AggregateProcessor::new);
60-
pluginConstructors.put(
61-
BuiltinPipePlugin.STANDARD_STATISTICS_PROCESSOR.getPipePluginName(),
62-
StandardStatisticsOperatorProcessor::new);
63-
pluginConstructors.put(
64-
BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(),
65-
TumblingWindowingProcessor::new);
66-
pluginConstructors.put(
67-
BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), TwoStageCountProcessor::new);
6838
pluginConstructors.put(
6939
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
7040
PipeConsensusProcessor::new);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionConnectorConstructor.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector;
2424
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConstructor;
25-
import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBSchemaRegionAirGapConnector;
2625
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBSchemaRegionConnector;
2726
import org.apache.iotdb.pipe.api.PipeConnector;
2827

@@ -42,9 +41,6 @@ protected void initConstructors() {
4241
pluginConstructors.put(
4342
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
4443
IoTDBSchemaRegionConnector::new);
45-
pluginConstructors.put(
46-
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
47-
IoTDBSchemaRegionAirGapConnector::new);
4844
pluginConstructors.put(
4945
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingConnector::new);
5046

@@ -59,9 +55,6 @@ protected void initConstructors() {
5955
pluginConstructors.put(
6056
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(),
6157
IoTDBSchemaRegionConnector::new);
62-
pluginConstructors.put(
63-
BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(),
64-
IoTDBSchemaRegionAirGapConnector::new);
6558
pluginConstructors.put(
6659
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingConnector::new);
6760
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.apache.iotdb.db.pipe.extractor.dataregion;
2121

2222
import org.apache.iotdb.commons.consensus.DataRegionId;
23+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
24+
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
2325
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
2426
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
2527
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
@@ -45,6 +47,7 @@
4547
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
4648
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
4749
import org.apache.iotdb.pipe.api.exception.PipeException;
50+
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
4851

4952
import org.apache.tsfile.utils.Pair;
5053
import org.slf4j.Logger;
@@ -107,6 +110,21 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor {
107110
public void validate(final PipeParameterValidator validator) throws Exception {
108111
super.validate(validator);
109112

113+
final boolean forwardingPipeRequests =
114+
validator
115+
.getParameters()
116+
.getBooleanOrDefault(
117+
Arrays.asList(
118+
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
119+
PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
120+
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
121+
if (!forwardingPipeRequests) {
122+
throw new PipeParameterNotValidException(
123+
String.format(
124+
"The parameter %s cannot be set to false.",
125+
PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY));
126+
}
127+
110128
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
111129
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
112130
validator.getParameters());
@@ -170,26 +188,6 @@ public void validate(final PipeParameterValidator validator) throws Exception {
170188
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
171189
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
172190

173-
// Validate extractor.realtime.mode
174-
if (validator
175-
.getParameters()
176-
.getBooleanOrDefault(
177-
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
178-
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)
179-
|| validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, SOURCE_END_TIME_KEY)) {
180-
validator.validateAttributeValueRange(
181-
validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
182-
? EXTRACTOR_REALTIME_MODE_KEY
183-
: SOURCE_REALTIME_MODE_KEY,
184-
true,
185-
EXTRACTOR_REALTIME_MODE_FILE_VALUE,
186-
EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
187-
EXTRACTOR_REALTIME_MODE_LOG_VALUE,
188-
EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE,
189-
EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE,
190-
EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
191-
}
192-
193191
// Validate source.start-time and source.end-time
194192
if (validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, SOURCE_END_TIME_KEY)
195193
&& validator
@@ -276,6 +274,13 @@ private void constructRealtimeExtractor(final PipeParameters parameters) {
276274
return;
277275
}
278276

277+
if (!(pipeName != null
278+
&& (pipeName.startsWith(PipeStaticMeta.SUBSCRIPTION_PIPE_PREFIX)
279+
|| pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)))) {
280+
realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
281+
return;
282+
}
283+
279284
// Use hybrid mode by default
280285
if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
281286
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
2525
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
2626
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
27-
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
2827
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
2928
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
3029
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -107,7 +106,7 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
107106
private final AtomicReference<Pair<Long, Long>> dataRegionTimePartitionIdBound =
108107
new AtomicReference<>();
109108

110-
protected boolean isForwardingPipeRequests;
109+
protected boolean isForwardingPipeRequests = true;
111110

112111
private boolean shouldTransferModFile; // Whether to transfer mods
113112

@@ -232,12 +231,7 @@ public void customize(
232231
? TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime)
233232
: TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) - 1;
234233

235-
isForwardingPipeRequests =
236-
parameters.getBooleanOrDefault(
237-
Arrays.asList(
238-
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
239-
PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
240-
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
234+
isForwardingPipeRequests = true;
241235

242236
shouldTransferModFile =
243237
parameters.getBooleanOrDefault(

0 commit comments

Comments
 (0)