Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions lightllm/server/httpserver/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def __init__(
self.transfer_lock = asyncio.Lock() # the lock for transfer to next module in multi node mode.
self.disable_abort = args.nnodes > 1 and args.dp == 1 # mulitnode dp=1 mode, disable abort
self.is_multinode_tp = args.dp == 1 and args.nnodes > 1
self.is_multinode_tp_master = args.dp == 1 and args.nnodes > 1 and args.node_rank == 0
self.is_multinode_tp_slave = args.dp == 1 and args.nnodes > 1 and args.node_rank > 0
if self.is_multinode_tp:
if args.node_rank == 0:
self.multinode_req_manager = []
Expand Down Expand Up @@ -192,7 +194,7 @@ def alloc_req_id(self, sampling_params, is_health_req: bool = False):
if is_health_req:
return sampling_params.group_request_id
if self.pd_mode == NodeRole.NORMAL:
if not (self.nnodes > 1 and self.args.dp == 1):
if not self.is_multinode_tp:
group_request_id = self.id_gen.generate_id()
else:
if self.node_rank == 0:
Expand Down Expand Up @@ -222,7 +224,7 @@ async def generate(

try:
original_multimodal_params = None
if self.nnodes > 1 and self.node_rank == 0 and self.args.dp == 1:
if self.is_multinode_tp_master:
original_multimodal_params = copy.deepcopy(multimodal_params)

if self.pd_mode.is_P_or_NORMAL():
Expand Down Expand Up @@ -366,8 +368,10 @@ async def transfer_to_next_module_or_node(
original_multimodal_params: MultimodalParams,
group_req_objs: Optional[GroupReqObjs] = None,
):
# 多节点纯tp 运行模式下,保证请求能保持相同的顺序转发到其他节点和当前节点next module.
if self.nnodes > 1 and self.node_rank == 0 and self.args.dp == 1:
# 多节点纯tp 运行模式下,master 节点需要将请求按照可控的顺序转发给slave节点,
# 同时转发给slave节点的时候,要保证master节点按照转发的顺序转发给next_module,
# 所以需要锁的控制。
if self.is_multinode_tp_master:
async with self.transfer_lock:
for sender in self.multinode_req_manager:
sender.send_pyobj(
Expand All @@ -376,8 +380,10 @@ async def transfer_to_next_module_or_node(
)
await self.transfer_to_next_module(group_req_objs)
return

if self.nnodes > 1 and self.node_rank > 0 and self.args.dp == 1:
# 多节点纯tp 的slave节点,需要按照接收到请求的顺序转发,这需要锁和排队机制来保证。
# self.request_order_queue 实现了一种简单的排队取出机制,这样master 和 slave
# 节点的请求到达各自节点的router的顺序才是一致的,才能完成同步同态调度。
if self.is_multinode_tp_slave:
while True:
if self.request_order_queue and self.request_order_queue[0] != group_req_objs.group_req_id:
await asyncio.sleep(0.002)
Expand Down Expand Up @@ -578,8 +584,10 @@ async def handle_loop(self):
if self.pd_mode.is_P_or_D():
self.forwarding_queue = AsyncQueue()
asyncio.create_task(self.pd_handle_loop())

if self.args.node_rank > 0:

# 多节点tp模式下的slave节点,需要开启一个协程task用来接收
# master 转发过来的请求对象。
if self.is_multinode_tp_slave:
asyncio.create_task(self.loop_for_request())

while True:
Expand Down
Loading