Skip to content

Commit 0875fb2

Browse files
committed
Pipe: Improved the total performance by disable some useless logic
1 parent 19885ea commit 0875fb2

File tree

10 files changed

+28
-678
lines changed

10 files changed

+28
-678
lines changed

.github/workflows/pipe-it-2cluster.yml

Lines changed: 0 additions & 539 deletions
This file was deleted.

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProtocolIT.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -332,11 +332,6 @@ public void testAsyncConnectorUseNodeUrls() throws Exception {
332332
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
333333
}
334334

335-
@Test
336-
public void testAirGapConnectorUseNodeUrls() throws Exception {
337-
doTestUseNodeUrls(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName());
338-
}
339-
340335
private void doTestUseNodeUrls(String connectorName) throws Exception {
341336
senderEnv
342337
.getConfig()
@@ -371,16 +366,7 @@ private void doTestUseNodeUrls(String connectorName) throws Exception {
371366

372367
final StringBuilder nodeUrlsBuilder = new StringBuilder();
373368
for (final DataNodeWrapper wrapper : receiverEnv.getDataNodeWrapperList()) {
374-
if (connectorName.equals(BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
375-
// Use default port for convenience
376-
nodeUrlsBuilder
377-
.append(wrapper.getIp())
378-
.append(":")
379-
.append(wrapper.getPipeAirGapReceiverPort())
380-
.append(",");
381-
} else {
382-
nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
383-
}
369+
nodeUrlsBuilder.append(wrapper.getIpAndPortString()).append(",");
384370
}
385371

386372
try (final SyncConfigNodeIServiceClient client =

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/plugin/PipeConfigRegionSinkConstructor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ protected void initConstructors() {
4141
pluginConstructors.put(
4242
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
4343
IoTDBConfigRegionSink::new);
44-
pluginConstructors.put(
45-
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
46-
IoTDBConfigRegionAirGapSink::new);
4744
pluginConstructors.put(
4845
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new);
4946

@@ -55,8 +52,6 @@ protected void initConstructors() {
5552
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new);
5653
pluginConstructors.put(
5754
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBConfigRegionSink::new);
58-
pluginConstructors.put(
59-
BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBConfigRegionAirGapSink::new);
6055
pluginConstructors.put(
6156
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new);
6257
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,9 @@
2121

2222
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2323
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor;
24-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
2524
import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor;
2625
import org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
27-
import org.apache.iotdb.db.pipe.processor.aggregate.AggregateProcessor;
28-
import org.apache.iotdb.db.pipe.processor.aggregate.operator.processor.StandardStatisticsOperatorProcessor;
29-
import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.TumblingWindowingProcessor;
30-
import org.apache.iotdb.db.pipe.processor.downsampling.changing.ChangingValueSamplingProcessor;
31-
import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
32-
import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
3326
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
34-
import org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;
3527

3628
class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
3729

@@ -43,28 +35,6 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
4335
protected void initConstructors() {
4436
pluginConstructors.put(
4537
BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName(), DoNothingProcessor::new);
46-
pluginConstructors.put(
47-
BuiltinPipePlugin.TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName(),
48-
TumblingTimeSamplingProcessor::new);
49-
pluginConstructors.put(
50-
BuiltinPipePlugin.SDT_SAMPLING_PROCESSOR.getPipePluginName(),
51-
SwingingDoorTrendingSamplingProcessor::new);
52-
pluginConstructors.put(
53-
BuiltinPipePlugin.CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName(),
54-
ChangingValueSamplingProcessor::new);
55-
pluginConstructors.put(
56-
BuiltinPipePlugin.THROWING_EXCEPTION_PROCESSOR.getPipePluginName(),
57-
ThrowingExceptionProcessor::new);
58-
pluginConstructors.put(
59-
BuiltinPipePlugin.AGGREGATE_PROCESSOR.getPipePluginName(), AggregateProcessor::new);
60-
pluginConstructors.put(
61-
BuiltinPipePlugin.STANDARD_STATISTICS_PROCESSOR.getPipePluginName(),
62-
StandardStatisticsOperatorProcessor::new);
63-
pluginConstructors.put(
64-
BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(),
65-
TumblingWindowingProcessor::new);
66-
pluginConstructors.put(
67-
BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), TwoStageCountProcessor::new);
6838
pluginConstructors.put(
6939
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
7040
PipeConsensusProcessor::new);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,8 @@ protected void initConstructors() {
5959
pluginConstructors.put(
6060
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
6161
IoTDBLegacyPipeSink::new);
62-
pluginConstructors.put(
63-
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
64-
IoTDBDataRegionAirGapSink::new);
6562
pluginConstructors.put(
6663
BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketSink::new);
67-
pluginConstructors.put(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaSink::new);
68-
pluginConstructors.put(BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(), OpcDaSink::new);
6964
pluginConstructors.put(
7065
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new);
7166
pluginConstructors.put(
@@ -82,12 +77,8 @@ protected void initConstructors() {
8277
IoTDBDataRegionAsyncSink::new);
8378
pluginConstructors.put(
8479
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_SINK.getPipePluginName(), IoTDBLegacyPipeSink::new);
85-
pluginConstructors.put(
86-
BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBDataRegionAirGapSink::new);
8780
pluginConstructors.put(
8881
BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), WebSocketSink::new);
89-
pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), OpcUaSink::new);
90-
pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), OpcDaSink::new);
9182
pluginConstructors.put(
9283
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new);
9384
pluginConstructors.put(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/schemaregion/PipeSchemaRegionSinkConstructor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ protected void initConstructors() {
4141
pluginConstructors.put(
4242
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
4343
IoTDBSchemaRegionSink::new);
44-
pluginConstructors.put(
45-
BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName(),
46-
IoTDBSchemaRegionAirGapSink::new);
4744
pluginConstructors.put(
4845
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), DoNothingSink::new);
4946

@@ -55,8 +52,6 @@ protected void initConstructors() {
5552
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_SINK.getPipePluginName(), IoTDBSchemaRegionSink::new);
5653
pluginConstructors.put(
5754
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_SINK.getPipePluginName(), IoTDBSchemaRegionSink::new);
58-
pluginConstructors.put(
59-
BuiltinPipePlugin.IOTDB_AIR_GAP_SINK.getPipePluginName(), IoTDBSchemaRegionAirGapSink::new);
6055
pluginConstructors.put(
6156
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), DoNothingSink::new);
6257
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
4646
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
4747
import org.apache.iotdb.pipe.api.exception.PipeException;
48+
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
4849

4950
import org.apache.tsfile.utils.Pair;
5051
import org.slf4j.Logger;
@@ -100,6 +101,21 @@ public class IoTDBDataRegionSource extends IoTDBSource {
100101
public void validate(final PipeParameterValidator validator) throws Exception {
101102
super.validate(validator);
102103

104+
final boolean forwardingPipeRequests =
105+
validator
106+
.getParameters()
107+
.getBooleanOrDefault(
108+
Arrays.asList(
109+
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
110+
PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
111+
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
112+
if (!forwardingPipeRequests) {
113+
throw new PipeParameterNotValidException(
114+
String.format(
115+
"The parameter %s cannot be set to false.",
116+
PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY));
117+
}
118+
103119
final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
104120
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
105121
validator.getParameters());
@@ -163,26 +179,6 @@ public void validate(final PipeParameterValidator validator) throws Exception {
163179
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
164180
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE));
165181

166-
// Validate source.realtime.mode
167-
if (validator
168-
.getParameters()
169-
.getBooleanOrDefault(
170-
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
171-
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)
172-
|| validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, SOURCE_END_TIME_KEY)) {
173-
validator.validateAttributeValueRange(
174-
validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
175-
? EXTRACTOR_REALTIME_MODE_KEY
176-
: SOURCE_REALTIME_MODE_KEY,
177-
true,
178-
EXTRACTOR_REALTIME_MODE_FILE_VALUE,
179-
EXTRACTOR_REALTIME_MODE_HYBRID_VALUE,
180-
EXTRACTOR_REALTIME_MODE_LOG_VALUE,
181-
EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE,
182-
EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE,
183-
EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE);
184-
}
185-
186182
// Validate source.start-time and source.end-time
187183
if (validator
188184
.getParameters()
@@ -260,6 +256,13 @@ private void constructRealtimeExtractor(final PipeParameters parameters) {
260256
return;
261257
}
262258

259+
if (!(pipeName != null
260+
&& (pipeName.startsWith(PipeStaticMeta.SUBSCRIPTION_PIPE_PREFIX)
261+
|| pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)))) {
262+
realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
263+
return;
264+
}
265+
263266
// Use hybrid mode by default
264267
if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) {
265268
realtimeExtractor = new PipeRealtimeDataRegionHybridSource();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
107107
private final AtomicReference<Pair<Long, Long>> dataRegionTimePartitionIdBound =
108108
new AtomicReference<>();
109109

110-
protected boolean isForwardingPipeRequests;
110+
protected boolean isForwardingPipeRequests = true;
111111

112112
private boolean shouldTransferModFile; // Whether to transfer mods
113113

@@ -234,12 +234,7 @@ public void customize(
234234
? TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime)
235235
: TimePartitionUtils.getTimePartitionId(realtimeDataExtractionEndTime) - 1;
236236

237-
isForwardingPipeRequests =
238-
parameters.getBooleanOrDefault(
239-
Arrays.asList(
240-
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
241-
PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
242-
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
237+
isForwardingPipeRequests = true;
243238

244239
shouldTransferModFile =
245240
parameters.getBooleanOrDefault(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,15 @@
1919

2020
package org.apache.iotdb.commons.pipe.agent.plugin.builtin;
2121

22-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.AggregateProcessor;
23-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor;
24-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor;
2522
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor;
26-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.ChangingValueSamplingProcessor;
27-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.SwingingDoorTrendingSamplingProcessor;
28-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.downsampling.TumblingTimeSamplingProcessor;
2923
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.pipeconsensus.PipeConsensusProcessor;
30-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
31-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.twostage.TwoStageCountProcessor;
3224
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.donothing.DoNothingSink;
33-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.airgap.IoTDBAirGapSink;
3425
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.consensus.PipeConsensusAsyncSink;
3526
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBLegacyPipeSink;
3627
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftAsyncSink;
3728
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink;
3829
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSslSink;
3930
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSyncSink;
40-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcda.OpcDaSink;
41-
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcua.OpcUaSink;
4231
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.websocket.WebSocketSink;
4332
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.writeback.WriteBackSink;
4433
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.source.donothing.DoNothingSource;
@@ -60,18 +49,8 @@ public enum BuiltinPipePlugin {
6049

6150
// processors
6251
DO_NOTHING_PROCESSOR("do-nothing-processor", DoNothingProcessor.class),
63-
TUMBLING_TIME_SAMPLING_PROCESSOR(
64-
"tumbling-time-sampling-processor", TumblingTimeSamplingProcessor.class),
65-
SDT_SAMPLING_PROCESSOR("sdt-sampling-processor", SwingingDoorTrendingSamplingProcessor.class),
66-
CHANGING_VALUE_SAMPLING_PROCESSOR(
67-
"changing-value-sampling-processor", ChangingValueSamplingProcessor.class),
68-
THROWING_EXCEPTION_PROCESSOR("throwing-exception-processor", ThrowingExceptionProcessor.class),
69-
AGGREGATE_PROCESSOR("aggregate-processor", AggregateProcessor.class),
70-
COUNT_POINT_PROCESSOR("count-point-processor", TwoStageCountProcessor.class),
7152

7253
// Hidden-processors, which are plugins of the processors
73-
STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class),
74-
TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class),
7554
PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", PipeConsensusProcessor.class),
7655

7756
// connectors
@@ -81,12 +60,9 @@ public enum BuiltinPipePlugin {
8160
IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector", IoTDBThriftSyncSink.class),
8261
IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector", IoTDBThriftAsyncSink.class),
8362
IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector", IoTDBLegacyPipeSink.class),
84-
IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class),
8563
PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector", PipeConsensusAsyncSink.class),
8664

8765
WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class),
88-
OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class),
89-
OPC_DA_CONNECTOR("opc-da-connector", OpcDaSink.class),
9066
WRITE_BACK_CONNECTOR("write-back-connector", WriteBackSink.class),
9167

9268
DO_NOTHING_SINK("do-nothing-sink", DoNothingSink.class),
@@ -95,10 +71,7 @@ public enum BuiltinPipePlugin {
9571
IOTDB_THRIFT_SYNC_SINK("iotdb-thrift-sync-sink", IoTDBThriftSyncSink.class),
9672
IOTDB_THRIFT_ASYNC_SINK("iotdb-thrift-async-sink", IoTDBThriftAsyncSink.class),
9773
IOTDB_LEGACY_PIPE_SINK("iotdb-legacy-pipe-sink", IoTDBLegacyPipeSink.class),
98-
IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapSink.class),
9974
WEBSOCKET_SINK("websocket-sink", WebSocketSink.class),
100-
OPC_UA_SINK("opc-ua-sink", OpcUaSink.class),
101-
OPC_DA_SINK("opc-da-sink", OpcDaSink.class),
10275
WRITE_BACK_SINK("write-back-sink", WriteBackSink.class),
10376
SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class),
10477
PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", PipeConsensusAsyncSink.class),
@@ -136,14 +109,6 @@ public String getClassName() {
136109
// Sources
137110
DO_NOTHING_SOURCE.getPipePluginName().toUpperCase(),
138111
// Processors
139-
TUMBLING_TIME_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
140-
SDT_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
141-
CHANGING_VALUE_SAMPLING_PROCESSOR.getPipePluginName().toUpperCase(),
142-
THROWING_EXCEPTION_PROCESSOR.getPipePluginName().toUpperCase(),
143-
AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(),
144-
COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(),
145-
STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
146-
TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(),
147112
PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(),
148113
// Connectors
149114
DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(),
@@ -152,19 +117,14 @@ public String getClassName() {
152117
IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName().toUpperCase(),
153118
IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
154119
IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName().toUpperCase(),
155-
IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(),
156120
WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(),
157-
OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(),
158-
OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(),
159121
WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
160122
PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
161123
// Sinks
162124
IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(),
163125
IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(),
164126
IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
165127
WEBSOCKET_SINK.getPipePluginName().toUpperCase(),
166-
OPC_UA_SINK.getPipePluginName().toUpperCase(),
167-
OPC_DA_SINK.getPipePluginName().toUpperCase(),
168128
SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(),
169129
PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase())));
170130
}

0 commit comments

Comments
 (0)