Skip to content

Commit 89177d8

Browse files
authored
[Bug fix] Fix zmq core bug (#3357)
* [Bug fix] Fix zmq core bug caused by the socket being used concurrently by multiple threads * Fix zmq core bug caused by the socket being used concurrently by multiple threads
1 parent 7573802 commit 89177d8

File tree

3 files changed

+20
-17
lines changed

3 files changed

+20
-17
lines changed

fastdeploy/inter_communicator/zmq_server.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -223,17 +223,15 @@ def recv_control_cmd(self):
223223
Recieve control command from client
224224
"""
225225
self._ensure_socket()
226-
while self.running:
227-
try:
228-
client, _, task_data = self.socket.recv_multipart(flags=zmq.NOBLOCK)
229-
task = msgpack.unpackb(task_data)
230-
task_id_str = task["task_id"]
231-
except zmq.Again:
232-
time.sleep(0.001)
233-
continue
234-
with self.mutex:
235-
self.req_dict[task_id_str] = client
236-
return task
226+
try:
227+
client, _, task_data = self.socket.recv_multipart(flags=zmq.NOBLOCK)
228+
task = msgpack.unpackb(task_data)
229+
task_id_str = task["task_id"]
230+
except zmq.Again:
231+
return None
232+
with self.mutex:
233+
self.req_dict[task_id_str] = client
234+
return task
237235

238236
def response_for_control_cmd(self, task_id, result):
239237
"""
@@ -251,7 +249,7 @@ def response_for_control_cmd(self, task_id, result):
251249

252250
with self.mutex:
253251
self.req_dict.pop(task_id, None)
254-
llm_logger.info(f"response control cmd finished, task_id: {task_id}")
252+
llm_logger.debug(f"response control cmd finished, task_id: {task_id}")
255253

256254
def close(self):
257255
"""

fastdeploy/output/token_processor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,8 @@ def _process_batch_output(self):
525525
for token_id in token_ids:
526526
self.tokens_counter[task_id] += 1
527527
if token_id != RECOVERY_STOP_SIGNAL:
528-
result.outputs.token_ids.append(token_id)
528+
if not (envs.FD_ENABLE_INTERNAL_ADAPTER and token_id in task.eos_token_ids):
529+
result.outputs.token_ids.append(token_id)
529530
task.output_token_ids.append(token_id)
530531
if token_id in task.eos_token_ids or is_prefill or recovery_stop:
531532
result.finished = True

fastdeploy/splitwise/internal_adapter_utils.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def __init__(self, cfg, engine, dp_rank):
3434
self.engine = engine
3535
self.dp_rank = dp_rank
3636
recv_control_cmd_ports = envs.FD_ZMQ_CONTROL_CMD_SERVER_PORTS.split(",")
37+
self.response_lock = threading.Lock() # prevent to call send_multipart in zmq concurrently
3738
self.recv_control_cmd_server = ZmqTcpServer(port=recv_control_cmd_ports[dp_rank], mode=zmq.ROUTER)
3839
self.recv_external_instruct_thread = threading.Thread(
3940
target=self._recv_external_module_control_instruct, daemon=True
@@ -43,7 +44,6 @@ def __init__(self, cfg, engine, dp_rank):
4344
target=self._response_external_module_control_instruct, daemon=True
4445
)
4546
self.response_external_instruct_thread.start()
46-
self.response_lock = threading.Lock() # prevent to call send_multipart in zmq concurrently
4747

4848
def _get_current_server_info(self):
4949
"""
@@ -71,13 +71,17 @@ def _recv_external_module_control_instruct(self):
7171
"""
7272
while True:
7373
try:
74-
task = self.recv_control_cmd_server.recv_control_cmd()
74+
with self.response_lock:
75+
task = self.recv_control_cmd_server.recv_control_cmd()
76+
if task is None:
77+
time.sleep(0.001)
78+
continue
7579
logger.info(f"Recieve control task: {task}")
7680
task_id_str = task["task_id"]
7781
if task["cmd"] == "get_payload":
7882
payload_info = self._get_current_server_info()
7983
result = {"task_id": task_id_str, "result": payload_info}
80-
logger.info(f"Response for task: {task_id_str}")
84+
logger.debug(f"Response for task: {task_id_str}")
8185
with self.response_lock:
8286
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
8387

@@ -87,7 +91,7 @@ def _recv_external_module_control_instruct(self):
8791
extra_register_func=lambda reg: main_process_metrics.register_all(reg, workers=1),
8892
)
8993
result = {"task_id": task_id_str, "result": metrics_text}
90-
logger.info(f"Response for task: {task_id_str}")
94+
logger.debug(f"Response for task: {task_id_str}")
9195
with self.response_lock:
9296
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
9397
elif task["cmd"] == "connect_rdma":

0 commit comments

Comments
 (0)