Skip to content

Commit f489c9f

Browse files
authored
[Feature] support adapter (#4180)
* [Feature] support adapter * fix * fix * fix * fix * fix * fix
1 parent be98f6e commit f489c9f

File tree

8 files changed

+499
-177
lines changed

8 files changed

+499
-177
lines changed

fastdeploy/engine/common_engine.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@
3737
EngineCacheQueue,
3838
EngineWorkerQueue,
3939
IPCSignal,
40-
ZmqClient,
40+
ZmqIpcServer,
41+
ZmqTcpServer,
4142
)
4243
from fastdeploy.metrics.metrics import main_process_metrics
4344
from fastdeploy.metrics.trace_util import start_span, start_span_request
4445
from fastdeploy.model_executor.guided_decoding import schema_checker
4546
from fastdeploy.output.token_processor import TokenProcessor
47+
from fastdeploy.splitwise.internal_adapter_utils import InternalAdapter
4648
from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector
4749
from fastdeploy.utils import EngineError, envs, llm_logger
4850

@@ -571,10 +573,21 @@ def _fetch_request():
571573
def start_zmq_service(self, api_server_pid=None):
572574
if api_server_pid is None:
573575
return
574-
self.api_server_pid = api_server_pid
575-
self.zmq_server = ZmqClient(name=api_server_pid, mode=zmq.PULL)
576-
self.zmq_server.start_server()
577-
self.zmq_server.create_router()
576+
577+
if envs.FD_ENABLE_INTERNAL_ADAPTER:
578+
self.recv_request_server = ZmqTcpServer(port=envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT, mode=zmq.PULL)
579+
self.send_response_server = ZmqTcpServer(port=envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT, mode=zmq.ROUTER)
580+
self.external_adapter = InternalAdapter(
581+
cfg=self.cfg, engine=self, dp_rank=self.cfg.parallel_config.local_data_parallel_id
582+
)
583+
else:
584+
self.recv_request_server = ZmqIpcServer(name=api_server_pid, mode=zmq.PULL)
585+
self.send_response_server = ZmqIpcServer(name=api_server_pid, mode=zmq.ROUTER)
586+
self.recv_result_handle_thread = threading.Thread(
587+
target=self.send_response_server.recv_result_handle, daemon=True
588+
)
589+
self.recv_result_handle_thread.start()
590+
578591
time.sleep(3)
579592
self.insert_task_to_scheduler_thread = threading.Thread(target=self._insert_zmq_task_to_scheduler, daemon=True)
580593
self.insert_task_to_scheduler_thread.start()
@@ -588,9 +601,9 @@ def _insert_zmq_task_to_scheduler(self):
588601
try:
589602
block = True if len(added_requests) == 0 else False
590603
if not self.cfg.model_config.enable_mm:
591-
err, data = self.zmq_server.receive_json_once(block)
604+
err, data = self.recv_request_server.receive_json_once(block)
592605
else:
593-
err, data = self.zmq_server.receive_pyobj_once(block)
606+
err, data = self.recv_request_server.receive_pyobj_once(block)
594607
if err is not None:
595608
llm_logger.error("Engine stops inserting zmq task into scheduler, err:{err}")
596609
break
@@ -644,7 +657,7 @@ def _insert_zmq_task_to_scheduler(self):
644657
)
645658
# Since the request is not in scheduler
646659
# Send result by zmq directly
647-
self.zmq_server.send_multipart(request_id, [error_result])
660+
self.send_response_server.send_response(request_id, [error_result])
648661
except Exception as e:
649662
llm_logger.error(
650663
f"Error happend while receving new request from zmq, details={e}, "
@@ -662,7 +675,7 @@ def _zmq_send_generated_tokens(self):
662675
time.sleep(0.005)
663676
continue
664677
for request_id, contents in results.items():
665-
self.zmq_server.send_multipart(request_id, contents)
678+
self.send_response_server.send_response(request_id, contents)
666679

667680
except Exception as e:
668681
llm_logger.error(f"Unexcepted error happend: {e}, {traceback.format_exc()!s}")

fastdeploy/entrypoints/engine_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from fastdeploy.entrypoints.openai.utils import DealerConnectionManager
2828
from fastdeploy.envs import FD_SUPPORT_MAX_CONNECTIONS
2929
from fastdeploy.input.preprocess import InputPreprocessor
30-
from fastdeploy.inter_communicator import IPCSignal, ZmqClient
30+
from fastdeploy.inter_communicator import IPCSignal, ZmqIpcClient
3131
from fastdeploy.metrics.work_metrics import work_process_metrics
3232
from fastdeploy.multimodal.registry import MultimodalRegistry
3333
from fastdeploy.platforms import current_platform
@@ -110,7 +110,7 @@ def create_zmq_client(self, model, mode):
110110
"""
111111
Create a ZMQ client.
112112
"""
113-
self.zmq_client = ZmqClient(model, mode)
113+
self.zmq_client = ZmqIpcClient(model, mode)
114114
self.zmq_client.connect()
115115

116116
async def format_and_add_data(self, prompts: dict):

fastdeploy/envs.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@
9595
"FD_FOR_TORCH_MODEL_FORMAT": lambda: bool(int(os.getenv("FD_FOR_TORCH_MODEL_FORMAT", "0"))),
9696
# force disable default chunked prefill
9797
"FD_DISABLE_CHUNKED_PREFILL": lambda: bool(int(os.getenv("FD_DISABLE_CHUNKED_PREFILL", "0"))),
98+
"FD_ENABLE_INTERNAL_ADAPTER": lambda: int(os.getenv("FD_ENABLE_INTERNAL_ADAPTER", "0")),
99+
# LLMEngine receive requests port, used when FD_ENABLE_INTERNAL_ADAPTER=1
100+
"FD_ZMQ_RECV_REQUEST_SERVER_PORT": lambda: os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORT", "8200"),
101+
# LLMEngine send response port, used when FD_ENABLE_INTERNAL_ADAPTER=1
102+
"FD_ZMQ_SEND_RESPONSE_SERVER_PORT": lambda: os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORT", "8201"),
103+
# LLMEngine receive control command port, used when FD_ENABLE_INTERNAL_ADAPTER=1
104+
"FD_ZMQ_CONTROL_CMD_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_CONTROL_CMD_SERVER_PORTS", "8202"),
98105
}
99106

