Skip to content

Commit e76c85d

Browse files
committed
adapter req dis
1 parent 2b4748d commit e76c85d

File tree

1 file changed

+36
-2
lines changed

1 file changed

+36
-2
lines changed

fastdeploy/splitwise/internal_adapter_utils.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
# **Note**: Just for internal use
2222
import zmq
2323

24-
from fastdeploy.inter_communicator import ZmqTcpServer
24+
from fastdeploy.engine.request import RequestStatus
25+
from fastdeploy.inter_communicator import ZmqIpcClient, ZmqTcpServer
2526
from fastdeploy.metrics.metrics import get_filtered_metrics
2627
from fastdeploy.utils import envs, get_logger
2728

@@ -45,6 +46,25 @@ def __init__(self, cfg, engine, dp_rank):
4546
target=self._response_external_module_control_instruct, daemon=True
4647
)
4748
self.response_external_instruct_thread.start()
49+
self._create_abort_request_client()
50+
51+
def _create_abort_request_client(self):
52+
self.abort_client = ZmqIpcClient(envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT, zmq.PUSH)
53+
self.abort_client.connect()
54+
logger.debug(f"Abort request client connected to port {envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT}")
55+
56+
def _send_abort_request(self, req_id: str) -> bool:
57+
try:
58+
data = {
59+
"request_id": req_id,
60+
"status": RequestStatus.ABORT.value,
61+
}
62+
self.abort_client.send_json(data)
63+
logger.debug(f"Sent abort request for req_id: {req_id}")
64+
return True
65+
except Exception as e:
66+
logger.error(f"Failed to send abort request for req_id {req_id}: {e}")
67+
return False
4868

4969
def _get_current_server_info(self):
5070
"""
@@ -104,7 +124,21 @@ def _recv_external_module_control_instruct(self):
104124
logger.debug(f"Response for task: {task_id_str}: is_health {is_health}")
105125
with self.response_lock:
106126
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
107-
127+
elif task["cmd"] == "interrupt_requests":
128+
req_ids = task.get("req_ids", [])
129+
interrupted_req_ids = []
130+
for req_id in req_ids:
131+
if self._send_abort_request(req_id):
132+
interrupted_req_ids.append(req_id)
133+
result = {
134+
"task_id": task_id_str,
135+
"result": {
136+
"success": len(req_ids) == 0 or len(interrupted_req_ids) == len(req_ids),
137+
"interrupted_req_ids": interrupted_req_ids,
138+
},
139+
}
140+
with self.response_lock:
141+
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
108142
except Exception as e:
109143
logger.error(f"handle_control_cmd got error: {e}, {traceback.format_exc()!s}")
110144

0 commit comments

Comments
 (0)