Skip to content

Commit 630d3ee

Browse files
committed
rename
1 parent 3a89cf0 commit 630d3ee

File tree

2 files changed

+137
-10
lines changed

2 files changed

+137
-10
lines changed

lightllm/server/httpserver/manager.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,18 +85,9 @@ def __init__(
8585
if self.enable_multimodal:
8686
self.cache_client = rpyc.connect("localhost", cache_port, config={"allow_pickle": True})
8787
# 初始化VIT连接管理器
88-
from .vit_loop import VITConnectionManager
88+
from .vit_connect import VITConnectionManager
8989

9090
self.vit_manager = VITConnectionManager(args, context, visual_port)
91-
# self.send_to_visual = context.socket(zmq.PUSH)
92-
# if self.args.run_mode == "llm_only":
93-
# self.send_to_visual.connect(f"{args.zmq_mode}127.0.0.1:{self.args.visual_only_port}")
94-
# else:
95-
# self.send_to_visual.connect(f"{args.zmq_mode}127.0.0.1:{visual_port}")
96-
# self.cache_client = rpyc.connect("localhost", cache_port, config={"allow_pickle": True})
97-
98-
self.token_id_range_start = 100000000
99-
self.token_id_range_end = 2 ** 63 - 1
10091

10192
self.shm_req_manager = ShmReqManager()
10293

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import asyncio
2+
import zmq
3+
import zmq.asyncio
4+
import time
5+
import pickle
6+
from typing import Dict, List, Optional, Any
7+
from lightllm.utils.log_utils import init_logger
8+
import httpx
9+
import base64
10+
from dataclasses import dataclass
11+
12+
logger = init_logger(__name__)
13+
14+
15+
@dataclass
16+
class VIT_Obj:
17+
node_id: int
18+
host_ip_port: str
19+
20+
def to_log_str(self):
21+
return f"VIT host_ip_port: {self.host_ip_port} node_id: {self.node_id}"
22+
23+
24+
class VITConnectionManager:
25+
"""VIT连接管理器"""
26+
27+
def __init__(self, args, context, local_visual_port: int):
28+
self.args = args
29+
self.context = context
30+
self.local_visual_port = local_visual_port
31+
32+
self.send_to_visual = None
33+
self.remote_vit_instances = []
34+
self.current_vit_index = 0
35+
self.remote_vit = args.enable_remote_vit
36+
self.remote_vit_port = args.remote_vit_port
37+
38+
self._setup_vit_connections()
39+
40+
def _setup_vit_connections(self):
41+
"""
42+
设置VIT连接,支持本地和远程VIT实例
43+
支持多种连接模式:
44+
1. 本地VIT实例 (默认)
45+
2. 远程单个VIT实例
46+
3. 远程多个VIT实例 (负载均衡)
47+
"""
48+
if self.remote_vit:
49+
# 远程VIT实例模式
50+
self._setup_remote_vit_connections()
51+
else:
52+
self._setup_local_vit_connection()
53+
54+
def _setup_local_vit_connection(self):
55+
self.send_to_visual = self.context.socket(zmq.PUSH)
56+
self.send_to_visual.connect(f"{self.args.zmq_mode}127.0.0.1:{self.local_visual_port}")
57+
logger.info(f"Connected to local VIT instance at {self.args.zmq_mode}127.0.0.1:{self.local_visual_port}")
58+
59+
def _setup_remote_vit_connections(self):
60+
asyncio.create_task(self.vit_handle_loop())
61+
62+
# wait for remote vit instances
63+
while True:
64+
if len(self.remote_vit_instances) > 0:
65+
break
66+
time.sleep(1)
67+
68+
def _get_vit_instance(self):
69+
"""
70+
获取下一个可用的VIT实例 (轮询负载均衡)
71+
"""
72+
if not self.remote_vit:
73+
return self.send_to_visual
74+
75+
# 简单的轮询负载均衡
76+
index = (self.current_vit_index + 1) % len(self.remote_vit_instances)
77+
self.current_vit_index = index
78+
return self.remote_vit_instances[index]
79+
80+
async def send_to_vit(self, data, protocol=pickle.HIGHEST_PROTOCOL):
81+
"""
82+
发送数据到VIT实例,支持本地和远程模式
83+
"""
84+
instance = self._get_vit_instance()
85+
try:
86+
instance.send_pyobj(data, protocol=protocol)
87+
except Exception as e:
88+
logger.error(f"Failed to send to VIT instance {instance.host_ip_port}: {e}")
89+
raise Exception(f"Failed to send to VIT instance {instance.host_ip_port}: {e}")
90+
91+
async def vit_handle_loop(self):
92+
while True:
93+
try:
94+
id_to_vit_obj = await self._get_vit_objs()
95+
logger.info(f"get vit_objs {id_to_vit_obj}")
96+
for id, remote_instance in self.remote_vit_instances.items():
97+
if id not in id_to_vit_obj:
98+
try:
99+
remote_instance[id].close()
100+
except:
101+
pass
102+
self.remote_vit_instances.pop(id)
103+
logger.info(f"remote vit {id} closed")
104+
105+
for id, vit_obj in id_to_vit_obj.items():
106+
if id not in self.remote_vit_instances:
107+
self.remote_vit_instances[id] = self.context.socket(zmq.PUSH)
108+
self.remote_vit_instances[id].connect(
109+
f"tcp://{vit_obj.host_ip_port}:{self.args.remote_vit_port}"
110+
)
111+
await asyncio.sleep(30)
112+
except Exception as e:
113+
logger.exception(str(e))
114+
await asyncio.sleep(10)
115+
116+
async def _get_vit_objs(self) -> Optional[Dict[int, VIT_Obj]]:
117+
"""
118+
get_vit_objs 主要负责从 config_server 获取所有的vit远程服务。
119+
"""
120+
# 使用 config_server 服务来发现所有的 pd_master 节点。
121+
uri = f"ws://{self.args.config_server_host}:{self.args.config_server_port}/registered_vit"
122+
123+
try:
124+
async with httpx.AsyncClient() as client:
125+
response = await client.get(uri)
126+
if response.status_code == 200:
127+
base64data = response.json()["data"]
128+
id_to_vit_obj = pickle.loads(base64.b64decode(base64data))
129+
return id_to_vit_obj
130+
else:
131+
logger.error(f"get pd_master_objs error {response.status_code}")
132+
return None
133+
except Exception as e:
134+
logger.exception(str(e))
135+
await asyncio.sleep(10)
136+
return None

0 commit comments

Comments
 (0)