100107

fastdeploy/inter_communicator/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717
from .engine_cache_queue import EngineCacheQueue
1818
from .engine_worker_queue import EngineWorkerQueue
1919
from .ipc_signal import IPCSignal, shared_memory_exists
20-
from .zmq_client import ZmqClient
20+
from .zmq_client import ZmqIpcClient
21+
from .zmq_server import ZmqIpcServer, ZmqTcpServer
2122

22-
__all__ = ["ZmqClient", "IPCSignal", "EngineWorkerQueue", "EngineCacheQueue", "shared_memory_exists"]
23+
__all__ = [
24+
"ZmqIpcClient",
25+
"IPCSignal",
26+
"EngineWorkerQueue",
27+
"EngineCacheQueue",
28+
"ZmqTcpServer",
29+
"ZmqIpcServer",
30+
"shared_memory_exists",
31+
]

fastdeploy/inter_communicator/zmq_client.py

Lines changed: 33 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -14,209 +14,78 @@
1414
# limitations under the License.
1515
"""
1616

17-
import os
18-
import threading
19-
import time
20-
import traceback
17+
from abc import ABC, abstractmethod
2118

22-
import msgpack
2319
import zmq
2420

25-
from fastdeploy import envs
26-
from fastdeploy.utils import zmq_client_logger
2721

28-
29-
class ZmqClient:
22+
class ZmqClientBase(ABC):
3023
"""
31-
ZmqClient is a class that provides a client-side interface for sending and receiving messages using ZeroMQ.
24+
ZmqClientBase is a base class that provides a client-side interface for sending and receiving messages using ZeroMQ.
3225
"""
3326

34-
def __init__(self, name, mode):
35-
self.context = zmq.Context(4)
36-
self.socket = self.context.socket(mode)
37-
self.file_name = f"/dev/shm/{name}.socket"
38-
self.router_path = f"/dev/shm/router_{name}.ipc"
27+
def __init__(self):
28+
pass
3929

40-
self.ZMQ_SNDHWM = int(envs.FD_ZMQ_SNDHWM)
41-
self.aggregate_send = envs.FD_USE_AGGREGATE_SEND
30+
@abstractmethod
31+
def _create_socket(self):
32+
"""Abstract method to create and return a ZeroMQ socket."""
33+
pass
4234

43-
self.mutex = threading.Lock()
44-
self.req_dict = dict()
45-
self.router = None
46-
self.poller = None
47-
self.running = True
35+
def _ensure_socket(self):
36+
"""Ensure the socket is created before use."""
37+
if self.socket is None:
38+
self.socket = self._create_socket()
4839

40+
@abstractmethod
4941
def connect(self):
5042
"""
5143
Connect to the server using the file name specified in the constructor.
5244
"""
53-
self.socket.connect(f"ipc://{self.file_name}")
54-
55-
def start_server(self):
56-
"""
57-
Start the server using the file name specified in the constructor.
58-
"""
59-
self.socket.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
60-
self.socket.setsockopt(zmq.SNDTIMEO, -1)
61-
self.socket.bind(f"ipc://{self.file_name}")
62-
self.poller = zmq.Poller()
63-
self.poller.register(self.socket, zmq.POLLIN)
64-
65-
def create_router(self):
66-
"""
67-
Create a ROUTER socket and bind it to the specified router path.
68-
"""
69-
self.router = self.context.socket(zmq.ROUTER)
70-
self.router.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
71-
self.router.setsockopt(zmq.ROUTER_MANDATORY, 1)
72-
self.router.setsockopt(zmq.SNDTIMEO, -1)
73-
self.router.bind(f"ipc://{self.router_path}")
74-
zmq_client_logger.info(f"router path: {self.router_path}")
45+
pass
7546

7647
def send_json(self, data):
7748
"""
7849
Send a JSON-serializable object over the socket.
7950
"""
51+
self._ensure_socket()
8052
self.socket.send_json(data)
8153

8254
def recv_json(self):
8355
"""
8456
Receive a JSON-serializable object from the socket.
8557
"""
58+
self._ensure_socket()
8659
return self.socket.recv_json()
8760

8861
def send_pyobj(self, data):
8962
"""
9063
Send a Pickle-serializable object over the socket.
9164
"""
65+
self._ensure_socket()
9266
self.socket.send_pyobj(data)
9367

9468
def recv_pyobj(self):
9569
"""
9670
Receive a Pickle-serializable object from the socket.
9771
"""
72+
self._ensure_socket()
9873
return self.socket.recv_pyobj()
9974

100-
def pack_aggregated_data(self, data):
101-
"""
102-
Aggregate multiple responses into one and send them to the client.
103-
"""
104-
result = data[0]
105-
if len(data) > 1:
106-
for response in data[1:]:
107-
result.add(response)
108-
result = msgpack.packb([result.to_dict()])
109-
return result
110-
111-
def send_multipart(self, req_id, data):
112-
"""
113-
Send a multipart message to the router socket.
114-
"""
115-
if self.router is None:
116-
raise RuntimeError("Router socket not created. Call create_router() first.")
117-
118-
while self.running:
119-
with self.mutex:
120-
if req_id not in self.req_dict:
121-
try:
122-
client, _, request_id = self.router.recv_multipart(flags=zmq.NOBLOCK)
123-
req_id_str = request_id.decode("utf-8")
124-
self.req_dict[req_id_str] = client
125-
except zmq.Again:
126-
time.sleep(0.001)
127-
continue
128-
else:
129-
break
130-
if self.req_dict[req_id] == -1:
131-
if data[-1].finished:
132-
with self.mutex:
133-
self.req_dict.pop(req_id, None)
134-
return
135-
try:
136-
start_send = time.time()
137-
if self.aggregate_send:
138-
result = self.pack_aggregated_data(data)
139-
else:
140-
result = msgpack.packb([response.to_dict() for response in data])
141-
self.router.send_multipart([self.req_dict[req_id], b"", result])
142-
zmq_client_logger.info(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}")
143-
except zmq.ZMQError as e:
144-
zmq_client_logger.error(f"[{req_id}] zmq error: {e}")
145-
self.req_dict[req_id] = -1
146-
except Exception as e:
147-
zmq_client_logger.error(f"Send result to zmq client failed: {e}, {str(traceback.format_exc())}")
148-
149-
if data[-1].finished:
150-
with self.mutex:
151-
self.req_dict.pop(req_id, None)
152-
zmq_client_logger.info(f"send_multipart finished, req_id: {req_id}")
153-
154-
def receive_json_once(self, block=False):
155-
"""
156-
Receive a single message from the socket.
157-
"""
158-
if self.socket is None or self.socket.closed:
159-
return "zmp socket has closed", None
160-
try:
161-
flags = zmq.NOBLOCK if not block else 0
162-
return None, self.socket.recv_json(flags=flags)
163-
except zmq.Again:
164-
return None, None
165-
except Exception as e:
166-
self.close()
167-
zmq_client_logger.warning(f"{e}, {str(traceback.format_exc())}")
168-
return str(e), None
169-
170-
def receive_pyobj_once(self, block=False):
171-
"""
172-
Receive a single message from the socket.
173-
"""
174-
if self.socket is None or self.socket.closed:
175-
return "zmp socket has closed", None
176-
try:
177-
flags = zmq.NOBLOCK if not block else 0
178-
return None, self.socket.recv_pyobj(flags=flags)
179-
except zmq.Again:
180-
return None, None
181-
except Exception as e:
182-
self.close()
183-
zmq_client_logger.warning(f"{e}, {str(traceback.format_exc())}")
184-
return str(e), None
185-
186-
def _clear_ipc(self, name):
187-
"""
188-
Remove the IPC file with the given name.
189-
"""
190-
if os.path.exists(name):
191-
try:
192-
os.remove(name)
193-
except OSError as e:
194-
zmq_client_logger.warning(f"Failed to remove IPC file {name} - {e}")
195-
196-
def close(self):
197-
"""
198-
Close the socket and context, and remove the IPC files.
199-
"""
200-
if not self.running:
201-
return
202-
203-
self.running = False
204-
zmq_client_logger.info("Closing ZMQ connection...")
205-
try:
206-
if hasattr(self, "socket") and not self.socket.closed:
207-
self.socket.close()
20875

209-
if self.router is not None and not self.router.closed:
210-
self.router.close()
211-
212-
if not self.context.closed:
213-
self.context.term()
76+
class ZmqIpcClient(ZmqClientBase):
77+
def __init__(self, name, mode):
78+
self.name = name
79+
self.mode = mode
80+
self.file_name = f"/dev/shm/{name}.socket"
81+
self.context = zmq.Context()
82+
self.socket = self.context.socket(self.mode)
21483

215-
self._clear_ipc(self.file_name)
216-
self._clear_ipc(self.router_path)
217-
except Exception as e:
218-
zmq_client_logger.warning(f"Failed to close ZMQ connection - {e}, {str(traceback.format_exc())}")
219-
return
84+
def _create_socket(self):
85+
"""create and return a ZeroMQ socket."""
86+
self.context = zmq.Context()
87+
return self.context.socket(self.mode)
22088

221-
def __exit__(self, exc_type, exc_val, exc_tb):
222-
self.close()
89+
def connect(self):
90+
self._ensure_socket()
91+
self.socket.connect(f"ipc://{self.file_name}")

0 commit comments

Comments
 (0)