Skip to content

Commit e41f365

Browse files
committed
fix
1 parent a01bd90 commit e41f365

File tree

4 files changed

+37
-8
lines changed

4 files changed

+37
-8
lines changed

lightllm/server/router/model_infer/mode_backend/continues_batch/pd_mode/decode_node_impl/decode_trans_obj.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def create(
5151
self.pd_prefill_nccl_port = pd_prefill_nccl_port
5252

5353
self.manager = manager
54-
self.timer_checker = TimeChecker(3)
54+
self.timer_checker = TimeChecker(6)
5555

5656
with self.kv_trans_process.device_lock:
5757
self.kv_trans_process.task_in_queue.put(

lightllm/server/router/model_infer/mode_backend/continues_batch/pd_mode/decode_node_impl/decode_trans_process.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import inspect
55
import torch.multiprocessing as mp
66
from torch.distributed import TCPStore
7+
from datetime import timedelta
78
from typing import List, Dict, Union
89
from lightllm.utils.log_utils import init_logger
910
from lightllm.common.mem_manager import MemoryManager
@@ -52,12 +53,17 @@ def _handle_prefill_join(
5253
node_info: PDTransJoinInfo, task_out_queue: mp.Queue, connect_id_to_comm: Dict[str, PyNcclCommunicator]
5354
):
5455
try:
56+
logger.info(f"connect start {node_info}")
5557
store_client = TCPStore(
56-
host_name=node_info.pd_prefill_nccl_ip, port=node_info.pd_prefill_nccl_port, is_master=False, use_libuv=True
57-
)
58-
group = StatelessP2PProcessGroup.create(
59-
src_id=node_info.prefill_id, dest_id=node_info.decode_id, is_server=False, store=store_client
58+
host_name=node_info.pd_prefill_nccl_ip, port=node_info.pd_prefill_nccl_port, is_master=False, use_libuv=True, timeout=timedelta(seconds=30)
6059
)
60+
src_id = node_info.prefill_id
61+
dest_id = node_info.connect_id
62+
logger.info(f"connect src_id {src_id} dest_id {dest_id}")
63+
group = StatelessP2PProcessGroup.create(src_id=src_id,
64+
dest_id=dest_id,
65+
is_server=False,
66+
store=store_client)
6167
comm = PyNcclCommunicator(group, node_info.decode_device_id)
6268
connect_id_to_comm[node_info.connect_id] = comm
6369
logger.info(f"{node_info} kv trans connected")
@@ -68,6 +74,13 @@ def _handle_prefill_join(
6874

6975

7076
def _init_env(args, device_id: int, task_in_queue: mp.Queue, task_out_queue: mp.Queue, mem_queues: List[mp.Queue]):
77+
import os
78+
79+
# os.environ["NCCL_DEBUG"] = "INFO"
80+
os.environ["NCCL_MAX_NCHANNELS"] = "2"
81+
os.environ["NCCL_NSOCKS_PER_CHANNEL"] = "1"
82+
os.environ["NCCL_SOCKET_NTHREADS"] = "1"
83+
torch.backends.cudnn.enabled = False
7184

7285
dp_size_in_node = max(1, args.dp // args.nnodes)
7386

lightllm/server/router/model_infer/mode_backend/continues_batch/pd_mode/prefill_node_impl/prefill_trans_obj.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def create(
5151
self.prefill_node_id = prefill_node_id
5252
self.device_index = device_index
5353
self.manager = manager
54-
self.timer_checker = TimeChecker(3)
54+
self.timer_checker = TimeChecker(6)
5555

5656
con = rpyc.connect(
5757
host=decode_node_ip, port=decode_node_rpyc_port, config={"allow_pickle": True}, keepalive=True

lightllm/server/router/model_infer/mode_backend/continues_batch/pd_mode/prefill_node_impl/prefill_trans_process.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import inspect
55
import torch.multiprocessing as mp
66
from torch.distributed import TCPStore
7+
from datetime import timedelta
78
from typing import List, Dict, Union
89
from lightllm.utils.log_utils import init_logger
910
from lightllm.common.mem_manager import MemoryManager
@@ -56,7 +57,14 @@ def _handle_decode_join(
5657
store: TCPStore,
5758
):
5859
try:
59-
group = StatelessP2PProcessGroup.create(node_info.prefill_id, node_info.decode_id, True, store)
60+
logger.info(f"connect start {node_info}")
61+
src_id = node_info.prefill_id
62+
dest_id = node_info.connect_id
63+
logger.info(f"connect src_id {src_id} dest_id {dest_id}")
64+
group = StatelessP2PProcessGroup.create(src_id=src_id,
65+
dest_id=dest_id,
66+
is_server=True,
67+
store=store)
6068
comm = PyNcclCommunicator(group, node_info.prefill_device_id)
6169
connect_id_to_comm[node_info.connect_id] = comm
6270
logger.info(f"{node_info} kv trans connected!")
@@ -75,10 +83,18 @@ def _init_env(
7583
task_out_queue: mp.Queue,
7684
mem_queues: List[mp.Queue],
7785
):
86+
import os
87+
88+
# os.environ["NCCL_DEBUG"] = "INFO"
89+
os.environ["NCCL_MAX_NCHANNELS"] = "2"
90+
os.environ["NCCL_NSOCKS_PER_CHANNEL"] = "1"
91+
os.environ["NCCL_SOCKET_NTHREADS"] = "1"
92+
torch.backends.cudnn.enabled = False
93+
7894
try:
7995
torch.cuda.set_device(device_id)
8096
graceful_registry(inspect.currentframe().f_code.co_name)
81-
master_store = TCPStore(host_name=store_ip, port=store_port, is_master=True, use_libuv=True)
97+
master_store = TCPStore(host_name=store_ip, port=store_port, is_master=True, use_libuv=True, timeout=timedelta(seconds=30))
8298
dp_size_in_node = max(1, args.dp // args.nnodes)
8399
task_out_queue.put("proc_start")
84100
mem_managers: List[MemoryManager] = [mem_queue.get(timeout=60) for mem_queue in mem_queues]

0 commit comments

Comments
 (0)