Skip to content

Commit 6e63bd1

Browse files
committed
fix drop bug
1 parent 80e164d commit 6e63bd1

File tree

3 files changed

+56
-50
lines changed

3 files changed

+56
-50
lines changed

iotdb-core/ainode/ainode/core/manager/model_manager.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,16 @@ def register_model(self, req: TRegisterModelReq) -> TRegisterModelResp:
5656
)
5757
except InvalidUriError as e:
5858
logger.warning(e)
59-
self.model_storage.delete_model(req.modelId)
6059
return TRegisterModelResp(
6160
get_status(TSStatusCode.INVALID_URI_ERROR, e.message)
6261
)
6362
except BadConfigValueError as e:
6463
logger.warning(e)
65-
self.model_storage.delete_model(req.modelId)
6664
return TRegisterModelResp(
6765
get_status(TSStatusCode.INVALID_INFERENCE_CONFIG, e.message)
6866
)
6967
except YAMLError as e:
7068
logger.warning(e)
71-
self.model_storage.delete_model(req.modelId)
7269
if hasattr(e, "problem_mark"):
7370
mark = e.problem_mark
7471
return TRegisterModelResp(
@@ -86,7 +83,6 @@ def register_model(self, req: TRegisterModelReq) -> TRegisterModelResp:
8683
)
8784
except Exception as e:
8885
logger.warning(e)
89-
self.model_storage.delete_model(req.modelId)
9086
return TRegisterModelResp(get_status(TSStatusCode.AINODE_INTERNAL_ERROR))
9187

9288
def delete_model(self, req: TDeleteModelReq) -> TSStatus:

iotdb-core/ainode/ainode/core/model/model_storage.py

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -211,22 +211,30 @@ def register_model(self, model_id: str, uri: str):
211211
configs: TConfigs
212212
attributes: str
213213
"""
214-
storage_path = os.path.join(self._model_dir, f"{model_id}")
215-
# create storage dir if not exist
216-
if not os.path.exists(storage_path):
217-
os.makedirs(storage_path)
218-
model_storage_path = os.path.join(storage_path, DEFAULT_MODEL_FILE_NAME)
219-
config_storage_path = os.path.join(storage_path, DEFAULT_CONFIG_FILE_NAME)
220-
configs, attributes = fetch_model_by_uri(
221-
uri, model_storage_path, config_storage_path
222-
)
223-
self._model_info_map[model_id] = ModelInfo(
224-
model_id=model_id,
225-
model_type="",
226-
category=ModelCategory.USER_DEFINED,
227-
state=ModelStates.ACTIVE,
228-
)
229-
return configs, attributes
214+
with self._lock_pool.get_lock(model_id).write_lock():
215+
storage_path = os.path.join(self._model_dir, f"{model_id}")
216+
# create storage dir if not exist
217+
if not os.path.exists(storage_path):
218+
os.makedirs(storage_path)
219+
model_storage_path = os.path.join(storage_path, DEFAULT_MODEL_FILE_NAME)
220+
config_storage_path = os.path.join(storage_path, DEFAULT_CONFIG_FILE_NAME)
221+
self._model_info_map[model_id] = ModelInfo(
222+
model_id=model_id,
223+
model_type="",
224+
category=ModelCategory.USER_DEFINED,
225+
state=ModelStates.LOADING,
226+
)
227+
try:
228+
# TODO: The uri should be fetched asynchronously
229+
configs, attributes = fetch_model_by_uri(
230+
uri, model_storage_path, config_storage_path
231+
)
232+
self._model_info_map[model_id].state = ModelStates.ACTIVE
233+
return configs, attributes
234+
except Exception as e:
235+
logger.error(f"Failed to register model {model_id}: {e}")
236+
self._model_info_map[model_id].state = ModelStates.INACTIVE
237+
raise e
230238

231239
def delete_model(self, model_id: str) -> None:
232240
"""
@@ -235,12 +243,11 @@ def delete_model(self, model_id: str) -> None:
235243
Returns:
236244
None
237245
"""
238-
storage_path = os.path.join(self._model_dir, f"{model_id}")
239246
with self._lock_pool.get_lock(model_id).write_lock():
247+
storage_path = os.path.join(self._model_dir, f"{model_id}")
240248
if os.path.exists(storage_path):
241249
shutil.rmtree(storage_path)
242-
storage_path = os.path.join(self._builtin_model_dir, f"{model_id}")
243-
with self._lock_pool.get_lock(model_id).write_lock():
250+
storage_path = os.path.join(self._builtin_model_dir, f"{model_id}")
244251
if os.path.exists(storage_path):
245252
shutil.rmtree(storage_path)
246253
if model_id in self._model_info_map:

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.confignode.procedure.impl.model;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
2223
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2324
import org.apache.iotdb.commons.client.ainode.AINodeClient;
2425
import org.apache.iotdb.commons.client.ainode.AINodeClientManager;
@@ -101,33 +102,35 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, DropModelState state
101102
private void dropModelOnAINode(ConfigNodeProcedureEnv env) {
102103
LOGGER.info("Start to drop model file [{}] on AI Node", modelName);
103104

104-
List<Integer> nodeIds =
105-
env.getConfigManager().getModelManager().getModelDistributions(modelName);
106-
for (Integer nodeId : nodeIds) {
107-
try (AINodeClient client =
108-
AINodeClientManager.getInstance()
109-
.borrowClient(
110-
env.getConfigManager()
111-
.getNodeManager()
112-
.getRegisteredAINode(nodeId)
113-
.getLocation()
114-
.getInternalEndPoint())) {
115-
TSStatus status = client.deleteModel(modelName);
116-
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
117-
LOGGER.warn(
118-
"Failed to drop model [{}] on AINode [{}], status: {}",
119-
modelName,
120-
nodeId,
121-
status.getMessage());
122-
}
123-
} catch (Exception e) {
124-
LOGGER.warn(
125-
"Failed to drop model [{}] on AINode [{}], status: {}",
126-
modelName,
127-
nodeId,
128-
e.getMessage());
129-
}
130-
}
105+
List<TAINodeConfiguration> aiNodes =
106+
env.getConfigManager().getNodeManager().getRegisteredAINodes();
107+
aiNodes.forEach(
108+
aiNode -> {
109+
int nodeId = aiNode.getLocation().getAiNodeId();
110+
try (AINodeClient client =
111+
AINodeClientManager.getInstance()
112+
.borrowClient(
113+
env.getConfigManager()
114+
.getNodeManager()
115+
.getRegisteredAINode(nodeId)
116+
.getLocation()
117+
.getInternalEndPoint())) {
118+
TSStatus status = client.deleteModel(modelName);
119+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
120+
LOGGER.warn(
121+
"Failed to drop model [{}] on AINode [{}], status: {}",
122+
modelName,
123+
nodeId,
124+
status.getMessage());
125+
}
126+
} catch (Exception e) {
127+
LOGGER.warn(
128+
"Failed to drop model [{}] on AINode [{}], status: {}",
129+
modelName,
130+
nodeId,
131+
e.getMessage());
132+
}
133+
});
131134
}
132135

133136
private void dropModelOnConfigNode(ConfigNodeProcedureEnv env) {

0 commit comments

Comments
 (0)