|
5 | 5 | import socket |
6 | 6 | import httpx |
7 | 7 | import base64 |
8 | | -import zmq |
9 | 8 | from typing import Dict, Optional |
10 | 9 | from lightllm.server.pd_io_struct import NodeRole, ObjType |
11 | 10 | from lightllm.server.httpserver.async_queue import AsyncQueue |
@@ -34,8 +33,6 @@ async def pd_handle_loop(manager: HttpServerManager): |
34 | 33 | manager.host_ip = manager.args.host |
35 | 34 |
|
36 | 35 | asyncio.create_task(timer_log(manager)) |
37 | | - if manager.pd_mode.is_NP_or_ND(): |
38 | | - asyncio.create_task(pd_handle_loop_from_d(manager)) |
39 | 36 |
|
40 | 37 | id_to_handle_task: Dict[int, asyncio.Task] = {} |
41 | 38 |
|
@@ -95,8 +92,7 @@ async def _pd_handle_task(manager: HttpServerManager, pd_master_obj: PD_Master_O |
95 | 92 | logger.info(f"Sent registration JSON: {regist_json}") |
96 | 93 |
|
97 | 94 | # 转发任务 |
98 | | - if manager.pd_mode != NodeRole.NP: # nixl prefill don't need up token to master |
99 | | - forwarding_tokens_task = asyncio.create_task(_up_tokens_to_pd_master(forwarding_queue, websocket)) |
| 95 | + forwarding_tokens_task = asyncio.create_task(_up_tokens_to_pd_master(forwarding_queue, websocket)) |
100 | 96 |
|
101 | 97 | # 接收 pd master 发来的请求,并推理后,将生成的token转发回pd master。 |
102 | 98 | while True: |
@@ -210,33 +206,3 @@ def _get_load_info() -> dict: |
210 | 206 | "client_ip_port": f"{g_objs.httpserver_manager.host_ip}:{g_objs.args.port}", |
211 | 207 | } |
212 | 208 | return load_info |
213 | | - |
214 | | - |
215 | | -async def pd_handle_loop_from_d(manager: HttpServerManager): |
216 | | - if manager.pd_mode != NodeRole.NP: |
217 | | - return |
218 | | - |
219 | | - context = zmq.asyncio.Context(2) |
220 | | - manager.recv_from_d = context.socket(zmq.PULL) |
221 | | - manager.recv_from_d.bind(f"tcp://*:{manager.args.pd_nixl_remote_prefill_http_port}") |
222 | | - |
223 | | - while True: |
224 | | - try: |
225 | | - ( |
226 | | - prompt, |
227 | | - sampling_params, |
228 | | - multimodal_params, |
229 | | - ) = await manager.recv_from_d.recv_pyobj() |
230 | | - |
231 | | - # 触发推理的task |
232 | | - async def pd_process_generate(manager: "HttpServerManager", prompt, sampling_params, multimodal_params): |
233 | | - try: |
234 | | - async for _, _, _, _ in manager.generate(prompt, sampling_params, multimodal_params, None): |
235 | | - pass |
236 | | - except BaseException as e: |
237 | | - logger.error(str(e)) |
238 | | - |
239 | | - asyncio.create_task(pd_process_generate(manager, prompt, sampling_params, multimodal_params)) |
240 | | - |
241 | | - except Exception as e: |
242 | | - logger.exception(f"pd loop generate error: {str(e)}") |
0 commit comments