Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.AIClusterIT;
import org.apache.iotdb.itbase.env.BaseEnv;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand All @@ -42,35 +44,70 @@
@Category({AIClusterIT.class})
public class AINodeClusterConfigIT {

@BeforeClass
public static void setUp() throws Exception {
@Before
public void setUp() throws Exception {
// Init 1C1D1A cluster environment
EnvFactory.getEnv().initClusterEnvironment(1, 1);
}

@AfterClass
public static void tearDown() throws Exception {
@After
public void tearDown() throws Exception {
EnvFactory.getEnv().cleanClusterEnvironment();
}

@Test
public void aiNodeRegisterTest() throws SQLException {
String sql = "SHOW AINODES";
String title = "NodeID,Status,InternalAddress,InternalPort";
try (Connection connection = EnvFactory.getEnv().getConnection();
/** Runs the shared register-and-remove scenario over a tree-dialect connection. */
public void aiNodeRegisterAndRemoveTestInTree() throws SQLException {
  try (Connection treeConnection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
      Statement treeStatement = treeConnection.createStatement()) {
    aiNodeRegisterAndRemoveTest(treeStatement);
  }
}

/** Runs the shared register-and-remove scenario over a table-dialect connection. */
@Test
public void aiNodeRegisterAndRemoveTestInTable() throws SQLException {
  try (Connection tableConnection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
      Statement tableStatement = tableConnection.createStatement()) {
    aiNodeRegisterAndRemoveTest(tableStatement);
  }
}

try (ResultSet resultSet = statement.executeQuery(sql)) {
/**
 * Shared scenario for both SQL dialects: checks that {@code SHOW AINODES} lists exactly one
 * AINode (id "2", status "Running"), issues {@code REMOVE AINODE}, then polls {@code SHOW
 * AINODES} until it returns no rows.
 *
 * @param statement an open statement on the cluster under test
 * @throws SQLException if any of the executed statements fails
 */
private void aiNodeRegisterAndRemoveTest(Statement statement) throws SQLException {
  String showSql = "SHOW AINODES";
  String title = "NodeID,Status,InternalAddress,InternalPort";

  // Before removal: exactly one running AINode must be registered.
  try (ResultSet resultSet = statement.executeQuery(showSql)) {
    checkHeader(resultSet.getMetaData(), title);
    int count = 0;
    while (resultSet.next()) {
      assertEquals("2", resultSet.getString(1));
      assertEquals("Running", resultSet.getString(2));
      count++;
    }
    assertEquals(1, count);
  }

  statement.execute("REMOVE AINODE");

  // After removal: poll until the node disappears. No assertion is made on the rows that are
  // still present, because asserting "Running" / exactly one row here would fail the test on
  // the very poll where the removal becomes visible.
  for (int retry = 0; retry < 500; retry++) {
    try (ResultSet resultSet = statement.executeQuery(showSql)) {
      checkHeader(resultSet.getMetaData(), title);
      if (!resultSet.next()) {
        return; // Successfully removed the AI node
      }
    }
    try {
      Thread.sleep(1000); // Wait before retrying
    } catch (InterruptedException e) {
      // Restore the flag and stop waiting: with the flag set, every further sleep would throw
      // immediately and the loop would degrade into a busy spin.
      Thread.currentThread().interrupt();
      Assert.fail("Interrupted while waiting for the target AINode to be removed.");
    }
  }
  Assert.fail("The target AINode is not removed successfully after all retries.");
}

// TODO: We might need to add a test for removing an AINode in Unknown status in the future, but
// the current test infrastructure makes that too hard to implement.
}
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ public void testInformationSchema() throws SQLException {
statement.executeQuery(
"select * from information_schema.keywords where reserved > 0 limit 1"),
"word,reserved,",
Collections.singleton("AINODES,1,"));
Collections.singleton("AINODE,1,"));
}

try (final Connection connection =
Expand Down Expand Up @@ -703,7 +703,7 @@ public void testInformationSchema() throws SQLException {
statement.executeQuery(
"select * from information_schema.keywords where reserved > 0 limit 1"),
"word,reserved,",
Collections.singleton("AINODES,1,"));
Collections.singleton("AINODE,1,"));

TestUtils.assertResultSetEqual(
statement.executeQuery("select distinct(status) from information_schema.nodes"),
Expand Down
167 changes: 167 additions & 0 deletions iotdb-core/ainode/ainode/core/ainode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
import signal
import threading
from datetime import datetime

import psutil

from ainode.core.config import AINodeDescriptor
from ainode.core.constant import AINODE_SYSTEM_FILE_NAME
from ainode.core.log import Logger
from ainode.core.rpc.client import ClientManager
from ainode.core.rpc.handler import AINodeRPCServiceHandler
from ainode.core.rpc.service import AINodeRPCService
from ainode.thrift.common.ttypes import (
TAINodeConfiguration,
TAINodeLocation,
TEndPoint,
TNodeResource,
)
from ainode.thrift.confignode.ttypes import TNodeVersionInfo

logger = Logger()


def _generate_configuration() -> TAINodeConfiguration:
    """Build the configuration this AINode sends when registering or restarting.

    The location combines the configured AINode id with the inference RPC
    endpoint; the resource part carries the CPU count and the total memory
    reported by psutil.

    Returns:
        TAINodeConfiguration: the location/resource pair for this node.
    """
    config = AINodeDescriptor().get_config()
    location = TAINodeLocation(
        config.get_ainode_id(),
        TEndPoint(
            config.get_ain_inference_rpc_address(),
            config.get_ain_inference_rpc_port(),
        ),
    )
    # psutil.cpu_count() returns None when the count cannot be determined, and
    # int(None) would abort start-up with a TypeError. Fall back to the
    # standard library and finally to a single core.
    cpu_core_num = psutil.cpu_count() or os.cpu_count() or 1
    # .total is the first field of the tuple returned by virtual_memory().
    total_memory = psutil.virtual_memory().total
    resource = TNodeResource(int(cpu_core_num), int(total_memory))

    return TAINodeConfiguration(location, resource)


def _generate_version_info() -> TNodeVersionInfo:
    """Wrap the configured version and build strings in a TNodeVersionInfo."""
    config = AINodeDescriptor().get_config()
    version = config.get_version_info()
    build = config.get_build_info()
    return TNodeVersionInfo(version, build)


def _check_path_permission():
    """Make sure the AINode system directory exists.

    When the directory is missing it is created and its mode is set to 0o777.
    An existing path is left untouched.

    Raises:
        PermissionError: logged and re-raised when the directory cannot be
            created or its mode cannot be changed.
    """
    system_path = AINodeDescriptor().get_config().get_ain_system_dir()
    if os.path.exists(system_path):
        return
    try:
        # exist_ok=True tolerates the directory being created by someone else
        # between the existence check above and this call.
        os.makedirs(system_path, exist_ok=True)
        os.chmod(system_path, 0o777)
    except PermissionError as e:
        logger.error(e)
        # Bare raise keeps the original traceback intact.
        raise


def _generate_system_properties(ainode_id: int):
    """Collect the key/value pairs describing this AINode.

    Args:
        ainode_id: the id to store under the "ainode_id" key.

    Returns:
        dict: the properties, in the order in which they are inserted below.
    """
    config = AINodeDescriptor().get_config()
    properties = {}
    properties["ainode_id"] = ainode_id
    properties["cluster_name"] = config.get_cluster_name()
    properties["iotdb_version"] = config.get_version_info()
    properties["commit_id"] = config.get_build_info()
    properties["ain_rpc_address"] = config.get_ain_inference_rpc_address()
    properties["ain_rpc_port"] = config.get_ain_inference_rpc_port()
    properties["config_node_list"] = config.get_ain_target_config_node_list()
    return properties


class AINode:
    """Lifecycle controller of an IoTDB-AINode process.

    ``start`` registers the node to the cluster (or announces a restart) through
    the ConfigNode client and then starts the RPC service; ``stop`` shuts the
    RPC service down and is a no-op once a stop has already been performed.
    """

    def __init__(self):
        self._rpc_service = None
        self._rpc_handler = None
        # Created eagerly so that stop() is always safe to call: the RPC
        # service is started before start() finishes, and a stop request that
        # arrives in that window (or after a failed start) must not hit None.
        self._stop_event = threading.Event()

    def start(self):
        """Register or restart this AINode, then start its RPC service.

        Whether the node registers or restarts is decided by the presence of
        the system properties file in the AINode system directory.

        Raises:
            Exception: any error raised while registering or restarting is
                logged and re-raised unchanged.
        """
        _check_path_permission()
        system_properties_file = os.path.join(
            AINodeDescriptor().get_config().get_ain_system_dir(),
            AINODE_SYSTEM_FILE_NAME,
        )
        if not os.path.exists(system_properties_file):
            # If the system.properties file does not exist, the AINode will register to IoTDB cluster.
            try:
                logger.info("IoTDB-AINode is registering to IoTDB cluster...")
                ainode_id = (
                    ClientManager()
                    .borrow_config_node_client()
                    .node_register(
                        AINodeDescriptor().get_config().get_cluster_name(),
                        _generate_configuration(),
                        _generate_version_info(),
                    )
                )
                AINodeDescriptor().get_config().set_ainode_id(ainode_id)
                system_properties = _generate_system_properties(ainode_id)
                # NOTE(review): a failure while writing leaves a partial file
                # behind, which the next start treats as "already registered".
                with open(system_properties_file, "w") as f:
                    f.write("#" + str(datetime.now()) + "\n")
                    for key, value in system_properties.items():
                        f.write(key + "=" + str(value) + "\n")
            except Exception as e:
                logger.error(
                    "IoTDB-AINode failed to register to IoTDB cluster: {}".format(e)
                )
                raise
        else:
            # If the system.properties file does exist, the AINode will just restart.
            try:
                logger.info("IoTDB-AINode is restarting...")
                ClientManager().borrow_config_node_client().node_restart(
                    AINodeDescriptor().get_config().get_cluster_name(),
                    _generate_configuration(),
                    _generate_version_info(),
                )
            except Exception as e:
                logger.error("IoTDB-AINode failed to restart: {}".format(e))
                raise

        # Re-arm the stop flag before the service can receive requests, so a
        # stop issued after this point is honoured even if start() is reused.
        self._stop_event.clear()

        # Start the RPC service
        self._rpc_handler = AINodeRPCServiceHandler(aiNode=self)
        self._rpc_service = AINodeRPCService(self._rpc_handler)
        self._rpc_service.start()
        # Wait up to one second, then inspect the exit code the service reports.
        self._rpc_service.join(1)
        if self._rpc_service.exit_code != 0:
            logger.info("IoTDB-AINode failed to start, please check previous logs.")
            return

        logger.info("IoTDB-AINode has successfully started.")

        # Register stop hook
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _handle_signal(self, signum, frame):
        """Signal handler: log the received signal and stop the node."""
        signal_name = {signal.SIGTERM: "SIGTERM", signal.SIGINT: "SIGINT"}.get(
            signum, f"SIGNAL {signum}"
        )

        logger.info(f"IoTDB-AINode receives {signal_name}, initiating graceful stop...")
        self.stop()

    def stop(self):
        """Stop the RPC service if the node has not been stopped already."""
        if not self._stop_event.is_set():
            self._stop_event.set()
            if self._rpc_service:
                self._rpc_service.stop()
                self._rpc_service.join(1)
            logger.info("IoTDB-AINode has successfully stopped.")
2 changes: 1 addition & 1 deletion iotdb-core/ainode/ainode/core/manager/inference_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
from ainode.core.manager.model_manager import ModelManager
from ainode.core.model.sundial.modeling_sundial import SundialForPrediction
from ainode.core.model.timerxl.modeling_timer import TimerForPrediction
from ainode.core.rpc.status import get_status
from ainode.core.util.serde import convert_to_binary
from ainode.core.util.status import get_status
from ainode.thrift.ainode.ttypes import (
TForecastReq,
TForecastResp,
Expand Down
2 changes: 1 addition & 1 deletion iotdb-core/ainode/ainode/core/manager/model_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from ainode.core.log import Logger
from ainode.core.model.model_info import BuiltInModelType, ModelInfo, ModelStates
from ainode.core.model.model_storage import ModelStorage
from ainode.core.util.status import get_status
from ainode.core.rpc.status import get_status
from ainode.thrift.ainode.ttypes import (
TDeleteModelReq,
TRegisterModelReq,
Expand Down
17 changes: 17 additions & 0 deletions iotdb-core/ainode/ainode/core/rpc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from ainode.core.config import AINodeDescriptor
from ainode.core.constant import TSStatusCode
from ainode.core.log import Logger
from ainode.core.rpc.status import verify_success
from ainode.core.util.decorator import singleton
from ainode.core.util.status import verify_success
from ainode.thrift.common.ttypes import (
TAINodeConfiguration,
TAINodeLocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
# specific language governing permissions and limitations
# under the License.
#

from ainode.core.constant import TSStatusCode
from ainode.core.log import Logger
from ainode.core.manager.cluster_manager import ClusterManager
from ainode.core.manager.inference_manager import InferenceManager
from ainode.core.manager.model_manager import ModelManager
from ainode.core.rpc.status import get_status
from ainode.thrift.ainode import IAINodeRPCService
from ainode.thrift.ainode.ttypes import (
TAIHeartbeatReq,
Expand All @@ -40,10 +41,15 @@


class AINodeRPCServiceHandler(IAINodeRPCService.Iface):
def __init__(self):
def __init__(self, aiNode):
self._aiNode = aiNode
self._model_manager = ModelManager()
self._inference_manager = InferenceManager(model_manager=self._model_manager)

def stopAINode(self) -> TSStatus:
self._aiNode.stop()
return get_status(TSStatusCode.SUCCESS_STATUS, "AINode stopped successfully.")

def registerModel(self, req: TRegisterModelReq) -> TRegisterModelResp:
return self._model_manager.register_model(req)

Expand Down
Loading
Loading