Skip to content

Commit 676215e

Browse files
committed
add wait for embed for llm
1 parent 67a3c38 commit 676215e

File tree

5 files changed

+52
-21
lines changed

5 files changed

+52
-21
lines changed

lightllm/server/embed_cache/impl/memory_cache_with_redis.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ class MemoryCacheWithRedis(InMemoryCache):
1818
def __init__(self, args) -> None:
1919
super().__init__(args)
2020
redis_url = f"redis://{args.config_server_host}:{args.redis_port}/0"
21-
print(redis_url, flush=True)
2221
self.redis_cache = EmbedRefCountRedis(
2322
redis_url=redis_url,
2423
capacity=args.cache_capacity,
@@ -29,24 +28,25 @@ def __init__(self, args) -> None:
2928
# 便于 dynamic prompt cache 的使用。所以要把cache_capacity * 2,保障其保留的图片cache > redis 服务维护的
3029
# 硬盘里的图片image embed 数量。
3130
self.cache_capacity = args.cache_capacity * 2
32-
print(self.redis_cache.stats(), flush=True)
3331

3432
def release(self, ids: list[int]) -> None:
3533
with self.lock:
3634
for id_ in ids:
3735
self._records[id_].ref -= 1
3836
self.redis_cache.decr(id_)
39-
40-
def set_items_data(self, ids: list[int]) -> None:
41-
for id_ in ids:
42-
self._records[id_].data = True
43-
44-
def get_items_data(self, ids: list[int]) -> list[Optional[bool]]:
45-
return [self._records.get(id_).data if id_ in self._records else False for id_ in ids]
37+
print(self.redis_cache.stats(), flush=True)
4638

4739
def set_items_embed(self, ids: list[int]) -> None:
4840
for id in ids:
4941
self.redis_cache.insert(str(id))
5042

5143
def get_items_embed(self, ids: list[int]) -> list[Optional[bool]]:
52-
return [self.redis_cache.query_and_incre(str(id)) for id in ids]
44+
ret = []
45+
for id in ids:
46+
# 避免重复的引用计数增加
47+
if self._records[id].embed:
48+
ret.append(True)
49+
continue
50+
self._records[id].embed = self.redis_cache.query_and_incre(str(id))
51+
ret.append(self._records[id].embed)
52+
return ret

lightllm/server/httpserver/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def __init__(
8787
# 初始化VIT连接管理器
8888
from lightllm.server.visualserver.vit_connect import VITConnectionManager
8989

90-
self.vit_manager = VITConnectionManager(args, context, visual_port)
90+
self.vit_manager = VITConnectionManager(args, context, visual_port, self.cache_client)
9191

9292
self.shm_req_manager = ShmReqManager()
9393

lightllm/server/multimodal_params.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ def free(self):
148148
for audio in self.audios:
149149
audio.free()
150150

151+
def get_all_uuids(self):
152+
return [image.uuid for image in self.images] + [audio.uuid for audio in self.audios]
153+
151154
async def verify_and_preload(self, request: Request):
152155
for image in self.images:
153156
await image.preload(request)

lightllm/server/visualserver/-

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,13 @@
1515
533706:M 28 Aug 2025 13:13:21.724 # Server initialized
1616
533706:M 28 Aug 2025 13:13:21.724 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
1717
533706:M 28 Aug 2025 13:13:21.727 * Ready to accept connections
18+
533706:signal-handler (1756390331) Received SIGINT scheduling shutdown...
19+
533706:M 28 Aug 2025 14:12:11.921 # User requested shutdown...
20+
533706:M 28 Aug 2025 14:12:11.922 # Redis is now ready to exit, bye bye...
21+
546119:C 28 Aug 2025 14:12:19.084 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
22+
546119:C 28 Aug 2025 14:12:19.086 # Redis version=6.0.16, bits=64, commit=00000000, modified=0, pid=546119, just started
23+
546119:C 28 Aug 2025 14:12:19.087 # Configuration loaded
24+
546119:M 28 Aug 2025 14:12:19.089 * Running mode=standalone, port=6379.
25+
546119:M 28 Aug 2025 14:12:19.090 # Server initialized
26+
546119:M 28 Aug 2025 14:12:19.091 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
27+
546119:M 28 Aug 2025 14:12:19.093 * Ready to accept connections

lightllm/server/visualserver/vit_connect.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import httpx
99
import base64
1010
from dataclasses import dataclass
11+
import rpyc
1112

1213
logger = init_logger(__name__)
1314

@@ -24,7 +25,7 @@ def to_log_str(self):
2425
class VITConnectionManager:
2526
"""VIT连接管理器"""
2627

27-
def __init__(self, args, context, local_visual_port: int):
28+
def __init__(self, args, context, local_visual_port: int, cache_client: rpyc.Connection):
2829
self.args = args
2930
self.context = context
3031
self.local_visual_port = local_visual_port
@@ -34,6 +35,7 @@ def __init__(self, args, context, local_visual_port: int):
3435
self.current_vit_index = 0
3536
self.remote_vit = args.enable_remote_vit
3637
self.remote_vit_port = args.remote_vit_port
38+
self.cache_client = cache_client
3739

3840
self._setup_vit_connections()
3941

@@ -159,16 +161,21 @@ async def send_to_vit(self, data, protocol=pickle.HIGHEST_PROTOCOL):
159161
发送数据到VIT实例,支持本地和远程模式
160162
"""
161163
instance = self._get_vit_instance()
164+
# 本地模式下,提前释放图片资源,降低传输开销
165+
if not self.remote_vit:
166+
data.multimodal_params.free()
167+
162168
try:
163169
print(instance, flush=True)
164170
instance.send_pyobj(data, protocol=protocol)
165171
except Exception as e:
166172
logger.error(f"Failed to send to VIT instance: {e}")
167173
raise Exception(f"Failed to send to VIT instance: {e}")
168-
finally:
169-
# 释放图片资源
174+
175+
# 远程模式下,发送完以后,在释放图片资源
176+
await self._wait_visual_embed_ready(data)
177+
if self.remote_vit:
170178
data.multimodal_params.free()
171-
await self._wait_visual_embed_ready()
172179

173180
async def vit_handle_loop(self):
174181
"""
@@ -179,7 +186,6 @@ async def vit_handle_loop(self):
179186
try:
180187
id_to_vit_obj = await self._async_get_vit_objs()
181188
if id_to_vit_obj:
182-
logger.debug(f"Retrieved {len(id_to_vit_obj)} VIT instances")
183189
self._update_vit_connections(id_to_vit_obj)
184190
await asyncio.sleep(30)
185191
except Exception as e:
@@ -205,8 +211,20 @@ async def _async_get_vit_objs(self) -> Optional[Dict[int, VIT_Obj]]:
205211
logger.exception(f"Error getting VIT instances: {e}")
206212
return None
207213

208-
async def _wait_visual_embed_ready(self):
209-
"""
210-
等待VIT实例的embed准备好
211-
"""
212-
await asyncio.sleep(10)
214+
async def _wait_visual_embed_ready(self, data, timeout_seconds: int = 20):
215+
# 本地模式不需要等待
216+
if not self.remote_vit:
217+
return
218+
219+
uuids = data.multimodal_params.get_all_uuids()
220+
221+
async def wait_for_embeds():
222+
while not all(self.cache_client.root.get_items_embed(uuids)):
223+
await asyncio.sleep(0.05)
224+
225+
try:
226+
await asyncio.wait_for(wait_for_embeds(), timeout=timeout_seconds)
227+
except asyncio.TimeoutError:
228+
logger.error(
229+
f"Req {data.group_req_id}: timeout waiting for visual embed ready after {timeout_seconds} seconds"
230+
)

0 commit comments

Comments
 (0)