Skip to content

Commit 09d526c

Browse files
CaideyipiJackieTien97
authored andcommitted
Pipe: Improved the total performance by disable some useless logic
1 parent 096d9eb commit 09d526c

File tree

12 files changed

+29
-1956
lines changed

12 files changed

+29
-1956
lines changed

.github/workflows/daily-it.yml

Lines changed: 0 additions & 900 deletions
Large diffs are not rendered by default.

.github/workflows/pipe-it.yml

Lines changed: 0 additions & 872 deletions
This file was deleted.

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -365,11 +365,6 @@ public void testAsyncConnectorUseNodeUrls() throws Exception {
365365
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
366366
}
367367

368-
@Test
369-
public void testAirGapConnectorUseNodeUrls() throws Exception {
370-
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
371-
}
372-
373368
private void doTestUseNodeUrls(String connectorName) throws Exception {
374369
senderEnv
375370
.getConfig()
@@ -413,16 +408,7 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
413408

414409
boolean insertResult = true;
415410
for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) {
416-
if (connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
417-
// Use default port for convenience
418-
nodeUrlsBuilder
419-
.append(wrapper.getIp())
420-
.append(":")
421-
.append(wrapper.getPipeAirGapReceiverPort())
422-
.append(",");
423-
} else {
424-
nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
425-
}
411+
nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
426412
}
427413

