Skip to content

Commit e9a52ae

Browse files
authored
fix processor bugs (#1048)
1 parent 304e480 commit e9a52ae

File tree

3 files changed: +37 −8 lines changed

lazyllm/tools/rag/parsing_service/server.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
import threading
33
import time
44
import traceback
5-
import cloudpickle
65
from datetime import datetime
76
from typing import Any, Callable, Dict, Optional
87

98
from lazyllm import (
109
LOG, ModuleBase, ServerModule, UrlModule, FastapiApp as app,
11-
LazyLLMLaunchersBase as Launcher, once_wrapper
10+
LazyLLMLaunchersBase as Launcher, load_obj, once_wrapper, dump_obj
1211
)
1312
from lazyllm.thirdparty import fastapi
1413

@@ -118,7 +117,7 @@ def register_algorithm(self, name: str, store: _DocumentStore, reader: Directory
118117
'node_groups': node_groups,
119118
'schema_extractor': schema_extractor,
120119
}
121-
info_pickle = cloudpickle.dumps(info_dict)
120+
info_pickle = dump_obj(info_dict)
122121
with self._db_manager.get_session() as session:
123122
AlgoInfo = self._db_manager.get_table_orm_class('lazyllm_algorithm')
124123
existing_algorithm = session.query(AlgoInfo).filter(AlgoInfo.id == name).first()
@@ -239,7 +238,7 @@ def get_algo_group_info(self, algo_id: str) -> None:
239238
if algorithm is None:
240239
raise fastapi.HTTPException(status_code=404, detail=f'Invalid algo_id {algo_id}')
241240
info_pickle_bytes = algorithm.get('info_pickle')
242-
info = cloudpickle.loads(info_pickle_bytes)
241+
info = load_obj(info_pickle_bytes)
243242
store: _DocumentStore = info['store'] # type: ignore
244243
node_groups = info['node_groups']
245244

lazyllm/tools/rag/parsing_service/worker.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22
import time
33
import traceback
44
import threading
5-
import cloudpickle
65

76
from datetime import datetime
8-
from lazyllm import LOG, FastapiApp as app, ModuleBase, ServerModule, once_wrapper
7+
from lazyllm import LOG, FastapiApp as app, ModuleBase, ServerModule, once_wrapper, load_obj
98
from ..utils import BaseResponse, _get_default_db_config
109
from .base import (
1110
FINISHED_TASK_QUEUE_TABLE_INFO, WAITING_TASK_QUEUE_TABLE_INFO,
@@ -86,7 +85,7 @@ def _get_or_create_processor(self, algo_id: str) -> _Processor:
8685
display_name = algorithm.display_name
8786
description = algorithm.description
8887
info_pickle = algorithm.info_pickle
89-
info = cloudpickle.loads(info_pickle)
88+
info = load_obj(info_pickle)
9089
store = info['store']
9190
reader = info['reader']
9291
node_groups = info['node_groups']

lazyllm/tools/rag/store/vector/milvus_store.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,16 @@
1717
from ...data_type import DataType
1818
from ...global_metadata import GlobalMetadataDesc
1919

20+
21+
def _is_empty_embedding_value(v) -> bool:
22+
if v is None:
23+
return True
24+
if isinstance(v, (list, tuple)):
25+
return len(v) == 0
26+
if isinstance(v, dict):
27+
return not v
28+
return False
29+
2030
MILVUS_UPSERT_BATCH_SIZE = 500
2131
MILVUS_PAGINATION_OFFSET = 1000
2232
MILVUS_INDEX_MAX_RETRY = 3
@@ -141,10 +151,31 @@ def _client_context(self):
141151
finally:
142152
self._client_pool.release(c)
143153

154+
def _row_has_valid_embedding(self, d: dict) -> bool:
155+
'''True if row has every required embed key with a non-empty value (Milvus requires all columns same length).'''
156+
emb = d.get('embedding')
157+
if not emb or not isinstance(emb, dict):
158+
return False
159+
for k in self._embed_datatypes:
160+
if _is_empty_embedding_value(emb.get(k)):
161+
return False
162+
return True
163+
144164
@override
145-
def upsert(self, collection_name: str, data: List[dict]) -> bool:
165+
def upsert(self, collection_name: str, data: List[dict]) -> bool: # noqa: C901
146166
try:
147167
if not data: return True
168+
# Only upsert rows that have valid embedding for every key. _serialize_data omits missing/empty
169+
# embedding fields, which would make pymilvus build columns with different lengths (e.g. uid 230 vs
170+
# embedding___default__ 229) and raise num_rows mismatch.
171+
valid_data = [d for d in data if self._row_has_valid_embedding(d)]
172+
dropped = len(data) - len(valid_data)
173+
if dropped:
174+
LOG.warning(f'[Milvus Store - upsert] Dropping {dropped} rows with missing/empty embedding for '
175+
f'collection {collection_name}.')
176+
data = valid_data
177+
if not data:
178+
return True
148179
data_embeddings = data[0].get('embedding', {})
149180
if not data_embeddings: return True
150181
with self._client_context() as client:

0 commit comments