Skip to content

Commit 2c0b77f

Browse files
committed
seems finished and pass the IT
1 parent 59eb9bb commit 2c0b77f

File tree

37 files changed

+673
-209
lines changed

37 files changed

+673
-209
lines changed

integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeClusterConfigIT.java

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.apache.iotdb.it.env.EnvFactory;
2323
import org.apache.iotdb.it.framework.IoTDBTestRunner;
2424
import org.apache.iotdb.itbase.category.AIClusterIT;
25+
import org.apache.iotdb.itbase.env.BaseEnv;
2526

26-
import org.junit.AfterClass;
27-
import org.junit.BeforeClass;
27+
import org.junit.After;
28+
import org.junit.Assert;
29+
import org.junit.Before;
2830
import org.junit.Test;
2931
import org.junit.experimental.categories.Category;
3032
import org.junit.runner.RunWith;
@@ -42,35 +44,70 @@
4244
@Category({AIClusterIT.class})
4345
public class AINodeClusterConfigIT {
4446

45-
@BeforeClass
46-
public static void setUp() throws Exception {
47+
@Before
48+
public void setUp() throws Exception {
4749
// Init 1C1D1A cluster environment
4850
EnvFactory.getEnv().initClusterEnvironment(1, 1);
4951
}
5052

51-
@AfterClass
52-
public static void tearDown() throws Exception {
53+
@After
54+
public void tearDown() throws Exception {
5355
EnvFactory.getEnv().cleanClusterEnvironment();
5456
}
5557

5658
@Test
57-
public void aiNodeRegisterTest() throws SQLException {
58-
String sql = "SHOW AINODES";
59-
String title = "NodeID,Status,InternalAddress,InternalPort";
60-
try (Connection connection = EnvFactory.getEnv().getConnection();
59+
public void aiNodeRegisterAndRemoveTestInTree() throws SQLException {
60+
try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
6161
Statement statement = connection.createStatement()) {
62+
aiNodeRegisterAndRemoveTest(statement);
63+
}
64+
}
65+
66+
@Test
67+
public void aiNodeRegisterAndRemoveTestInTable() throws SQLException {
68+
try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
69+
Statement statement = connection.createStatement()) {
70+
aiNodeRegisterAndRemoveTest(statement);
71+
}
72+
}
6273

63-
try (ResultSet resultSet = statement.executeQuery(sql)) {
74+
private void aiNodeRegisterAndRemoveTest(Statement statement) throws SQLException {
75+
String show_sql = "SHOW AINODES";
76+
String title = "NodeID,Status,InternalAddress,InternalPort";
77+
try (ResultSet resultSet = statement.executeQuery(show_sql)) {
78+
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
79+
checkHeader(resultSetMetaData, title);
80+
int count = 0;
81+
while (resultSet.next()) {
82+
assertEquals("2", resultSet.getString(1));
83+
assertEquals("Running", resultSet.getString(2));
84+
count++;
85+
}
86+
assertEquals(1, count);
87+
}
88+
String remove_sql = "REMOVE AINODE";
89+
statement.execute(remove_sql);
90+
for (int retry = 0; retry < 500; retry++) {
91+
try (ResultSet resultSet = statement.executeQuery(show_sql)) {
6492
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
6593
checkHeader(resultSetMetaData, title);
6694
int count = 0;
6795
while (resultSet.next()) {
68-
assertEquals("2", resultSet.getString(1));
69-
assertEquals("Running", resultSet.getString(2));
7096
count++;
7197
}
72-
assertEquals(1, count);
98+
if (count == 0) {
99+
return; // Successfully removed the AI node
100+
}
101+
}
102+
try {
103+
Thread.sleep(1000); // Wait before retrying
104+
} catch (InterruptedException e) {
105+
Thread.currentThread().interrupt();
73106
}
74107
}
108+
Assert.fail("The target AINode is not removed successfully after all retries.");
75109
}
110+
111+
// TODO: We might need to add remove unknown test in the future, but current infrastructure is too
112+
// hard to implement it.
76113
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
#
18+
import os
19+
import signal
20+
import threading
21+
from datetime import datetime
22+
23+
import psutil
24+
25+
from ainode.core.config import AINodeDescriptor
26+
from ainode.core.constant import AINODE_SYSTEM_FILE_NAME
27+
from ainode.core.log import Logger
28+
from ainode.core.rpc.client import ClientManager
29+
from ainode.core.rpc.handler import AINodeRPCServiceHandler
30+
from ainode.core.rpc.service import AINodeRPCService
31+
from ainode.thrift.common.ttypes import (
32+
TAINodeConfiguration,
33+
TAINodeLocation,
34+
TEndPoint,
35+
TNodeResource,
36+
TSStatus,
37+
)
38+
from ainode.thrift.confignode.ttypes import TNodeVersionInfo
39+
40+
logger = Logger()
41+
42+
43+
def _generate_configuration() -> TAINodeConfiguration:
44+
location = TAINodeLocation(
45+
AINodeDescriptor().get_config().get_ainode_id(),
46+
TEndPoint(
47+
AINodeDescriptor().get_config().get_ain_inference_rpc_address(),
48+
AINodeDescriptor().get_config().get_ain_inference_rpc_port(),
49+
),
50+
)
51+
resource = TNodeResource(int(psutil.cpu_count()), int(psutil.virtual_memory()[0]))
52+
53+
return TAINodeConfiguration(location, resource)
54+
55+
56+
def _generate_version_info() -> TNodeVersionInfo:
57+
return TNodeVersionInfo(
58+
AINodeDescriptor().get_config().get_version_info(),
59+
AINodeDescriptor().get_config().get_build_info(),
60+
)
61+
62+
63+
def _check_path_permission():
64+
system_path = AINodeDescriptor().get_config().get_ain_system_dir()
65+
if not os.path.exists(system_path):
66+
try:
67+
os.makedirs(system_path)
68+
os.chmod(system_path, 0o777)
69+
except PermissionError as e:
70+
logger.error(e)
71+
raise e
72+
73+
74+
def _generate_system_properties(ainode_id: int):
75+
return {
76+
"ainode_id": ainode_id,
77+
"cluster_name": AINodeDescriptor().get_config().get_cluster_name(),
78+
"iotdb_version": AINodeDescriptor().get_config().get_version_info(),
79+
"commit_id": AINodeDescriptor().get_config().get_build_info(),
80+
"ain_rpc_address": AINodeDescriptor()
81+
.get_config()
82+
.get_ain_inference_rpc_address(),
83+
"ain_rpc_port": AINodeDescriptor().get_config().get_ain_inference_rpc_port(),
84+
"config_node_list": AINodeDescriptor()
85+
.get_config()
86+
.get_ain_target_config_node_list(),
87+
}
88+
89+
90+
class AINode:
91+
def __init__(self):
92+
self._rpc_service = None
93+
self._rpc_handler = None
94+
self._stop_event = None
95+
96+
def start(self):
97+
_check_path_permission()
98+
system_properties_file = os.path.join(
99+
AINodeDescriptor().get_config().get_ain_system_dir(),
100+
AINODE_SYSTEM_FILE_NAME,
101+
)
102+
if not os.path.exists(system_properties_file):
103+
# If the system.properties file does not exist, the AINode will register to IoTDB cluster.
104+
try:
105+
logger.info("IoTDB-AINode is registering to IoTDB cluster...")
106+
ainode_id = (
107+
ClientManager()
108+
.borrow_config_node_client()
109+
.node_register(
110+
AINodeDescriptor().get_config().get_cluster_name(),
111+
_generate_configuration(),
112+
_generate_version_info(),
113+
)
114+
)
115+
AINodeDescriptor().get_config().set_ainode_id(ainode_id)
116+
system_properties = _generate_system_properties(ainode_id)
117+
with open(system_properties_file, "w") as f:
118+
f.write("#" + str(datetime.now()) + "\n")
119+
for key, value in system_properties.items():
120+
f.write(key + "=" + str(value) + "\n")
121+
except Exception as e:
122+
logger.error(
123+
"IoTDB-AINode failed to register to IoTDB cluster: {}".format(e)
124+
)
125+
raise e
126+
else:
127+
# If the system.properties file does exist, the AINode will just restart.
128+
try:
129+
logger.info("IoTDB-AINode is restarting...")
130+
ClientManager().borrow_config_node_client().node_restart(
131+
AINodeDescriptor().get_config().get_cluster_name(),
132+
_generate_configuration(),
133+
_generate_version_info(),
134+
)
135+
except Exception as e:
136+
logger.error("IoTDB-AINode failed to restart: {}".format(e))
137+
raise e
138+
139+
# Start the RPC service
140+
self._rpc_handler = AINodeRPCServiceHandler(aiNode=self)
141+
self._rpc_service = AINodeRPCService(self._rpc_handler)
142+
self._rpc_service.start()
143+
self._rpc_service.join(1)
144+
if self._rpc_service.exit_code != 0:
145+
logger.info("IoTDB-AINode failed to start, please check previous logs.")
146+
return
147+
148+
logger.info("IoTDB-AINode has successfully started.")
149+
150+
# Register stop hook
151+
self._stop_event = threading.Event()
152+
signal.signal(signal.SIGTERM, self._handle_signal)
153+
154+
def _handle_signal(self, signum, frame):
155+
signal_name = {signal.SIGTERM: "SIGTERM", signal.SIGINT: "SIGINT"}.get(
156+
signum, f"SIGNAL {signum}"
157+
)
158+
159+
logger.info(f"IoTDB-AINode receives {signal_name}, initiating graceful stop...")
160+
self.stop()
161+
162+
def stop(self):
163+
if not self._stop_event.is_set():
164+
self._stop_event.set()
165+
if self._rpc_service:
166+
self._rpc_service.stop()
167+
self._rpc_service.join(1)
168+
logger.info("IoTDB-AINode has successfully stopped.")

iotdb-core/ainode/ainode/core/manager/inference_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
from ainode.core.manager.model_manager import ModelManager
3434
from ainode.core.model.sundial.modeling_sundial import SundialForPrediction
3535
from ainode.core.model.timerxl.modeling_timer import TimerForPrediction
36+
from ainode.core.rpc.status import get_status
3637
from ainode.core.util.serde import convert_to_binary
37-
from ainode.core.util.status import get_status
3838
from ainode.thrift.ainode.ttypes import (
3939
TForecastReq,
4040
TForecastResp,

iotdb-core/ainode/ainode/core/manager/model_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from ainode.core.log import Logger
2929
from ainode.core.model.model_info import BuiltInModelType, ModelInfo, ModelStates
3030
from ainode.core.model.model_storage import ModelStorage
31-
from ainode.core.util.status import get_status
31+
from ainode.core.rpc.status import get_status
3232
from ainode.thrift.ainode.ttypes import (
3333
TDeleteModelReq,
3434
TRegisterModelReq,

iotdb-core/ainode/ainode/core/rpc/__init__.py

Whitespace-only changes.

iotdb-core/ainode/ainode/core/client.py renamed to iotdb-core/ainode/ainode/core/rpc/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
from ainode.core.config import AINodeDescriptor
2525
from ainode.core.constant import TSStatusCode
2626
from ainode.core.log import Logger
27+
from ainode.core.rpc.status import verify_success
2728
from ainode.core.util.decorator import singleton
28-
from ainode.core.util.status import verify_success
2929
from ainode.thrift.common.ttypes import (
3030
TAINodeConfiguration,
3131
TAINodeLocation,

iotdb-core/ainode/ainode/core/handler.py renamed to iotdb-core/ainode/ainode/core/rpc/handler.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
#
18-
18+
from ainode.core.constant import TSStatusCode
1919
from ainode.core.log import Logger
2020
from ainode.core.manager.cluster_manager import ClusterManager
2121
from ainode.core.manager.inference_manager import InferenceManager
2222
from ainode.core.manager.model_manager import ModelManager
23+
from ainode.core.rpc.status import get_status
2324
from ainode.thrift.ainode import IAINodeRPCService
2425
from ainode.thrift.ainode.ttypes import (
2526
TAIHeartbeatReq,
@@ -40,10 +41,15 @@
4041

4142

4243
class AINodeRPCServiceHandler(IAINodeRPCService.Iface):
43-
def __init__(self):
44+
def __init__(self, aiNode):
45+
self._aiNode = aiNode
4446
self._model_manager = ModelManager()
4547
self._inference_manager = InferenceManager(model_manager=self._model_manager)
4648

49+
def stopAINode(self) -> TSStatus:
50+
self._aiNode.stop()
51+
return get_status(TSStatusCode.SUCCESS_STATUS, "AINode stopped successfully.")
52+
4753
def registerModel(self, req: TRegisterModelReq) -> TRegisterModelResp:
4854
return self._model_manager.register_model(req)
4955

0 commit comments

Comments
 (0)