428414
try (final SyncConfigNodeIServiceClient client =

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -352,11 +352,6 @@ public void testAsyncConnectorUseNodeUrls() throws Exception {
352352
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
353353
}
354354

355-
@Test
356-
public void testAirGapConnectorUseNodeUrls() throws Exception {
357-
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
358-
}
359-
360355
private void doTestUseNodeUrls(String connectorName) throws Exception {
361356
senderEnv
362357
.getConfig()
@@ -391,16 +386,7 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
391386

392387
final StringBuilder nodeUrlsBuilder = new StringBuilder();
393388
for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) {
394-
if (connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
395-
// Use default port for convenience
396-
nodeUrlsBuilder
397-
.append(wrapper.getIp())
398-
.append(":")
399-
.append(wrapper.getPipeAirGapReceiverPort())
400-
.append(",");
401-
} else {
402-
nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
403-
}
389+
nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
404390
}
405391

406392
try (final SyncConfigNodeIServiceClient client =

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
2424
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
25-
import org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionAirGapSink;
2625
import org.apache.iotdb.confignode.manager.pipe.sink.protocol.IoTDBConfigRegionSink;
2726
import org.apache.iotdb.pipe.api.PipeConnector;
2827

@@ -41,9 +40,6 @@ protected void initConstructors() {
4140
pluginConstructors.put(
4241
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
4342
IoTDBConfigRegionSink::new);
44-
pluginConstructors.put(
45-
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
46-
IoTDBConfigRegionAirGapSink::new);
4743
pluginConstructors.put(
4844
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new);
4945

@@ -55,8 +51,6 @@ protected void initConstructors() {
5551
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new);
5652
pluginConstructors.put(
5753
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new);
58-
pluginConstructors.put(
59-
BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBConfigRegionAirGapSink::new);
6054
pluginConstructors.put(
6155
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new);
6256
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,10 @@
2121

2222
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor;
24-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
2524
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor;
2625
import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
27-
import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor;
28-
import org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor;
29-
import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor;
30-
import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor;
31-
import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
32-
import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
3326
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
3427
import org.apache.iotdb.db.pipe.processor.schemachange.RenameDatabaseProcessor;
35-
import org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
3628

3729
class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
3830

@@ -44,28 +36,6 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
4436
protected void initConstructors() {
4537
pluginConstructors.put(
4638
BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(), DoNothingProcessor::new);
47-
pluginConstructors.put(
48-
BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(),
49-
TumblingTimeSamplingProcessor::new);
50-
pluginConstructors.put(
51-
BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(),
52-
SwingingDoorTrendingSamplingProcessor::new);
53-
pluginConstructors.put(
54-
BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(),
55-
ChangingValueSamplingProcessor::new);
56-
pluginConstructors.put(
57-
BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(),
58-
ThrowingExceptionProcessor::new);
59-
pluginConstructors.put(
60-
BuiltinPipePlugin.AGGREGATE_PROCESSOR.getPipePluginName(), AggregateProcessor::new);
61-
pluginConstructors.put(
62-
BuiltinPipePlugin.STANDARD_STATISTICS_PROCESSOR.getPipePluginName(),
63-
StandardStatisticsOperatorProcessor::new);
64-
pluginConstructors.put(
65-
BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(),
66-
TumblingWindowingProcessor::new);
67-
pluginConstructors.put(
68-
BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), TwoStageCountProcessor::new);
6939
pluginConstructors.put(
7040
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
7141
PipeConsensusProcessor::new);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
2424
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
2525
import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
26-
import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBDataRegionAirGapSink;
2726
import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink;
28-
import org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaSink;
29-
import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
3027
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.PipeConsensusAsyncSink;
3128
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
3229
import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
@@ -59,13 +56,8 @@ protected void initConstructors() {
5956
pluginConstructors.put(
6057
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
6158
IoTDBLegacyPipeSink::new);
62-
pluginConstructors.put(
63-
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
64-
IoTDBDataRegionAirGapSink::new);
6559
pluginConstructors.put(
6660
BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketSink::new);
67-
pluginConstructors.put(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaSink::new);
68-
pluginConstructors.put(BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(), OpcDaSink::new);
6961
pluginConstructors.put(
7062
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new);
7163
pluginConstructors.put(
@@ -82,12 +74,8 @@ protected void initConstructors() {
8274
IoTDBDataRegionAsyncSink::new);
8375
pluginConstructors.put(
8476
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(), IoTDBLegacyPipeSink::new);
85-
pluginConstructors.put(
86-
BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBDataRegionAirGapSink::new);
8777
pluginConstructors.put(
8878
BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), WebSocketSink::new);
89-
pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), OpcUaSink::new);
90-
pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), OpcDaSink::new);
9179
pluginConstructors.put(
9280
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new);
9381
pluginConstructors.put(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
2424
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeSinkConstructor;
25-
import org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBSchemaRegionAirGapSink;
2625
import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBSchemaRegionSink;
2726
import org.apache.iotdb.pipe.api.PipeConnector;
2827

@@ -41,9 +40,6 @@ protected void initConstructors() {
4140
pluginConstructors.put(
4241
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
4342
IoTDBSchemaRegionSink::new);
44-
pluginConstructors.put(
45-
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
46-
IoTDBSchemaRegionAirGapSink::new);
4743
pluginConstructors.put(
4844
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new);
4945

@@ -55,8 +51,6 @@ protected void initConstructors() {
5551
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), IoTDBSchemaRegionSink::new);
5652
pluginConstructors.put(
5753
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBSchemaRegionSink::new);
58-
pluginConstructors.put(
59-
BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBSchemaRegionAirGapSink::new);
6054
pluginConstructors.put(
6155
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new);
6256
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.source.dataregion;
2121

2222
import org.apache.iotdb.commons.consensus.DataRegionId;
23+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
2324
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
2425
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
2526
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
@@ -141,6 +142,21 @@ public class IoTDBDataRegionSource extends IoTDBSource {
141142
public void validate(final PipeParameterValidator validator) throws Exception {
142143
super.validate(validator);
143144

145+
final boolean forwardingPipeRequests =
146+
validator
147+
.getParameters()
148+
.getBooleanOrDefault(
149+
Arrays.asList(
150+
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
151+
PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
152+
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
153+
if (!forwardingPipeRequests) {
154+
throw new PipeParameterNotValidException(
155+
String.format(
156+
"The parameter %s cannot be set to false.",
157+
PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY));
158+
}
159+
144160
final boolean isTreeDialect =
145161
validator
146162
.getParameters()
@@ -262,32 +278,6 @@ public void validate(final PipeParameterValidator validator) throws Exception {
262278
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
263279
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
264280

265-
// Validate extractor.realtime.mode
266-
if (validator
267-
.getParameters()
268-
.getBooleanOrDefault(
269-
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
270-
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)
271-
|| validator
272-
.getParameters()
273-
.hasAnyAttributes(
274-
SOURCE_START_TIME_KEY,
275-
EXTRACTOR_START_TIME_KEY,
276-
SOURCE_END_TIME_KEY,
277-
EXTRACTOR_END_TIME_KEY)) {
278-
validator.validateAttributeValueRange(
279-
validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
280-
? EXTRACTOR_REALTIME_MODE_KEY
281-
: SOURCE_REALTIME_MODE_KEY,
282-
true,
283-
EXTRACTOR_REALTIME_MODE_FILE_VALUE,
284-
EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
285-
EXTRACTOR_REALTIME_MODE_LOG_VALUE,
286-
EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE,
287-
EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE,
288-
EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
289-
}
290-
291281
checkInvalidParameters(validator);
292282

293283
constructHistoricalExtractor();
@@ -447,6 +437,13 @@ private void constructRealtimeExtractor(final PipeParameters parameters) {
447437
return;
448438
}
449439

440+
if (!(pipeName != null
441+
&& (pipeName.startsWith(PipeStaticMeta.SUBSCRIPTION_PIPE_PREFIX)
442+
|| pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)))) {
443+
realtimeExtractor = new PipeRealtimeDataRegionTsFileSource();
444+
return;
445+
}
446+
450447
// Use hybrid mode by default
451448
if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)
452449
&& !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
114114
private final AtomicReference<Pair<Long, Long>> dataRegionTimePartitionIdBound =
115115
new AtomicReference<>();
116116

117-
protected boolean isForwardingPipeRequests;
117+
protected boolean isForwardingPipeRequests = true;
118118

119119
private boolean shouldTransferModFile; // Whether to transfer mods
120120

@@ -246,22 +246,7 @@ public void customize(
246246
? TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime)
247247
: TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) - 1;
248248

249-
final boolean isDoubleLiving =
250-
parameters.getBooleanOrDefault(
251-
Arrays.asList(
252-
PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
253-
PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
254-
PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
255-
if (isDoubleLiving) {
256-
isForwardingPipeRequests = false;
257-
} else {
258-
isForwardingPipeRequests =
259-
parameters.getBooleanOrDefault(
260-
Arrays.asList(
261-
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
262-
PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
263-
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
264-
}
249+
isForwardingPipeRequests = true;
265250

266251
if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) {
267252
shouldTransferModFile =

0 commit comments

Comments
 (0)