Skip to content

Commit 97a08c8

Browse files
Pipe Plugin: intro class annotations for pipe plugin & tree/table model isolation for drop/show pipe plugin operations (#14673)
Co-authored-by: Steve Yurong Su <rong@apache.org>
1 parent b349f56 commit 97a08c8

File tree

91 files changed

+1105
-140
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+1105
-140
lines changed

example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
2424
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2525
import org.apache.iotdb.pipe.api.PipeProcessor;
26+
import org.apache.iotdb.pipe.api.annotation.TreeModel;
2627
import org.apache.iotdb.pipe.api.collector.EventCollector;
2728
import org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
2829
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -37,6 +38,7 @@
3738
import java.util.Collections;
3839
import java.util.concurrent.atomic.AtomicLong;
3940

41+
@TreeModel
4042
public class CountPointProcessor implements PipeProcessor {
4143
private static final String AGGREGATE_SERIES_KEY = "aggregate-series";
4244
private static final AtomicLong writePointCount = new AtomicLong(0);

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipePlugin.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,41 @@
1919

2020
package org.apache.iotdb.pipe.api;
2121

22+
/**
23+
* {@link PipePlugin}
24+
*
25+
* <p>{@link PipePlugin} represents a customizable component that can serve as a data extraction
26+
* plugin, data processing plugin, or data sending plugin within a pipeline framework.
27+
*
28+
* <p>Developers can implement different plugin functionalities according to specific requirements,
29+
* such as collecting data from various sources, transforming the data, or forwarding the data to
30+
* external systems.
31+
*
32+
* <p>Usage Model:
33+
*
34+
* <ul>
35+
* <li>By default, a {@link PipePlugin} can operate under a tree model only, if no model
36+
* annotation under {@link org.apache.iotdb.pipe.api.annotation} is specified.
37+
* <li>To extend its applicability, a {@link PipePlugin} can be configured to support table model,
38+
* or both tree and table models concurrently.
39+
* </ul>
40+
*
41+
* <p>Lifecycle:
42+
*
43+
* <ul>
44+
* <li>When the pipeline framework loads, the plugin's configuration is parsed and validated.
45+
* <li>As part of the setup, methods can be provided to prepare connections or resources required
46+
* by the plugin (e.g., reading external configurations, establishing data routes).
47+
* <li>During data processing, the plugin performs its core functionality (extraction,
48+
* transformation, or sending).
49+
* <li>When the pipeline is stopped or destroyed, any allocated resources must be released
50+
* accurately, and {@link #close()} will be invoked to ensure a clean shutdown.
51+
* </ul>
52+
*
53+
* <p>Example: {@link org.apache.iotdb.CountPointProcessor}
54+
*
55+
* <p>Implementations of {@link PipePlugin} should follow best practices for resource management and
56+
* gracefully handle exceptions, especially when running in long-lived or continuously operating
57+
* environments.
58+
*/
2259
public interface PipePlugin extends AutoCloseable {}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.api.annotation;
21+
22+
import java.lang.annotation.ElementType;
23+
import java.lang.annotation.Retention;
24+
import java.lang.annotation.RetentionPolicy;
25+
import java.lang.annotation.Target;
26+
27+
/**
28+
* Indicates that a plugin can be used in table model environments.
29+
*
30+
* <p>When implementing a custom {@link org.apache.iotdb.pipe.api.PipePlugin} that needs to operate
31+
* under table model settings, declare this annotation on the plugin class. Through the {@code
32+
* CREATE PIPEPLUGIN} statement, a plugin annotated with {@link TableModel} is valid for both tree
33+
* model connections and table model connections.
34+
*
35+
* @since 2.0.0
36+
*/
37+
@Target(ElementType.TYPE)
38+
@Retention(RetentionPolicy.RUNTIME)
39+
public @interface TableModel {}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.api.annotation;
21+
22+
import java.lang.annotation.ElementType;
23+
import java.lang.annotation.Retention;
24+
import java.lang.annotation.RetentionPolicy;
25+
import java.lang.annotation.Target;
26+
27+
/**
28+
* Indicates that a plugin can be used in tree model environments.
29+
*
30+
* <p>When implementing a custom {@link org.apache.iotdb.pipe.api.PipePlugin} that needs to operate
31+
* under tree model settings, declare this annotation on the plugin class. Through the {@code CREATE
32+
* PIPEPLUGIN} statement, a plugin annotated with {@link TreeModel} is valid for both tree model
33+
* connections and tree model connections.
34+
*
35+
* @since 2.0.0
36+
*/
37+
@Target(ElementType.TYPE)
38+
@Retention(RetentionPolicy.RUNTIME)
39+
public @interface TreeModel {}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,31 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
24+
import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
25+
import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
2426
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
2527
import org.apache.iotdb.consensus.common.DataSet;
2628

2729
import java.io.IOException;
2830
import java.nio.ByteBuffer;
2931
import java.util.ArrayList;
3032
import java.util.List;
33+
import java.util.Map;
34+
import java.util.stream.Collectors;
3135

3236
public class PipePluginTableResp implements DataSet {
3337

3438
private final TSStatus status;
3539
private final List<PipePluginMeta> allPipePluginMeta;
40+
private final Map<String, Visibility> pipePluginNameToVisibilityMap;
3641

37-
public PipePluginTableResp(TSStatus status, List<PipePluginMeta> allPipePluginMeta) {
42+
public PipePluginTableResp(
43+
TSStatus status,
44+
List<PipePluginMeta> allPipePluginMeta,
45+
Map<String, Visibility> pipePluginNameToVisibilityMap) {
3846
this.status = status;
3947
this.allPipePluginMeta = allPipePluginMeta;
48+
this.pipePluginNameToVisibilityMap = pipePluginNameToVisibilityMap;
4049
}
4150

4251
public TGetPipePluginTableResp convertToThriftResponse() throws IOException {
@@ -46,4 +55,20 @@ public TGetPipePluginTableResp convertToThriftResponse() throws IOException {
4655
}
4756
return new TGetPipePluginTableResp(status, pipePluginInformationByteBuffers);
4857
}
58+
59+
public PipePluginTableResp filter(final boolean isTableModel) {
60+
return new PipePluginTableResp(
61+
status,
62+
allPipePluginMeta.stream()
63+
.filter(
64+
meta -> {
65+
final String pipePluginName = meta.getPluginName();
66+
final Visibility visibility =
67+
pipePluginNameToVisibilityMap.getOrDefault(
68+
pipePluginName, Visibility.TREE_ONLY);
69+
return VisibilityUtils.isCompatible(visibility, isTableModel);
70+
})
71+
.collect(Collectors.toList()),
72+
pipePluginNameToVisibilityMap);
73+
}
4974
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public PipeTableResp filter(final Boolean whereClause, final String pipeName) {
9797
public PipeTableResp filter(
9898
final Boolean whereClause, final String pipeName, final boolean isTableModel) {
9999
final PipeTableResp resp = filter(whereClause, pipeName);
100-
resp.allPipeMeta.removeIf(meta -> !meta.visibleUnder(isTableModel));
100+
resp.allPipeMeta.removeIf(meta -> !meta.getStaticMeta().visibleUnder(isTableModel));
101101
return resp;
102102
}
103103

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@
215215
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
216216
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
217217
import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
218+
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
218219
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
219220
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
220221
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
@@ -1609,6 +1610,14 @@ public TGetPipePluginTableResp getPipePluginTable() {
16091610
: new TGetPipePluginTableResp(status, Collections.emptyList());
16101611
}
16111612

1613+
@Override
1614+
public TGetPipePluginTableResp getPipePluginTableExtended(TShowPipePluginReq req) {
1615+
TSStatus status = confirmLeader();
1616+
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
1617+
? pipeManager.getPipePluginCoordinator().getPipePluginTableExtended(req)
1618+
: new TGetPipePluginTableResp(status, Collections.emptyList());
1619+
}
1620+
16121621
@Override
16131622
public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) {
16141623
TSStatus status = confirmLeader();

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@
138138
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
139139
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
140140
import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
141+
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
141142
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
142143
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
143144
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
@@ -556,6 +557,9 @@ TDataPartitionTableResp getOrCreateDataPartition(
556557
/** Show pipe plugins. */
557558
TGetPipePluginTableResp getPipePluginTable();
558559

560+
/** Show pipe plugins. */
561+
TGetPipePluginTableResp getPipePluginTableExtended(TShowPipePluginReq req);
562+
559563
/** Get pipe plugin jar. */
560564
TGetJarInListResp getPipePluginJar(TGetJarInListReq req);
561565

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionWritePlanEvent;
3434
import org.apache.iotdb.confignode.service.ConfigNode;
3535
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
36+
import org.apache.iotdb.pipe.api.annotation.TableModel;
37+
import org.apache.iotdb.pipe.api.annotation.TreeModel;
3638
import org.apache.iotdb.pipe.api.event.Event;
3739
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
3840
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -48,6 +50,8 @@
4850
import java.util.HashMap;
4951
import java.util.Objects;
5052

53+
@TreeModel
54+
@TableModel
5155
public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector {
5256

5357
private static final Logger LOGGER =

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionSnapshotEvent;
3434
import org.apache.iotdb.confignode.manager.pipe.event.PipeConfigRegionWritePlanEvent;
3535
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
36+
import org.apache.iotdb.pipe.api.annotation.TableModel;
37+
import org.apache.iotdb.pipe.api.annotation.TreeModel;
3638
import org.apache.iotdb.pipe.api.event.Event;
3739
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
3840
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -52,6 +54,8 @@
5254
import java.util.List;
5355
import java.util.Objects;
5456

57+
@TreeModel
58+
@TableModel
5559
public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector {
5660

5761
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConfigRegionConnector.class);

0 commit comments

Comments
 (0)