diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java index 89bd887b73957..2e62f618091c4 100644 --- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java @@ -22,9 +22,11 @@ import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.AIClusterIT; +import org.apache.iotdb.itbase.env.BaseEnv; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -42,35 +44,70 @@ @Category({AIClusterIT.class}) public class AINodeClusterConfigIT { - @BeforeClass - public static void setUp() throws Exception { + @Before + public void setUp() throws Exception { // Init 1C1D1A cluster environment EnvFactory.getEnv().initClusterEnvironment(1, 1); } - @AfterClass - public static void tearDown() throws Exception { + @After + public void tearDown() throws Exception { EnvFactory.getEnv().cleanClusterEnvironment(); } @Test - public void aiNodeRegisterTest() throws SQLException { - String sql = "SHOW AINODES"; - String title = "NodeID,Status,InternalAddress,InternalPort"; - try (Connection connection = EnvFactory.getEnv().getConnection(); + public void aiNodeRegisterAndRemoveTestInTree() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); Statement statement = connection.createStatement()) { + aiNodeRegisterAndRemoveTest(statement); + } + } + + @Test + public void aiNodeRegisterAndRemoveTestInTable() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + 
Statement statement = connection.createStatement()) { + aiNodeRegisterAndRemoveTest(statement); + } + } - try (ResultSet resultSet = statement.executeQuery(sql)) { + private void aiNodeRegisterAndRemoveTest(Statement statement) throws SQLException { + String showSql = "SHOW AINODES"; + String title = "NodeID,Status,InternalAddress,InternalPort"; + try (ResultSet resultSet = statement.executeQuery(showSql)) { + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + checkHeader(resultSetMetaData, title); + int count = 0; + while (resultSet.next()) { + assertEquals("2", resultSet.getString(1)); + assertEquals("Running", resultSet.getString(2)); + count++; + } + assertEquals(1, count); + } + String removeSql = "REMOVE AINODE"; + statement.execute(removeSql); + for (int retry = 0; retry < 500; retry++) { + try (ResultSet resultSet = statement.executeQuery(showSql)) { ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); checkHeader(resultSetMetaData, title); int count = 0; while (resultSet.next()) { - assertEquals("2", resultSet.getString(1)); - assertEquals("Running", resultSet.getString(2)); count++; } - assertEquals(1, count); + if (count == 0) { + return; // Successfully removed the AI node + } + } + try { + Thread.sleep(1000); // Wait before retrying + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } + Assert.fail("The target AINode is not removed successfully after all retries."); } + + // TODO: We might need to add a remove-unknown test in the future, but the current infrastructure + // makes it too hard to implement.
} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java index ce21742ae5465..9ddeebac476a1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java @@ -585,7 +585,7 @@ public void testInformationSchema() throws SQLException { statement.executeQuery( "select * from information_schema.keywords where reserved > 0 limit 1"), "word,reserved,", - Collections.singleton("AINODES,1,")); + Collections.singleton("AINODE,1,")); } try (final Connection connection = @@ -703,7 +703,7 @@ public void testInformationSchema() throws SQLException { statement.executeQuery( "select * from information_schema.keywords where reserved > 0 limit 1"), "word,reserved,", - Collections.singleton("AINODES,1,")); + Collections.singleton("AINODE,1,")); TestUtils.assertResultSetEqual( statement.executeQuery("select distinct(status) from information_schema.nodes"), diff --git a/iotdb-core/ainode/ainode/core/ainode.py b/iotdb-core/ainode/ainode/core/ainode.py new file mode 100644 index 0000000000000..6380094b98a01 --- /dev/null +++ b/iotdb-core/ainode/ainode/core/ainode.py @@ -0,0 +1,167 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import os +import signal +import threading +from datetime import datetime + +import psutil + +from ainode.core.config import AINodeDescriptor +from ainode.core.constant import AINODE_SYSTEM_FILE_NAME +from ainode.core.log import Logger +from ainode.core.rpc.client import ClientManager +from ainode.core.rpc.handler import AINodeRPCServiceHandler +from ainode.core.rpc.service import AINodeRPCService +from ainode.thrift.common.ttypes import ( + TAINodeConfiguration, + TAINodeLocation, + TEndPoint, + TNodeResource, +) +from ainode.thrift.confignode.ttypes import TNodeVersionInfo + +logger = Logger() + + +def _generate_configuration() -> TAINodeConfiguration: + location = TAINodeLocation( + AINodeDescriptor().get_config().get_ainode_id(), + TEndPoint( + AINodeDescriptor().get_config().get_ain_inference_rpc_address(), + AINodeDescriptor().get_config().get_ain_inference_rpc_port(), + ), + ) + resource = TNodeResource(int(psutil.cpu_count()), int(psutil.virtual_memory()[0])) + + return TAINodeConfiguration(location, resource) + + +def _generate_version_info() -> TNodeVersionInfo: + return TNodeVersionInfo( + AINodeDescriptor().get_config().get_version_info(), + AINodeDescriptor().get_config().get_build_info(), + ) + + +def _check_path_permission(): + system_path = AINodeDescriptor().get_config().get_ain_system_dir() + if not os.path.exists(system_path): + try: + os.makedirs(system_path) + os.chmod(system_path, 0o777) + except PermissionError as e: + logger.error(e) + raise e + + +def _generate_system_properties(ainode_id: int): + return { + "ainode_id": 
ainode_id, + "cluster_name": AINodeDescriptor().get_config().get_cluster_name(), + "iotdb_version": AINodeDescriptor().get_config().get_version_info(), + "commit_id": AINodeDescriptor().get_config().get_build_info(), + "ain_rpc_address": AINodeDescriptor() + .get_config() + .get_ain_inference_rpc_address(), + "ain_rpc_port": AINodeDescriptor().get_config().get_ain_inference_rpc_port(), + "config_node_list": AINodeDescriptor() + .get_config() + .get_ain_target_config_node_list(), + } + + +class AINode: + def __init__(self): + self._rpc_service = None + self._rpc_handler = None + self._stop_event = None + + def start(self): + _check_path_permission() + system_properties_file = os.path.join( + AINodeDescriptor().get_config().get_ain_system_dir(), + AINODE_SYSTEM_FILE_NAME, + ) + if not os.path.exists(system_properties_file): + # If the system.properties file does not exist, the AINode will register to IoTDB cluster. + try: + logger.info("IoTDB-AINode is registering to IoTDB cluster...") + ainode_id = ( + ClientManager() + .borrow_config_node_client() + .node_register( + AINodeDescriptor().get_config().get_cluster_name(), + _generate_configuration(), + _generate_version_info(), + ) + ) + AINodeDescriptor().get_config().set_ainode_id(ainode_id) + system_properties = _generate_system_properties(ainode_id) + with open(system_properties_file, "w") as f: + f.write("#" + str(datetime.now()) + "\n") + for key, value in system_properties.items(): + f.write(key + "=" + str(value) + "\n") + except Exception as e: + logger.error( + "IoTDB-AINode failed to register to IoTDB cluster: {}".format(e) + ) + raise e + else: + # If the system.properties file does exist, the AINode will just restart. 
+ try: + logger.info("IoTDB-AINode is restarting...") + ClientManager().borrow_config_node_client().node_restart( + AINodeDescriptor().get_config().get_cluster_name(), + _generate_configuration(), + _generate_version_info(), + ) + except Exception as e: + logger.error("IoTDB-AINode failed to restart: {}".format(e)) + raise e + + # Start the RPC service + self._rpc_handler = AINodeRPCServiceHandler(aiNode=self) + self._rpc_service = AINodeRPCService(self._rpc_handler) + self._rpc_service.start() + self._rpc_service.join(1) + if self._rpc_service.exit_code != 0: + logger.info("IoTDB-AINode failed to start, please check previous logs.") + return + + logger.info("IoTDB-AINode has successfully started.") + + # Register stop hook + self._stop_event = threading.Event() + signal.signal(signal.SIGTERM, self._handle_signal) + + def _handle_signal(self, signum, frame): + signal_name = {signal.SIGTERM: "SIGTERM", signal.SIGINT: "SIGINT"}.get( + signum, f"SIGNAL {signum}" + ) + + logger.info(f"IoTDB-AINode receives {signal_name}, initiating graceful stop...") + self.stop() + + def stop(self): + if not self._stop_event.is_set(): + self._stop_event.set() + if self._rpc_service: + self._rpc_service.stop() + self._rpc_service.join(1) + logger.info("IoTDB-AINode has successfully stopped.") diff --git a/iotdb-core/ainode/ainode/core/manager/inference_manager.py b/iotdb-core/ainode/ainode/core/manager/inference_manager.py index a8109e278db06..37e43898c21a0 100644 --- a/iotdb-core/ainode/ainode/core/manager/inference_manager.py +++ b/iotdb-core/ainode/ainode/core/manager/inference_manager.py @@ -33,8 +33,8 @@ from ainode.core.manager.model_manager import ModelManager from ainode.core.model.sundial.modeling_sundial import SundialForPrediction from ainode.core.model.timerxl.modeling_timer import TimerForPrediction +from ainode.core.rpc.status import get_status from ainode.core.util.serde import convert_to_binary -from ainode.core.util.status import get_status from 
ainode.thrift.ainode.ttypes import ( TForecastReq, TForecastResp, diff --git a/iotdb-core/ainode/ainode/core/manager/model_manager.py b/iotdb-core/ainode/ainode/core/manager/model_manager.py index bb589a281bf63..8914a23ec847b 100644 --- a/iotdb-core/ainode/ainode/core/manager/model_manager.py +++ b/iotdb-core/ainode/ainode/core/manager/model_manager.py @@ -28,7 +28,7 @@ from ainode.core.log import Logger from ainode.core.model.model_info import BuiltInModelType, ModelInfo, ModelStates from ainode.core.model.model_storage import ModelStorage -from ainode.core.util.status import get_status +from ainode.core.rpc.status import get_status from ainode.thrift.ainode.ttypes import ( TDeleteModelReq, TRegisterModelReq, diff --git a/iotdb-core/ainode/ainode/core/rpc/__init__.py b/iotdb-core/ainode/ainode/core/rpc/__init__.py new file mode 100644 index 0000000000000..2a1e720805f29 --- /dev/null +++ b/iotdb-core/ainode/ainode/core/rpc/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# diff --git a/iotdb-core/ainode/ainode/core/client.py b/iotdb-core/ainode/ainode/core/rpc/client.py similarity index 99% rename from iotdb-core/ainode/ainode/core/client.py rename to iotdb-core/ainode/ainode/core/rpc/client.py index 15385928a84a6..e0bf0f2e36275 100644 --- a/iotdb-core/ainode/ainode/core/client.py +++ b/iotdb-core/ainode/ainode/core/rpc/client.py @@ -24,8 +24,8 @@ from ainode.core.config import AINodeDescriptor from ainode.core.constant import TSStatusCode from ainode.core.log import Logger +from ainode.core.rpc.status import verify_success from ainode.core.util.decorator import singleton -from ainode.core.util.status import verify_success from ainode.thrift.common.ttypes import ( TAINodeConfiguration, TAINodeLocation, diff --git a/iotdb-core/ainode/ainode/core/handler.py b/iotdb-core/ainode/ainode/core/rpc/handler.py similarity index 88% rename from iotdb-core/ainode/ainode/core/handler.py rename to iotdb-core/ainode/ainode/core/rpc/handler.py index 524b80a88d8ab..c0857cf8520b8 100644 --- a/iotdb-core/ainode/ainode/core/handler.py +++ b/iotdb-core/ainode/ainode/core/rpc/handler.py @@ -15,11 +15,12 @@ # specific language governing permissions and limitations # under the License. 
# - +from ainode.core.constant import TSStatusCode from ainode.core.log import Logger from ainode.core.manager.cluster_manager import ClusterManager from ainode.core.manager.inference_manager import InferenceManager from ainode.core.manager.model_manager import ModelManager +from ainode.core.rpc.status import get_status from ainode.thrift.ainode import IAINodeRPCService from ainode.thrift.ainode.ttypes import ( TAIHeartbeatReq, @@ -40,10 +41,15 @@ class AINodeRPCServiceHandler(IAINodeRPCService.Iface): - def __init__(self): + def __init__(self, aiNode): + self._aiNode = aiNode self._model_manager = ModelManager() self._inference_manager = InferenceManager(model_manager=self._model_manager) + def stopAINode(self) -> TSStatus: + self._aiNode.stop() + return get_status(TSStatusCode.SUCCESS_STATUS, "AINode stopped successfully.") + def registerModel(self, req: TRegisterModelReq) -> TRegisterModelResp: return self._model_manager.register_model(req) diff --git a/iotdb-core/ainode/ainode/core/rpc/service.py b/iotdb-core/ainode/ainode/core/rpc/service.py new file mode 100644 index 0000000000000..72bd38c4adef7 --- /dev/null +++ b/iotdb-core/ainode/ainode/core/rpc/service.py @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +import threading + +from thrift.protocol import TBinaryProtocol, TCompactProtocol +from thrift.server import TServer +from thrift.transport import TSocket, TTransport + +from ainode.core.config import AINodeDescriptor +from ainode.core.log import Logger +from ainode.core.rpc.handler import AINodeRPCServiceHandler +from ainode.thrift.ainode import IAINodeRPCService + +logger = Logger() + + +class AINodeThreadPoolServer(TServer.TThreadPoolServer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._stop_event = threading.Event() + + def serve(self) -> None: + self._stop_event.clear() + logger.info("The RPC service thread pool of IoTDB-AINode begins to serve...") + """Start a fixed number of worker threads and put client into a queue""" + for i in range(self.threads): + try: + t = threading.Thread(target=self.serveThread) + t.daemon = self.daemon + t.start() + except Exception as x: + logger.error(x) + # Pump the socket for clients + self.serverTransport.listen() + while not self._stop_event.is_set(): + try: + client = self.serverTransport.accept() + if not client: + continue + self.clients.put(client) + except Exception as x: + logger.error(x) + + def stop(self) -> None: + if not self._stop_event.is_set(): + logger.info("Stopping the RPC service thread pool of IoTDB-AINode...") + self._stop_event.set() + self.serverTransport.close() + + +class AINodeRPCService(threading.Thread): + def __init__(self, handler: AINodeRPCServiceHandler): + super().__init__() + self.exit_code = 0 + self._stop_event = threading.Event() + self._handler = handler + processor = IAINodeRPCService.Processor(handler=self._handler) + transport = TSocket.TServerSocket( + host=AINodeDescriptor().get_config().get_ain_inference_rpc_address(), + port=AINodeDescriptor().get_config().get_ain_inference_rpc_port(), + ) + transport_factory = TTransport.TFramedTransportFactory() + if AINodeDescriptor().get_config().get_ain_thrift_compression_enabled(): + protocol_factory = 
TCompactProtocol.TCompactProtocolFactory() + else: + protocol_factory = TBinaryProtocol.TBinaryProtocolFactory() + # Create daemon thread pool server + self.__pool_server = AINodeThreadPoolServer( + processor, transport, transport_factory, protocol_factory, daemon=True + ) + + def run(self) -> None: + logger.info("The RPC service of IoTDB-AINode begins to run...") + try: + self.__pool_server.serve() + except Exception as e: + self.exit_code = 1 + logger.error(e) + finally: + logger.info("The RPC service of IoTDB-AINode exited.") + + def stop(self) -> None: + if not self._stop_event.is_set(): + logger.info("Stopping the RPC service of IoTDB-AINode...") + self._stop_event.set() + self.__pool_server.stop() diff --git a/iotdb-core/ainode/ainode/core/util/status.py b/iotdb-core/ainode/ainode/core/rpc/status.py similarity index 100% rename from iotdb-core/ainode/ainode/core/util/status.py rename to iotdb-core/ainode/ainode/core/rpc/status.py diff --git a/iotdb-core/ainode/ainode/core/script.py b/iotdb-core/ainode/ainode/core/script.py index 84a44924828b7..51286bf36356c 100644 --- a/iotdb-core/ainode/ainode/core/script.py +++ b/iotdb-core/ainode/ainode/core/script.py @@ -18,123 +18,21 @@ import os import shutil import sys -from datetime import datetime -import psutil - -from ainode.core.client import ClientManager +from ainode.core.ainode import AINode from ainode.core.config import AINodeDescriptor -from ainode.core.constant import AINODE_SYSTEM_FILE_NAME, TSStatusCode +from ainode.core.constant import TSStatusCode from ainode.core.exception import MissingConfigError from ainode.core.log import Logger -from ainode.core.service import RPCService +from ainode.core.rpc.client import ClientManager from ainode.thrift.common.ttypes import ( - TAINodeConfiguration, TAINodeLocation, TEndPoint, - TNodeResource, ) -from ainode.thrift.confignode.ttypes import TNodeVersionInfo logger = Logger() -def _generate_configuration() -> TAINodeConfiguration: - location = TAINodeLocation( - 
AINodeDescriptor().get_config().get_ainode_id(), - TEndPoint( - AINodeDescriptor().get_config().get_ain_inference_rpc_address(), - AINodeDescriptor().get_config().get_ain_inference_rpc_port(), - ), - ) - resource = TNodeResource(int(psutil.cpu_count()), int(psutil.virtual_memory()[0])) - - return TAINodeConfiguration(location, resource) - - -def _generate_version_info() -> TNodeVersionInfo: - return TNodeVersionInfo( - AINodeDescriptor().get_config().get_version_info(), - AINodeDescriptor().get_config().get_build_info(), - ) - - -def _check_path_permission(): - system_path = AINodeDescriptor().get_config().get_ain_system_dir() - if not os.path.exists(system_path): - try: - os.makedirs(system_path) - os.chmod(system_path, 0o777) - except PermissionError as e: - logger.error(e) - raise e - - -def start_ainode(): - _check_path_permission() - system_properties_file = os.path.join( - AINodeDescriptor().get_config().get_ain_system_dir(), AINODE_SYSTEM_FILE_NAME - ) - if not os.path.exists(system_properties_file): - # If the system.properties file does not exist, the AINode will register to ConfigNode. 
- try: - logger.info("IoTDB-AINode is registering to ConfigNode...") - ainode_id = ( - ClientManager() - .borrow_config_node_client() - .node_register( - AINodeDescriptor().get_config().get_cluster_name(), - _generate_configuration(), - _generate_version_info(), - ) - ) - AINodeDescriptor().get_config().set_ainode_id(ainode_id) - system_properties = { - "ainode_id": ainode_id, - "cluster_name": AINodeDescriptor().get_config().get_cluster_name(), - "iotdb_version": AINodeDescriptor().get_config().get_version_info(), - "commit_id": AINodeDescriptor().get_config().get_build_info(), - "ain_rpc_address": AINodeDescriptor() - .get_config() - .get_ain_inference_rpc_address(), - "ain_rpc_port": AINodeDescriptor() - .get_config() - .get_ain_inference_rpc_port(), - "config_node_list": AINodeDescriptor() - .get_config() - .get_ain_target_config_node_list(), - } - with open(system_properties_file, "w") as f: - f.write("#" + str(datetime.now()) + "\n") - for key, value in system_properties.items(): - f.write(key + "=" + str(value) + "\n") - - except Exception as e: - logger.error("IoTDB-AINode failed to register to ConfigNode: {}".format(e)) - raise e - else: - # If the system.properties file does exist, the AINode will just restart. 
- try: - logger.info("IoTDB-AINode is restarting...") - ClientManager().borrow_config_node_client().node_restart( - AINodeDescriptor().get_config().get_cluster_name(), - _generate_configuration(), - _generate_version_info(), - ) - - except Exception as e: - logger.error("IoTDB-AINode failed to restart: {}".format(e)) - raise e - - rpc_service = RPCService() - rpc_service.start() - rpc_service.join(1) - if rpc_service.exit_code != 0: - return - - logger.info("IoTDB-AINode has successfully started.") - - def remove_ainode(arguments): # Delete the current node if len(arguments) == 2: @@ -189,10 +87,12 @@ def main(): if command == "start": try: logger.info("IoTDB-AINode is starting...") - start_ainode() + ai_node = AINode() + ai_node.start() except Exception as e: logger.error("Start AINode failed, because of: {}".format(e)) sys.exit(1) + # TODO: remove the following function, and add a destroy script elif command == "remove": try: logger.info("Removing AINode...") diff --git a/iotdb-core/ainode/ainode/core/service.py b/iotdb-core/ainode/ainode/core/service.py deleted file mode 100644 index 7602ebe9f192d..0000000000000 --- a/iotdb-core/ainode/ainode/core/service.py +++ /dev/null @@ -1,57 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. -# -import threading - -from thrift.protocol import TBinaryProtocol, TCompactProtocol -from thrift.server import TServer -from thrift.transport import TSocket, TTransport - -from ainode.core.config import AINodeDescriptor -from ainode.core.handler import AINodeRPCServiceHandler -from ainode.core.log import Logger -from ainode.thrift.ainode import IAINodeRPCService - -logger = Logger() - - -class RPCService(threading.Thread): - def __init__(self): - self.exit_code = 0 - super().__init__() - processor = IAINodeRPCService.Processor(handler=AINodeRPCServiceHandler()) - transport = TSocket.TServerSocket( - host=AINodeDescriptor().get_config().get_ain_inference_rpc_address(), - port=AINodeDescriptor().get_config().get_ain_inference_rpc_port(), - ) - transport_factory = TTransport.TFramedTransportFactory() - if AINodeDescriptor().get_config().get_ain_thrift_compression_enabled(): - protocol_factory = TCompactProtocol.TCompactProtocolFactory() - else: - protocol_factory = TBinaryProtocol.TBinaryProtocolFactory() - - self.__pool_server = TServer.TThreadPoolServer( - processor, transport, transport_factory, protocol_factory - ) - - def run(self) -> None: - logger.info("The RPC service thread begin to run...") - try: - self.__pool_server.serve() - except Exception as e: - self.exit_code = 1 - logger.error(e) diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index a30619f676280..789fac304a6ce 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -63,7 +63,7 @@ ddlStatement // Cluster | showVariables | showCluster | showRegions | showDataNodes | showConfigNodes | showClusterId | getRegionId | getTimeSlotList | countTimeSlotList | 
getSeriesSlotList - | migrateRegion | reconstructRegion | extendRegion | removeRegion | removeDataNode | removeConfigNode + | migrateRegion | reconstructRegion | extendRegion | removeRegion | removeDataNode | removeConfigNode | removeAINode | verifyConnection // AINode | showAINodes | createModel | dropModel | showModels | callInference @@ -562,6 +562,11 @@ removeConfigNode : REMOVE CONFIGNODE configNodeId=INTEGER_LITERAL ; +// ---- Remove AINode +removeAINode + : REMOVE AINODE (aiNodeId=INTEGER_LITERAL)? + ; + // Pipe Task ========================================================================================= createPipe : CREATE PIPE (IF NOT EXISTS)? pipeName=identifier diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index 22cb1e0f539c3..787412e0812ce 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -498,6 +498,10 @@ MIGRATE : M I G R A T E ; +AINODE + : A I N O D E + ; + AINODES : A I N O D E S ; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ainode/RemoveAINodePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ainode/RemoveAINodePlan.java index 92bfb8b7017fa..bf5d8d94f0adc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ainode/RemoveAINodePlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ainode/RemoveAINodePlan.java @@ -73,6 +73,11 @@ public int hashCode() { return Objects.hash(super.hashCode(), aiNodeLocation); } + @Override + public String toString() { + return "RemoveAINodePlan{" + "aiNodeLocation=" + aiNodeLocation + '}'; + } + public TAINodeLocation getAINodeLocation() { return aiNodeLocation; } diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index fdc0ccaeb31e6..6ad2d87fe6f11 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -82,7 +82,6 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan; -import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan; import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; @@ -532,10 +531,10 @@ public TAINodeRestartResp restartAINode(TAINodeRestartReq req) { } @Override - public TSStatus removeAINode(RemoveAINodePlan removeAINodePlan) { + public TSStatus removeAINode() { TSStatus status = confirmLeader(); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return nodeManager.removeAINode(removeAINodePlan); + return nodeManager.removeAINode(); } else { return status; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index d47d9375c5e3e..bc080198cb6ba 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -40,7 +40,6 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateDataPartitionPlan; import 
org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan; -import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan; import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; @@ -339,10 +338,9 @@ public interface IManager { /** * Remove AINode. * - * @param removeAINodePlan RemoveAINodePlan * @return AINodeToStatusResp */ - TSStatus removeAINode(RemoveAINodePlan removeAINodePlan); + TSStatus removeAINode(); /** * Report that the specified DataNode will be shutdown. diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 2c69a78b2d9d4..15f05f2ba066e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -528,20 +528,17 @@ public synchronized DataSet registerAINode(TAINodeRegisterReq req) { return resp; } - /** - * Remove AINodes. - * - * @param removeAINodePlan removeDataNodePlan - */ - public TSStatus removeAINode(RemoveAINodePlan removeAINodePlan) { - LOGGER.info("NodeManager start to remove AINode {}", removeAINodePlan); - + /** Remove AINodes. 
*/ + public TSStatus removeAINode() { // check if the node exists - if (!nodeInfo.containsAINode(removeAINodePlan.getAINodeLocation().getAiNodeId())) { + if (nodeInfo.getRegisteredAINodes().isEmpty()) { return new TSStatus(TSStatusCode.REMOVE_AI_NODE_ERROR.getStatusCode()) - .setMessage("AINode doesn't exist."); + .setMessage("Remove AINode failed because there is no AINode in the cluster."); } + // We remove the only AINode by default + RemoveAINodePlan removeAINodePlan = + new RemoveAINodePlan(nodeInfo.getRegisteredAINodes().get(0).getLocation()); // Add request to queue, then return to client boolean removeSucceed = configManager.getProcedureManager().removeAINode(removeAINodePlan); TSStatus status; @@ -553,8 +550,7 @@ public TSStatus removeAINode(RemoveAINodePlan removeAINodePlan) { status.setMessage("Server rejected the request, maybe requests are too many"); } - LOGGER.info( - "NodeManager submit RemoveAINodePlan finished, removeAINodePlan: {}", removeAINodePlan); + LOGGER.info("NodeManager submit RemoveAINodePlan finished, {}", removeAINodePlan); return status; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java index 41676414afeed..5f98930d07476 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java @@ -21,6 +21,8 @@ import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.ainode.AINodeClient; +import org.apache.iotdb.commons.client.ainode.AINodeClientManager; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import 
org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.model.DropModelInNodePlan;
@@ -68,6 +70,30 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveAINodeState st
               .getConsensusManager()
               .write(new DropModelInNodePlan(removedAINode.aiNodeId));
           // Cause the AINode is removed, so we don't need to remove the model file.
+          setNextState(RemoveAINodeState.NODE_STOP);
+          break;
+        case NODE_STOP:
+          // Best effort: ask the AINode to stop through its internal endpoint. Whatever the
+          // outcome (success, error status, or RPC failure), the procedure still advances to
+          // NODE_REMOVE below, so a failure here is only logged.
+          TSStatus resp = null;
+          try (AINodeClient client =
+              AINodeClientManager.getInstance().borrowClient(removedAINode.getInternalEndPoint())) {
+            resp = client.stopAINode();
+          } catch (Exception e) {
+            LOGGER.warn(
+                "Failed to stop AINode {}, but the remove process will continue.",
+                removedAINode.getInternalEndPoint(),
+                e);
+          }
+          if (resp != null && resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            LOGGER.info("Successfully stopped AINode {}", removedAINode.getInternalEndPoint());
+          } else if (resp != null) {
+            LOGGER.warn(
+                "Failed to stop AINode {} because {}, but the remove process will continue.",
+                removedAINode.getInternalEndPoint(),
+                resp.getMessage());
+          }
           setNextState(RemoveAINodeState.NODE_REMOVE);
           break;
         case NODE_REMOVE:
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveAINodeState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveAINodeState.java
index eecb5a4d9d985..8a1a6a1bb03b5 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveAINodeState.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveAINodeState.java
@@ -21,5 +21,6 @@
 
 public enum RemoveAINodeState {
   MODEL_DELETE,
+  NODE_STOP,
   NODE_REMOVE
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index e8be7f574c8b0..9c2a1a3cab445 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -59,7 +59,6 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan; -import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan; import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorRelationalPlan; import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorTreePlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; @@ -346,11 +345,9 @@ public TAINodeRestartResp restartAINode(TAINodeRestartReq req) { @Override public TSStatus removeAINode(TAINodeRemoveReq req) { - LOGGER.info("ConfigNode RPC Service start to remove AINode, req: {}", req); - RemoveAINodePlan removeAINodePlan = new RemoveAINodePlan(req.getAiNodeLocation()); - TSStatus status = configManager.removeAINode(removeAINodePlan); - LOGGER.info( - "ConfigNode RPC Service finished to remove AINode, req: {}, result: {}", req, status); + LOGGER.info("ConfigNode RPC Service start to remove AINode"); + TSStatus status = configManager.removeAINode(); + LOGGER.info("ConfigNode RPC Service finished to remove AINode, result: {}", status); return status; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 8685b380b47ae..a3832fd8e972c 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -502,7 +502,8 @@ public TAINodeRestartResp restartAINode(TAINodeRestartReq req) throws TException @Override public TSStatus removeAINode(TAINodeRemoveReq req) throws TException { - throw new UnsupportedOperationException(UNSUPPORTED_INVOCATION); + return executeRemoteCallWithRetry( + () -> client.removeAINode(req), status -> !updateConfigNodeLeader(status)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 0c7af068d4b3e..ca72d2f3151ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -81,6 +81,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveAINode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveConfigNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveRegion; @@ -439,6 +440,7 @@ private IQueryExecution createQueryExecutionForTableModel( || statement instanceof PipeStatement || statement instanceof RemoveDataNode || statement instanceof RemoveConfigNode + || statement instanceof RemoveAINode || statement instanceof SubscriptionStatement || statement instanceof ShowCurrentSqlDialect || statement instanceof SetSqlDialect diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index a9282a88b4397..6f204ed11cdfe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CreatePipePluginTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropFunctionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DropPipePluginTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveAINodeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveConfigNodeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveDataNodeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowClusterIdTask; @@ -158,6 +159,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveAINode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveConfigNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveRegion; @@ -198,6 +200,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite.StatementRewrite; import org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundException; import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatement; @@ -433,6 +436,16 @@ protected IConfigTask visitRemoveConfigNode( return new RemoveConfigNodeTask(treeStatement); } + @Override + protected IConfigTask visitRemoveAINode( + final RemoveAINode removeAINode, final MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + accessControl.checkUserIsAdmin(context.getSession().getUserName()); + // As the implementation is identical, we'll simply translate to the + // corresponding tree-model variant and execute that. + return new RemoveAINodeTask(new RemoveAINodeStatement()); + } + @Override protected IConfigTask visitShowDataNodes( final ShowDataNodes showDataNodesStatement, final MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index 3aa4217cd8ecc..34a943b7cddd2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetRegionIdTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetSeriesSlotListTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.GetTimeSlotListTask; +import 
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveAINodeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveConfigNodeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.RemoveDataNodeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.SetTTLTask; @@ -125,6 +126,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; @@ -713,6 +715,12 @@ public IConfigTask visitRemoveConfigNode( return new RemoveConfigNodeTask(removeConfigNodeStatement); } + @Override + public IConfigTask visitRemoveAINode( + RemoveAINodeStatement removeAINodeStatement, MPPQueryContext context) { + return new RemoveAINodeTask(removeAINodeStatement); + } + @Override public IConfigTask visitCreateContinuousQuery( CreateContinuousQueryStatement createContinuousQueryStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index e3c82de71eaf0..b090a2f340ea7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -74,6 +74,7 @@ import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.rpc.thrift.TAINodeRemoveReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterOrDropTableReq; import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; @@ -243,6 +244,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; @@ -3154,6 +3156,35 @@ public SettableFuture removeConfigNode( return future; } + @Override + public SettableFuture removeAINode( + RemoveAINodeStatement removeAINodeStatement) { + final SettableFuture future = SettableFuture.create(); + LOGGER.info("Starting to remove AINode"); + try (ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TShowClusterResp showClusterResp = configNodeClient.showCluster(); + if (showClusterResp.getAiNodeListSize() < 1) { + LOGGER.error("Remove AINode failed because there is no AINode in the cluster."); + future.setException( + new IOException("Remove AINode failed because there is no AINode in the cluster.")); + return future; + } + TSStatus status = configNodeClient.removeAINode(new 
TAINodeRemoveReq()); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + future.setException(new IOException("Remove AINode failed: " + status.getMessage())); + return future; + } else { + LOGGER.info("AINode in the cluster is removed."); + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } + } catch (Exception e) { + future.setException(e); + return future; + } + return future; + } + @Override public SettableFuture reconstructRegion( ReconstructRegionTask reconstructRegionTask) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index fe1f36b1b5d4c..8ad06a41c51e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -55,6 +55,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; @@ -270,6 +271,8 @@ SettableFuture countTimeSlotList( SettableFuture removeConfigNode( RemoveConfigNodeStatement removeConfigNodeStatement); + SettableFuture removeAINode(RemoveAINodeStatement removeAINodeStatement); + SettableFuture createContinuousQuery( CreateContinuousQueryStatement 
createContinuousQueryStatement, MPPQueryContext context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/RemoveAINodeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/RemoveAINodeTask.java new file mode 100644 index 0000000000000..2927aa07211fc --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/RemoveAINodeTask.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.metadata; + +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement; + +import com.google.common.util.concurrent.ListenableFuture; + +public class RemoveAINodeTask implements IConfigTask { + + protected final RemoveAINodeStatement statement; + + public RemoveAINodeTask(RemoveAINodeStatement statement) { + this.statement = statement; + } + + @Override + public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) { + // If the action is executed successfully, return the Future. + // If your operation is async, you can return the corresponding future directly. + return configTaskExecutor.removeAINode(statement); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index b30dd2971535c..706d14f052cd2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -164,6 +164,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; @@ -4293,6 +4294,11 @@ public Statement visitRemoveDataNode(IoTDBSqlParser.RemoveDataNodeContext ctx) { return new RemoveDataNodeStatement(nodeIds); } + @Override + public Statement visitRemoveAINode(IoTDBSqlParser.RemoveAINodeContext ctx) { + return new RemoveAINodeStatement(); + } + @Override public Statement visitRemoveConfigNode(IoTDBSqlParser.RemoveConfigNodeContext ctx) { Integer nodeId = Integer.parseInt(ctx.INTEGER_LITERAL().getText()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index a7a4fdbc305b8..3259e6fbc025b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -397,6 +397,10 @@ protected R visitShowAINodes(ShowAINodes node, C context) { return visitStatement(node, context); } + protected R visitRemoveAINode(RemoveAINode node, C context) { + return visitStatement(node, context); + } + protected R visitClearCache(ClearCache node, C context) { return visitStatement(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveAINode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveAINode.java new file mode 100644 index 0000000000000..1765c2badae1f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RemoveAINode.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; + +public class RemoveAINode extends Statement { + + private final int nodeId; + + public RemoveAINode() { + super(null); + this.nodeId = -1; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitRemoveAINode(this, context); + } + + @Override + public List getChildren() { + return ImmutableList.of(); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + RemoveAINode that = (RemoveAINode) o; + return nodeId == that.nodeId; + } + + @Override + public int hashCode() { + return Objects.hashCode(nodeId); + } + + @Override + public String toString() { + return "RemoveAINode{" + "nodeId=" + nodeId + '}'; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 76c22001c40b4..df0d86bf69cff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ 
-153,6 +153,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Relation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveAINode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveConfigNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveRegion; @@ -1446,6 +1447,11 @@ public Node visitRemoveConfigNodeStatement( return new RemoveConfigNode(nodeId); } + @Override + public Node visitRemoveAINodeStatement(RelationalSqlParser.RemoveAINodeStatementContext ctx) { + return new RemoveAINode(); + } + @Override public Node visitFlushStatement(final RelationalSqlParser.FlushStatementContext ctx) { final FlushStatement flushStatement = new FlushStatement(StatementType.FLUSH); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index d775af001333e..588c4dd989279 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -55,6 +55,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetRegionIdStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetSeriesSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.GetTimeSlotListStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveAINodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveConfigNodeStatement; import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.SetTTLStatement; @@ -612,6 +613,10 @@ public R visitRemoveConfigNode(RemoveConfigNodeStatement removeConfigNodeStateme return visitStatement(removeConfigNodeStatement, context); } + public R visitRemoveAINode(RemoveAINodeStatement removeAINodeStatement, C context) { + return visitStatement(removeAINodeStatement, context); + } + public R visitDeactivateTemplate( DeactivateTemplateStatement deactivateTemplateStatement, C context) { return visitStatement(deactivateTemplateStatement, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/RemoveAINodeStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/RemoveAINodeStatement.java new file mode 100644 index 0000000000000..5b4b9f9ea46b0 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/RemoveAINodeStatement.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.metadata;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class RemoveAINodeStatement extends Statement implements IConfigStatement {
+
+  private final Integer nodeId;
+
+  public RemoveAINodeStatement() {
+    super();
+    this.nodeId = -1;
+  }
+
+  public RemoveAINodeStatement(Integer nodeId) {
+    super();
+    this.nodeId = nodeId;
+  }
+
+  public Integer getNodeId() {
+    return nodeId;
+  }
+
+  @Override
+  public TSStatus checkPermissionBeforeProcess(String userName) {
+    return AuthorityChecker.checkSuperUserOrMaintain(userName);
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitRemoveAINode(this, context);
+  }
+
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.WRITE;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java
index e52310d1505a8..a7bcd82397c1b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java
@@ -116,6 +116,33 @@ public TTransport getTransport() {
     return transport;
   }
 
+  /**
+   * Invokes the {@code stopAINode} RPC on the connected AINode.
+   *
+   * @return the status returned by the AINode; it is only returned when it is a success status
+   * @throws TException if the RPC itself fails (message {@code MSG_CONNECTION_FAIL}, with the
+   *     original exception as cause), or if the AINode answers with a non-success status (the
+   *     exception message is then the status message)
+   */
+  public TSStatus stopAINode() throws TException {
+    final TSStatus status;
+    try {
+      status = client.stopAINode();
+    } catch (TException e) {
+      logger.warn(
+          "Failed to connect to AINode from ConfigNode when executing {}: {}",
+          Thread.currentThread().getStackTrace()[1].getMethodName(),
+          e.getMessage());
+      // Keep the original exception as the cause so the transport error is not lost.
+      throw new TException(MSG_CONNECTION_FAIL, e);
+    }
+    // Checked outside the try block: a non-success answer is not a connection failure, so it
+    // must not be caught above and rewritten into MSG_CONNECTION_FAIL.
+    if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new TException(status.message);
+    }
+    return status;
+  }
+
   public ModelInformation registerModel(String modelName, String uri) throws LoadModelException {
     try {
       TRegisterModelReq req = new TRegisterModelReq(uri, modelName);
diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
index de9d62dcd2160..1cb017af4ac01 100644
--- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
+++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
@@ -126,6 +126,7 @@ statement
     | removeRegionStatement
     | removeDataNodeStatement
     | removeConfigNodeStatement
+    | removeAINodeStatement
 
     // Admin Statement
     | showVariablesStatement
@@ -591,6 +592,10 @@ removeConfigNodeStatement
     : REMOVE CONFIGNODE configNodeId=INTEGER_VALUE
     ;
 
+removeAINodeStatement
+    : REMOVE AINODE (aiNodeId=INTEGER_VALUE)?
+ ; + // ------------------------------------------- Admin Statement --------------------------------------------------------- showVariablesStatement : SHOW VARIABLES @@ -1372,6 +1377,7 @@ ABSENT: 'ABSENT'; ADD: 'ADD'; ADMIN: 'ADMIN'; AFTER: 'AFTER'; +AINODE: 'AINODE'; AINODES: 'AINODES'; ALL: 'ALL'; ALTER: 'ALTER'; diff --git a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift index a4ccef7e75263..148b2d8d49bfc 100644 --- a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift +++ b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift @@ -112,6 +112,8 @@ struct TShowModelsResp { service IAINodeRPCService { // -------------- For Config Node -------------- + common.TSStatus stopAINode() + TShowModelsResp showModels(TShowModelsReq req) common.TSStatus deleteModel(TDeleteModelReq req) diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index bf91b410459c4..f65c71c3bceff 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -1172,7 +1172,7 @@ struct TAINodeRestartResp{ } struct TAINodeRemoveReq{ - 1: required common.TAINodeLocation aiNodeLocation + 1: optional common.TAINodeLocation aiNodeLocation } // ====================================================