diff --git a/iotdb-collector/collector-core/pom.xml b/iotdb-collector/collector-core/pom.xml
new file mode 100644
index 000000000000..ff75a8927148
--- /dev/null
+++ b/iotdb-collector/collector-core/pom.xml
@@ -0,0 +1,91 @@
+
+
+
+ 4.0.0
+
+ org.apache.iotdb
+ iotdb-collector
+ 2.0.0-SNAPSHOT
+
+ collector-core
+ IoTDB: Collector: Core
+
+
+ org.apache.iotdb
+ collector-openapi
+ 2.0.0-SNAPSHOT
+
+
+ org.apache.iotdb
+ service-rpc
+ 2.0.0-SNAPSHOT
+
+
+ org.apache.iotdb
+ pipe-api
+ 2.0.0-SNAPSHOT
+
+
+ org.apache.iotdb
+ node-commons
+ 2.0.0-SNAPSHOT
+
+
+ io.netty
+ netty-handler
+
+
+ io.netty
+ netty-common
+
+
+ net.minidev
+ json-smart
+
+
+
+
+ jakarta.servlet
+ jakarta.servlet-api
+
+
+ jakarta.ws.rs
+ jakarta.ws.rs-api
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.eclipse.jetty
+ jetty-server
+
+
+ org.eclipse.jetty
+ jetty-servlet
+
+
+ org.glassfish.jersey.containers
+ jersey-container-servlet-core
+
+
+
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
new file mode 100644
index 000000000000..6cf6456d98cc
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector;
+
+import org.apache.iotdb.collector.config.Configuration;
+import org.apache.iotdb.collector.service.ApiService;
+import org.apache.iotdb.collector.service.IService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+
+public class Application {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
+
+ private final Configuration configuration = new Configuration();
+ private final LinkedList services = new LinkedList<>();
+
+ private Application() {
+ services.add(new ApiService());
+ }
+
+ public static void main(String[] args) {
+ LOGGER.info("[Application] Starting ...");
+ final long startTime = System.currentTimeMillis();
+
+ final Application application = new Application();
+
+ application.logAllOptions();
+ application.registerShutdownHook();
+ application.startServices();
+
+ LOGGER.info(
+ "[Application] Successfully started in {}ms", System.currentTimeMillis() - startTime);
+ }
+
+ private void logAllOptions() {
+ configuration.logAllOptions();
+ }
+
+ private void registerShutdownHook() {
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ LOGGER.warn("[Application] Exiting ...");
+
+ for (final IService service : services) {
+ try {
+ service.stop();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "[{}] Unexpected exception occurred when stopping: {}",
+ service.name(),
+ e.getMessage(),
+ e);
+ }
+ }
+
+ LOGGER.warn(
+ "[Application] JVM report: total memory {}, free memory {}, used memory {}",
+ Runtime.getRuntime().totalMemory(),
+ Runtime.getRuntime().freeMemory(),
+ Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
+ LOGGER.warn("[Application] Exited.");
+ }));
+ }
+
+ private void startServices() {
+ for (final IService service : services) {
+ service.start();
+ }
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java
new file mode 100644
index 000000000000..bca4106987ff
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/CollectorAgent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent;
+
+import org.apache.iotdb.collector.agent.executor.CollectorTaskExecutorAgent;
+import org.apache.iotdb.collector.agent.plugin.CollectorPluginAgent;
+import org.apache.iotdb.collector.agent.task.CollectorTaskAgent;
+
+public class CollectorAgent {
+
+ private final CollectorTaskAgent collectorTaskAgent = CollectorTaskAgent.instance();
+ private final CollectorTaskExecutorAgent collectorTaskExecutorAgent =
+ CollectorTaskExecutorAgent.instance();
+ private final CollectorPluginAgent collectorPluginAgent = CollectorPluginAgent.instance();
+
+ private CollectorAgent() {}
+
+ public static CollectorTaskAgent task() {
+ return CollectorAgentHolder.INSTANCE.collectorTaskAgent;
+ }
+
+ public static CollectorTaskExecutorAgent executor() {
+ return CollectorAgentHolder.INSTANCE.collectorTaskExecutorAgent;
+ }
+
+ public static CollectorPluginAgent plugin() {
+ return CollectorAgentHolder.INSTANCE.collectorPluginAgent;
+ }
+
+ private static class CollectorAgentHolder {
+ private static final CollectorAgent INSTANCE = new CollectorAgent();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java
new file mode 100644
index 000000000000..056685e3313a
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/collect/CollectorEventCollector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.collect;
+
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+public class CollectorEventCollector implements EventCollector {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectorEventCollector.class);
+
+ private final BlockingQueue pendingQueue;
+
+ public CollectorEventCollector(final BlockingQueue pendingQueue) {
+ this.pendingQueue = pendingQueue;
+ }
+
+ @Override
+ public void collect(final Event event) {
+ try {
+ pendingQueue.put(event);
+ } catch (final InterruptedException e) {
+ LOGGER.warn("collect event failed because {}", e.getMessage(), e);
+ }
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java
new file mode 100644
index 000000000000..385fd606628c
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorProcessorTaskExecutor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorProcessorTaskExecutor extends CollectorTaskExecutor {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(CollectorProcessorTaskExecutor.class);
+
+ private static final Map PROCESSOR_EXECUTOR = new ConcurrentHashMap<>();
+ private static final Map PROCESSOR_TASK_MAP = new ConcurrentHashMap<>();
+
+ public boolean validateIfAbsent(final String taskId) {
+ return !PROCESSOR_EXECUTOR.containsKey(taskId) && !PROCESSOR_TASK_MAP.containsKey(taskId);
+ }
+
+ @Override
+ public Optional getExecutor(final String taskId) {
+ return Optional.of(
+ IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-processor-executor-" + taskId));
+ }
+
+ @Override
+ public void recordExecution(
+ final CollectorTask collectorTask, final ExecutorService executorService) {
+ final String taskId = collectorTask.getTaskId();
+ PROCESSOR_EXECUTOR.putIfAbsent(taskId, executorService);
+ PROCESSOR_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+ LOGGER.info("register collector processor task {}", taskId);
+ }
+
+ @Override
+ public void eraseExecution(final String taskId) {
+ PROCESSOR_TASK_MAP.remove(taskId).stop();
+ PROCESSOR_EXECUTOR.remove(taskId).shutdownNow();
+
+ LOGGER.info("deregister collector processor task {}", taskId);
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java
new file mode 100644
index 000000000000..a490e614223c
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSinkTaskExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorSinkTaskExecutor extends CollectorTaskExecutor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectorSinkTaskExecutor.class);
+
+ private static final Map SINK_EXECUTOR = new ConcurrentHashMap<>();
+ private static final Map SINK_TASK_MAP = new ConcurrentHashMap<>();
+
+ public boolean validateIfAbsent(final String taskId) {
+ return !SINK_EXECUTOR.containsKey(taskId) && !SINK_TASK_MAP.containsKey(taskId);
+ }
+
+ @Override
+ public Optional getExecutor(final String taskId) {
+ return Optional.of(
+ IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-sink-executor-" + taskId));
+ }
+
+ @Override
+ public void recordExecution(
+ final CollectorTask collectorTask, final ExecutorService executorService) {
+ final String taskId = collectorTask.getTaskId();
+ SINK_EXECUTOR.putIfAbsent(taskId, executorService);
+ SINK_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+ LOGGER.info("register collector sink task {}", taskId);
+ }
+
+ @Override
+ public void eraseExecution(final String taskId) {
+ SINK_TASK_MAP.remove(taskId).stop();
+ SINK_EXECUTOR.remove(taskId).shutdownNow();
+
+ LOGGER.info("deregister collector sink task {}", taskId);
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java
new file mode 100644
index 000000000000..fde823694d4a
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorSourceTaskExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class CollectorSourceTaskExecutor extends CollectorTaskExecutor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectorSourceTaskExecutor.class);
+
+ private static final Map SOURCE_EXECUTOR = new ConcurrentHashMap<>();
+ private static final Map SOURCE_TASK_MAP = new ConcurrentHashMap<>();
+
+ public boolean validateIfAbsent(final String taskId) {
+ return !SOURCE_EXECUTOR.containsKey(taskId) && !SOURCE_TASK_MAP.containsKey(taskId);
+ }
+
+ @Override
+ public Optional getExecutor(final String taskId) {
+ return Optional.of(
+ IoTDBThreadPoolFactory.newSingleThreadExecutor("collector-source-executor-" + taskId));
+ }
+
+ @Override
+ public void recordExecution(
+ final CollectorTask collectorTask, final ExecutorService executorService) {
+ final String taskId = collectorTask.getTaskId();
+ SOURCE_EXECUTOR.put(taskId, executorService);
+ SOURCE_TASK_MAP.putIfAbsent(taskId, collectorTask);
+
+ LOGGER.info("register collector source task {}", taskId);
+ }
+
+ @Override
+ public void eraseExecution(String taskId) {
+ SOURCE_TASK_MAP.remove(taskId).stop();
+ SOURCE_EXECUTOR.remove(taskId).shutdownNow();
+
+ LOGGER.info("deregister collector source task {}", taskId);
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java
new file mode 100644
index 000000000000..51dc5675d373
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+import org.apache.iotdb.collector.agent.task.CollectorTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+public abstract class CollectorTaskExecutor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectorTaskExecutor.class);
+
+ public void register(final CollectorTask collectorTask) {
+ if (validateIfAbsent(collectorTask.getTaskId())) {
+ getExecutor(collectorTask.getTaskId())
+ .ifPresent(
+ executor -> {
+ executor.submit(collectorTask);
+ recordExecution(collectorTask, executor);
+ });
+ } else {
+ LOGGER.warn("task {} has existed", collectorTask.getTaskId());
+ }
+ }
+
+ public abstract boolean validateIfAbsent(final String taskId);
+
+ public abstract Optional getExecutor(final String taskId);
+
+ public abstract void recordExecution(
+ final CollectorTask collectorTask, final ExecutorService executorService);
+
+ public void deregister(final String taskId) {
+ if (!validateIfAbsent(taskId)) {
+ eraseExecution(taskId);
+ } else {
+ LOGGER.warn("task {} has not existed", taskId);
+ }
+ }
+
+ public abstract void eraseExecution(final String taskId);
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java
new file mode 100644
index 000000000000..5adcacebae78
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/executor/CollectorTaskExecutorAgent.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.executor;
+
+public class CollectorTaskExecutorAgent {
+
+ private final CollectorSourceTaskExecutor sourceTaskExecutor;
+ private final CollectorProcessorTaskExecutor processorTaskExecutor;
+ private final CollectorSinkTaskExecutor sinkTaskExecutor;
+
+ private CollectorTaskExecutorAgent() {
+ sourceTaskExecutor = new CollectorSourceTaskExecutor();
+ processorTaskExecutor = new CollectorProcessorTaskExecutor();
+ sinkTaskExecutor = new CollectorSinkTaskExecutor();
+ }
+
+ public CollectorSourceTaskExecutor getSourceTaskExecutor() {
+ return CollectorTaskExecutorAgentHolder.INSTANCE.sourceTaskExecutor;
+ }
+
+ public CollectorProcessorTaskExecutor getProcessorTaskExecutor() {
+ return CollectorTaskExecutorAgentHolder.INSTANCE.processorTaskExecutor;
+ }
+
+ public CollectorSinkTaskExecutor getSinkTaskExecutor() {
+ return CollectorTaskExecutorAgentHolder.INSTANCE.sinkTaskExecutor;
+ }
+
+ public static CollectorTaskExecutorAgent instance() {
+ return CollectorTaskExecutorAgentHolder.INSTANCE;
+ }
+
+ private static class CollectorTaskExecutorAgentHolder {
+ private static final CollectorTaskExecutorAgent INSTANCE = new CollectorTaskExecutorAgent();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java
new file mode 100644
index 000000000000..33ee5cae0320
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginAgent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.plugin;
+
+public class CollectorPluginAgent {
+ private final CollectorPluginConstructor collectorPluginConstructor =
+ CollectorPluginConstructor.instance();
+
+ private CollectorPluginAgent() {}
+
+ public CollectorPluginConstructor constructor() {
+ return CollectorPluginAgentHolder.INSTANCE.collectorPluginConstructor;
+ }
+
+ public static CollectorPluginAgent instance() {
+ return new CollectorPluginAgent();
+ }
+
+ private static class CollectorPluginAgentHolder {
+ private static final CollectorPluginAgent INSTANCE = new CollectorPluginAgent();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java
new file mode 100644
index 000000000000..cce94f649bcf
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/plugin/CollectorPluginConstructor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.plugin;
+
+import org.apache.iotdb.collector.plugin.BuiltinCollectorPlugin;
+import org.apache.iotdb.collector.plugin.builtin.processor.DoNothingProcessor;
+import org.apache.iotdb.collector.plugin.builtin.sink.SessionSink;
+import org.apache.iotdb.collector.plugin.builtin.source.HttpSource;
+import org.apache.iotdb.pipe.api.PipePlugin;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.PipeSink;
+import org.apache.iotdb.pipe.api.PipeSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class CollectorPluginConstructor {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectorPluginConstructor.class);
+
+ protected final Map> pluginConstructors = new HashMap<>();
+
+ private CollectorPluginConstructor() {
+ initConstructors();
+ }
+
+ private void initConstructors() {
+ pluginConstructors.put(
+ BuiltinCollectorPlugin.HTTP_SOURCE.getCollectorPluginName(), HttpSource::new);
+ pluginConstructors.put(
+ BuiltinCollectorPlugin.DO_NOTHING_PROCESSOR.getCollectorPluginName(),
+ DoNothingProcessor::new);
+ pluginConstructors.put(
+ BuiltinCollectorPlugin.IOTDB_SESSION_SINK.getCollectorPluginName(), SessionSink::new);
+ LOGGER.info("builtin plugin has been initialized");
+ }
+
+ public PipeSource getSource(final String pluginName) {
+ return (PipeSource) pluginConstructors.get(pluginName).get();
+ }
+
+ public PipeProcessor getProcessor(final String pluginName) {
+ return (PipeProcessor) pluginConstructors.get(pluginName).get();
+ }
+
+ public PipeSink getSink(final String pluginName) {
+ return (PipeSink) pluginConstructors.get(pluginName).get();
+ }
+
+ public static CollectorPluginConstructor instance() {
+ return CollectorPluginConstructorHolder.INSTANCE;
+ }
+
+ private static class CollectorPluginConstructorHolder {
+ private static final CollectorPluginConstructor INSTANCE = new CollectorPluginConstructor();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java
new file mode 100644
index 000000000000..8ee62d5e7313
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorProcessorTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.collector.agent.collect.CollectorEventCollector;
+import org.apache.iotdb.commons.pipe.agent.task.connection.EventSupplier;
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public class CollectorProcessorTask extends CollectorTask {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectorProcessorTask.class);
+
+ private final Map processorAttribute;
+ private final PipeProcessor pipeProcessor;
+ private final EventSupplier eventSupplier;
+ private final BlockingQueue pendingQueue;
+ private final CollectorEventCollector collectorEventCollector;
+ private boolean isStarted = true;
+
+ public CollectorProcessorTask(
+ final String taskId,
+ final Map processorAttribute,
+ final PipeProcessor pipeProcessor,
+ final EventSupplier eventSupplier,
+ final BlockingQueue pendingQueue) {
+ super(taskId);
+ this.processorAttribute = processorAttribute;
+ this.pipeProcessor = pipeProcessor;
+ this.eventSupplier = eventSupplier;
+ this.pendingQueue = pendingQueue;
+ this.collectorEventCollector = new CollectorEventCollector(pendingQueue);
+ }
+
+ @Override
+ public void runMayThrow() {
+ while (isStarted) {
+ try {
+ pipeProcessor.process(eventSupplier.supply(), collectorEventCollector);
+ } catch (final Exception e) {
+ LOGGER.warn("error occur while processing event because {}", e.getMessage());
+ }
+ }
+ }
+
+ public Map getProcessorAttribute() {
+ return processorAttribute;
+ }
+
+ public PipeProcessor getPipeProcessor() {
+ return pipeProcessor;
+ }
+
+ public EventSupplier getEventSupplier() {
+ return eventSupplier;
+ }
+
+ public BlockingQueue getPendingQueue() {
+ return pendingQueue;
+ }
+
+ public void stop() {
+ isStarted = false;
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java
new file mode 100644
index 000000000000..77e65b4ec469
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSinkTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.pipe.api.PipeSink;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public class CollectorSinkTask extends CollectorTask {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectorSinkTask.class);
+
+ private final Map sinkAttribute;
+ private final PipeSink pipeSink;
+ private final BlockingQueue pendingQueue;
+ private boolean isStarted = true;
+
+ public CollectorSinkTask(
+ final String taskId,
+ final Map sinkAttribute,
+ final PipeSink pipeSink,
+ final BlockingQueue pendingQueue) {
+ super(taskId);
+ this.sinkAttribute = sinkAttribute;
+ this.pipeSink = pipeSink;
+ this.pendingQueue = pendingQueue;
+ }
+
+ @Override
+ public void runMayThrow() {
+ try {
+ pipeSink.handshake();
+ } catch (final Exception e) {
+ LOGGER.warn("handshake fail because {}", e.getMessage());
+ }
+ isStarted = true;
+ while (isStarted) {
+ try {
+ final Event event = pendingQueue.take();
+ pipeSink.transfer(event);
+ LOGGER.info("transfer event {} success, remain number is {}", event, pendingQueue.size());
+ } catch (final InterruptedException e) {
+ LOGGER.warn("interrupted while waiting for take a event");
+ } catch (final Exception e) {
+ LOGGER.warn("error occur while transfer event to endpoint");
+ }
+ }
+ }
+
+ public Map getSinkAttribute() {
+ return sinkAttribute;
+ }
+
+ public PipeSink getPipeSink() {
+ return pipeSink;
+ }
+
+ public void stop() {
+ isStarted = false;
+ }
+
+ public BlockingQueue getPendingQueue() {
+ return pendingQueue;
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java
new file mode 100644
index 000000000000..26230984713c
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorSourceTask.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.commons.pipe.agent.task.connection.EventSupplier;
+import org.apache.iotdb.pipe.api.PipeSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class CollectorSourceTask extends CollectorTask {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectorSourceTask.class);
+
+ private final Map sourceAttribute;
+ private final PipeSource pipeSource;
+
+ public CollectorSourceTask(
+ final String taskId, final Map sourceAttribute, final PipeSource pipeSource) {
+ super(taskId);
+ this.sourceAttribute = sourceAttribute;
+ this.pipeSource = pipeSource;
+ }
+
+ @Override
+ public void runMayThrow() throws Throwable {
+ pipeSource.start();
+ }
+
+ public Map getSourceAttribute() {
+ return sourceAttribute;
+ }
+
+ public PipeSource getPipeSource() {
+ return pipeSource;
+ }
+
+ public EventSupplier getEventSupplier() {
+ return pipeSource::supply;
+ }
+
+ @Override
+ public void stop() {
+ try {
+ pipeSource.close();
+ } catch (final Exception e) {
+ LOGGER.warn("failed to close pipe source {}", pipeSource, e);
+ }
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java
new file mode 100644
index 000000000000..4f13ff9c4d98
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTask.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+
+public abstract class CollectorTask extends WrappedRunnable {
+
+ protected final String taskId;
+
+ protected CollectorTask(final String taskId) {
+ this.taskId = taskId;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public abstract void stop();
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java
new file mode 100644
index 000000000000..9cd787df0487
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/agent/task/CollectorTaskAgent.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.agent.task;
+
+import org.apache.iotdb.collector.agent.executor.CollectorProcessorTaskExecutor;
+import org.apache.iotdb.collector.agent.executor.CollectorSinkTaskExecutor;
+import org.apache.iotdb.collector.agent.executor.CollectorSourceTaskExecutor;
+import org.apache.iotdb.collector.agent.executor.CollectorTaskExecutorAgent;
+import org.apache.iotdb.collector.agent.plugin.CollectorPluginAgent;
+import org.apache.iotdb.collector.agent.plugin.CollectorPluginConstructor;
+import org.apache.iotdb.collector.plugin.BuiltinCollectorPlugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class CollectorTaskAgent {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CollectorTaskAgent.class);
+
+ private static final CollectorPluginConstructor CONSTRUCTOR =
+ CollectorPluginAgent.instance().constructor();
+ private static final CollectorSourceTaskExecutor SOURCE_TASK_EXECUTOR =
+ CollectorTaskExecutorAgent.instance().getSourceTaskExecutor();
+ private static final CollectorProcessorTaskExecutor PROCESSOR_TASK_EXECUTOR =
+ CollectorTaskExecutorAgent.instance().getProcessorTaskExecutor();
+ private static final CollectorSinkTaskExecutor SINK_TASK_EXECUTOR =
+ CollectorTaskExecutorAgent.instance().getSinkTaskExecutor();
+
+ private CollectorTaskAgent() {}
+
+ public boolean createCollectorTask(
+ final Map sourceAttribute,
+ final Map processorAttribute,
+ final Map sinkAttribute,
+ final String taskId) {
+ try {
+ final CollectorSourceTask collectorSourceTask =
+ new CollectorSourceTask(
+ taskId,
+ sourceAttribute,
+ CONSTRUCTOR.getSource(
+ sourceAttribute.getOrDefault(
+ "source-plugin",
+ BuiltinCollectorPlugin.HTTP_SOURCE.getCollectorPluginName())));
+ SOURCE_TASK_EXECUTOR.register(collectorSourceTask);
+
+ final CollectorProcessorTask collectorProcessorTask =
+ new CollectorProcessorTask(
+ taskId,
+ processorAttribute,
+ CONSTRUCTOR.getProcessor(
+ processorAttribute.getOrDefault(
+ "processor-plugin",
+ BuiltinCollectorPlugin.DO_NOTHING_PROCESSOR.getCollectorPluginName())),
+ collectorSourceTask.getEventSupplier(),
+ new LinkedBlockingQueue<>());
+ PROCESSOR_TASK_EXECUTOR.register(collectorProcessorTask);
+
+ final CollectorSinkTask collectorSinkTask =
+ new CollectorSinkTask(
+ taskId,
+ sinkAttribute,
+ CONSTRUCTOR.getSink(
+ sinkAttribute.getOrDefault(
+ "sink-plugin",
+ BuiltinCollectorPlugin.IOTDB_SESSION_SINK.getCollectorPluginName())),
+ collectorProcessorTask.getPendingQueue());
+ SINK_TASK_EXECUTOR.register(collectorSinkTask);
+ } catch (final Exception e) {
+ LOGGER.warn("create collector task error", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ public boolean stopCollectorTask(final String taskId) {
+ try {
+ SOURCE_TASK_EXECUTOR.deregister(taskId);
+ PROCESSOR_TASK_EXECUTOR.deregister(taskId);
+ SINK_TASK_EXECUTOR.deregister(taskId);
+ } catch (final Exception e) {
+ LOGGER.warn("stop collector task error", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ public static CollectorTaskAgent instance() {
+ return CollectorTaskAgentHolder.INSTANCE;
+ }
+
+ private static class CollectorTaskAgentHolder {
+ private static final CollectorTaskAgent INSTANCE = new CollectorTaskAgent();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/filter/ApiOriginFilter.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/filter/ApiOriginFilter.java
new file mode 100644
index 000000000000..008e30904194
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/filter/ApiOriginFilter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.filter;
+
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.IOException;
+
+public class ApiOriginFilter implements javax.servlet.Filter {
+ @Override
+ public void doFilter(
+ final ServletRequest request, final ServletResponse response, final FilterChain chain)
+ throws IOException, ServletException {
+ final HttpServletResponse res = (HttpServletResponse) response;
+ res.addHeader("Access-Control-Allow-Origin", "*");
+ res.addHeader("Access-Control-Allow-Methods", "GET, POST");
+ res.addHeader("Access-Control-Allow-Headers", "*");
+ chain.doFilter(request, response);
+ }
+
+ @Override
+ public void destroy() {
+ // do nothing
+ }
+
+ @Override
+ public void init(final FilterConfig filterConfig) throws ServletException {
+ // do nothing
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java
new file mode 100644
index 000000000000..45d62ee5458b
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/impl/PingApiServiceImpl.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.collector.api.impl;
+
+import org.apache.iotdb.application.protocol.rest.PingApiService;
+import org.apache.iotdb.application.protocol.rest.v1.model.ExecutionStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+public class PingApiServiceImpl extends PingApiService {
+
+ @Override
+ public Response tryPing(final SecurityContext securityContext) {
+ return Response.ok()
+ .entity(
+ new ExecutionStatus()
+ .code(TSStatusCode.SUCCESS_STATUS.getStatusCode())
+ .message(TSStatusCode.SUCCESS_STATUS.name()))
+ .build();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java
new file mode 100644
index 000000000000..b07452113f37
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/handler/RequestValidationHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.handler;
+
+import org.apache.iotdb.application.protocol.rest.v1.model.CreatePipeRequest;
+import org.apache.iotdb.application.protocol.rest.v1.model.StopPipeRequest;
+
+import java.util.Objects;
+
+public class RequestValidationHandler {
+ private RequestValidationHandler() {}
+
+ public static void validateCreateRequest(final CreatePipeRequest createPipeRequest) {
+ Objects.requireNonNull(createPipeRequest.getTaskId(), "taskId cannot be null");
+ }
+
+ public static void validateStopRequest(final StopPipeRequest stopPipeRequest) {
+ Objects.requireNonNull(stopPipeRequest.getTaskId(), "taskId cannot be null");
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java
new file mode 100644
index 000000000000..a7a25dfdd6f3
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/impl/AdminApiServiceImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.api.v1.impl;
+
+import org.apache.iotdb.application.protocol.rest.v1.AdminApiService;
+import org.apache.iotdb.application.protocol.rest.v1.NotFoundException;
+import org.apache.iotdb.application.protocol.rest.v1.model.AlterPipeRequest;
+import org.apache.iotdb.application.protocol.rest.v1.model.CreatePipeRequest;
+import org.apache.iotdb.application.protocol.rest.v1.model.DropPipeRequest;
+import org.apache.iotdb.application.protocol.rest.v1.model.StartPipeRequest;
+import org.apache.iotdb.application.protocol.rest.v1.model.StopPipeRequest;
+import org.apache.iotdb.collector.agent.CollectorAgent;
+import org.apache.iotdb.collector.api.v1.handler.RequestValidationHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.SecurityContext;
+
+public class AdminApiServiceImpl extends AdminApiService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AdminApiServiceImpl.class);
+
+ @Override
+ public Response alterPipe(
+ final AlterPipeRequest alterPipeRequest, final SecurityContext securityContext)
+ throws NotFoundException {
+ return Response.ok("alterPipe").build();
+ }
+
+ @Override
+ public Response createPipe(
+ final CreatePipeRequest createPipeRequest, final SecurityContext securityContext)
+ throws NotFoundException {
+ RequestValidationHandler.validateCreateRequest(createPipeRequest);
+
+ final boolean createdResult =
+ CollectorAgent.task()
+ .createCollectorTask(
+ createPipeRequest.getSourceAttribute(),
+ createPipeRequest.getProcessorAttribute(),
+ createPipeRequest.getSinkAttribute(),
+ createPipeRequest.getTaskId());
+ if (createdResult) {
+ LOGGER.info("Create task successful");
+ return Response.status(Response.Status.OK).entity("create task success").build();
+ }
+ LOGGER.warn("Create task failed");
+ return Response.status(Response.Status.BAD_REQUEST).entity("create task fail").build();
+ }
+
+ @Override
+ public Response dropPipe(
+ final DropPipeRequest dropPipeRequest, final SecurityContext securityContext)
+ throws NotFoundException {
+ return Response.ok("dropPipe").build();
+ }
+
+ @Override
+ public Response startPipe(
+ final StartPipeRequest startPipeRequest, final SecurityContext securityContext)
+ throws NotFoundException {
+ return Response.ok("startPipe").build();
+ }
+
+ @Override
+ public Response stopPipe(
+ final StopPipeRequest stopPipeRequest, final SecurityContext securityContext)
+ throws NotFoundException {
+ RequestValidationHandler.validateStopRequest(stopPipeRequest);
+
+ final boolean stopResult = CollectorAgent.task().stopCollectorTask(stopPipeRequest.getTaskId());
+ if (stopResult) {
+ LOGGER.info("Stop task: {} successful", stopPipeRequest.getTaskId());
+ return Response.ok().entity("stop task: " + stopPipeRequest.getTaskId() + " success").build();
+ }
+ LOGGER.warn("Stop task: {} failed", stopPipeRequest.getTaskId());
+ return Response.status(Response.Status.BAD_REQUEST).entity("stop task fail").build();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java
new file mode 100644
index 000000000000..2e3ee556a16a
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/ApiServiceOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+public class ApiServiceOptions extends Options {
+
+ public static final Option PORT =
+ new Option("api_service_port", 17070) {
+ @Override
+ public void setValue(String valueString) {
+ value = Integer.parseInt(valueString);
+ }
+ };
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Configuration.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Configuration.java
new file mode 100644
index 000000000000..e41144ce69d0
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Configuration.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+import org.apache.iotdb.commons.conf.TrimProperties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.Properties;
+
+public class Configuration {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Configuration.class);
+
+ private static final String CONFIG_FILE_NAME = "application.properties";
+
+ private final Options options = new Options();
+
+ public Configuration() {
+ loadProps();
+ }
+
+ private void loadProps() {
+ final Optional url = getPropsUrl();
+ if (url.isPresent()) {
+ try (final InputStream inputStream = url.get().openStream()) {
+ LOGGER.info("Start to read config file {}", url.get());
+ final Properties properties = new Properties();
+ properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
+ final TrimProperties trimProperties = new TrimProperties();
+ trimProperties.putAll(properties);
+ options.loadProperties(trimProperties);
+ } catch (final FileNotFoundException e) {
+ LOGGER.error("Fail to find config file, reject startup.", e);
+ System.exit(-1);
+ } catch (final IOException e) {
+ LOGGER.error("IO exception when reading config file, reject startup.", e);
+ System.exit(-1);
+ } catch (final Exception e) {
+ LOGGER.error("Unexpected exception when reading config file, reject startup.", e);
+ System.exit(-1);
+ }
+ } else {
+ LOGGER.warn("{} is not found, use default configuration", CONFIG_FILE_NAME);
+ }
+ }
+
+ private Optional getPropsUrl() {
+ final URL url = Options.class.getResource("/" + CONFIG_FILE_NAME);
+
+ if (url != null) {
+ return Optional.of(url);
+ } else {
+ LOGGER.warn(
+ "Cannot find IOTDB_COLLECTOR_HOME or IOTDB_COLLECTOR_CONF environment variable when loading "
+ + "config file {}, use default configuration",
+ CONFIG_FILE_NAME);
+ return Optional.empty();
+ }
+ }
+
+ public void logAllOptions() {
+ options.logAllOptions();
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
new file mode 100644
index 000000000000..bad372bf8f1d
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+import org.apache.iotdb.commons.conf.TrimProperties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class Options {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Options.class);
+
+ private static final Map> OPTIONS = new ConcurrentHashMap<>();
+
+ public abstract static class Option {
+
+ private final String key;
+ private final T defaultValue;
+ protected T value;
+
+ Option(final String key, final T defaultValue) {
+ this.key = key;
+ this.defaultValue = defaultValue;
+
+ OPTIONS.put(key, this);
+ }
+
+ public String key() {
+ return key;
+ }
+
+ public boolean hasDefaultValue() {
+ return defaultValue != null;
+ }
+
+ public T defaultValue() {
+ return defaultValue;
+ }
+
+ public T value() {
+ return value == null ? defaultValue : value;
+ }
+
+ public abstract void setValue(final String valueString);
+
+ @Override
+ public String toString() {
+ return key + " = " + value();
+ }
+ }
+
+ public void loadProperties(final TrimProperties properties) {
+ properties
+ .stringPropertyNames()
+ .forEach(
+ key -> {
+ final Option> option = OPTIONS.get(key);
+ if (option != null) {
+ try {
+ option.setValue(properties.getProperty(key));
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Unexpected exception when setting value for option: {}, given value: {}",
+ key,
+ properties.getProperty(key),
+ e);
+ }
+ }
+ });
+ }
+
+ public Optional