
Commit 31d4fcb

[BugFix] fix too many open files problem (#3256)
Squashed commit history:

* Update cache_messager.py
* fix too many open files problem (×3)
* fix ci bugs
* Update api_server.py
* add parameter
* format (×4)
* Update parameters.md (×2)
* Update serving_completion.py
* Update serving_chat.py
* Update envs.py

Co-authored-by: Jiang-Jia-Jun <[email protected]>
1 parent 22255a6 commit 31d4fcb

File tree: 8 files changed, +183 −25 lines

docs/parameters.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -8,6 +8,8 @@ When using FastDeploy to deploy models (including offline inference and service
 |:--------------|:----|:-----------|
 | ```port``` | `int` | Only required for service deployment, HTTP service port number, default: 8000 |
 | ```metrics_port``` | `int` | Only required for service deployment, metrics monitoring port number, default: 8001 |
+| ```max_waiting_time``` | `int` | Only required for service deployment, maximum wait time for establishing a connection upon service request. Default: -1 (indicates no wait time limit).|
+| ```max_concurrency``` | `int` | Only required for service deployment, the actual number of connections established by the service, default 512 |
 | ```engine_worker_queue_port``` | `int` | FastDeploy internal engine communication port, default: 8002 |
 | ```cache_queue_port``` | `int` | FastDeploy internal KVCache process communication port, default: 8003 |
 | ```max_model_len``` | `int` | Default maximum supported context length for inference, default: 2048 |
```
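
The two new options work as a pair: `max_concurrency` caps how many connections the service will hold open, and because the API server may run several uvicorn workers, the `api_server.py` diff below splits that budget per worker with ceiling division. A minimal sketch of the arithmetic, assuming only what this diff shows:

```python
# Sketch of the per-worker budget split used in api_server.py below.
# Ceiling division guarantees every worker gets at least one slot and the
# per-worker slots sum to >= max_concurrency (possibly slightly more).
def per_worker_limit(max_concurrency: int, workers: int) -> int:
    return (max_concurrency + workers - 1) // workers

assert per_worker_limit(512, 1) == 512
assert per_worker_limit(512, 4) == 128
assert per_worker_limit(512, 5) == 103  # 5 workers x 103 slots = 515 >= 512
```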

docs/zh/parameters.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -6,6 +6,8 @@
 |:-----------------------------------|:----------| :----- |
 | ```port``` | `int` | Only required for service deployment; HTTP request port, default 8000 |
 | ```metrics_port``` | `int` | Only required for service deployment; metrics monitoring port, default 8001 |
+| ```max_waiting_time``` | `int` | Only required for service deployment; maximum wait time for a request to establish a connection, default -1 (no wait time limit) |
+| ```max_concurrency``` | `int` | Only required for service deployment; actual number of connections the service establishes, default 512 |
 | ```engine_worker_queue_port``` | `int` | FastDeploy internal engine process communication port, default 8002 |
 | ```cache_queue_port``` | `int` | FastDeploy internal KVCache process communication port, default 8003 |
 | ```max_model_len``` | `int` | Default maximum context length supported for inference, default 2048 |
```

fastdeploy/entrypoints/engine_client.py

Lines changed: 4 additions & 2 deletions
```diff
@@ -21,12 +21,13 @@

 from fastdeploy import envs
 from fastdeploy.engine.config import ModelConfig
+from fastdeploy.envs import FD_SUPPORT_MAX_CONNECTIONS
 from fastdeploy.input.preprocess import InputPreprocessor
 from fastdeploy.inter_communicator import IPCSignal, ZmqClient
 from fastdeploy.metrics.work_metrics import work_process_metrics
 from fastdeploy.multimodal.registry import MultimodalRegistry
 from fastdeploy.platforms import current_platform
-from fastdeploy.utils import EngineError, api_server_logger
+from fastdeploy.utils import EngineError, StatefulSemaphore, api_server_logger


 class EngineClient:
@@ -47,6 +48,7 @@ def __init__(
         reasoning_parser=None,
         data_parallel_size=1,
         enable_logprob=False,
+        workers=1,
     ):
         import fastdeploy.model_executor.models  # noqa: F401

@@ -77,7 +79,7 @@ def __init__(
             suffix=pid,
             create=False,
         )
-
+        self.semaphore = StatefulSemaphore((FD_SUPPORT_MAX_CONNECTIONS + workers - 1) // workers)
         model_weights_status = np.zeros([1], dtype=np.int32)
         self.model_weights_status_signal = IPCSignal(
             name="model_weights_status",
```

fastdeploy/entrypoints/openai/api_server.py

Lines changed: 82 additions & 21 deletions
```diff
@@ -14,15 +14,17 @@
 # limitations under the License.
 """

+import asyncio
 import os
 import threading
 import time
+from collections.abc import AsyncGenerator
 from contextlib import asynccontextmanager
 from multiprocessing import current_process

 import uvicorn
 import zmq
-from fastapi import FastAPI, Request
+from fastapi import FastAPI, HTTPException, Request
 from fastapi.responses import JSONResponse, Response, StreamingResponse
 from prometheus_client import CONTENT_TYPE_LATEST

@@ -49,6 +51,7 @@
 from fastdeploy.plugins.model_register import load_model_register_plugins
 from fastdeploy.utils import (
     FlexibleArgumentParser,
+    StatefulSemaphore,
     api_server_logger,
     console_logger,
     is_port_available,
@@ -61,6 +64,13 @@
 parser.add_argument("--workers", default=1, type=int, help="number of workers")
 parser.add_argument("--metrics-port", default=8001, type=int, help="port for metrics server")
 parser.add_argument("--controller-port", default=-1, type=int, help="port for controller server")
+parser.add_argument(
+    "--max-waiting-time",
+    default=-1,
+    type=int,
+    help="max waiting time for connection, if set value -1 means no waiting time limit",
+)
+parser.add_argument("--max-concurrency", default=512, type=int, help="max concurrency")
 parser = EngineArgs.add_cli_args(parser)
 args = parser.parse_args()
 args.model = retrive_model_from_server(args.model, args.revision)
@@ -92,6 +102,12 @@ def load_engine():
     return engine


+app = FastAPI()
+
+MAX_CONCURRENT_CONNECTIONS = (args.max_concurrency + args.workers - 1) // args.workers
+connection_semaphore = StatefulSemaphore(MAX_CONCURRENT_CONNECTIONS)
+
+
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     """
@@ -117,10 +133,11 @@ async def lifespan(app: FastAPI):
         args.reasoning_parser,
         args.data_parallel_size,
         args.enable_logprob,
+        args.workers,
     )
     app.state.dynamic_load_weight = args.dynamic_load_weight
-    chat_handler = OpenAIServingChat(engine_client, pid, args.ips)
-    completion_handler = OpenAIServingCompletion(engine_client, pid, args.ips)
+    chat_handler = OpenAIServingChat(engine_client, pid, args.ips, args.max_waiting_time)
+    completion_handler = OpenAIServingCompletion(engine_client, pid, args.ips, args.max_waiting_time)
     engine_client.create_zmq_client(model=pid, mode=zmq.PUSH)
     engine_client.pid = pid
     app.state.engine_client = engine_client
@@ -142,6 +159,21 @@ async def lifespan(app: FastAPI):
 instrument(app)


+@asynccontextmanager
+async def connection_manager():
+    """
+    async context manager for connection manager
+    """
+    try:
+        await asyncio.wait_for(connection_semaphore.acquire(), timeout=0.001)
+        yield
+    except asyncio.TimeoutError:
+        api_server_logger.info(f"Reach max request release: {connection_semaphore.status()}")
+        if connection_semaphore.locked():
+            connection_semaphore.release()
+        raise HTTPException(status_code=429, detail="Too many requests")
+
+
 # TODO: pass the real engine value; obtain status via pid
 @app.get("/health")
 def health(request: Request) -> Response:
@@ -195,6 +227,22 @@ def ping(raw_request: Request) -> Response:
     return health(raw_request)


+def wrap_streaming_generator(original_generator: AsyncGenerator):
+    """
+    Wrap an async generator to release the connection semaphore when the generator is finished.
+    """
+
+    async def wrapped_generator():
+        try:
+            async for chunk in original_generator:
+                yield chunk
+        finally:
+            api_server_logger.debug(f"release: {connection_semaphore.status()}")
+            connection_semaphore.release()
+
+    return wrapped_generator
+
+
 @app.post("/v1/chat/completions")
 async def create_chat_completion(request: ChatCompletionRequest):
     """
@@ -204,16 +252,23 @@ async def create_chat_completion(request: ChatCompletionRequest):
     status, msg = app.state.engine_client.is_workers_alive()
     if not status:
         return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
-    inject_to_metadata(request)
-    generator = await app.state.chat_handler.create_chat_completion(request)
-
-    if isinstance(generator, ErrorResponse):
-        return JSONResponse(content=generator.model_dump(), status_code=generator.code)
-
-    elif isinstance(generator, ChatCompletionResponse):
-        return JSONResponse(content=generator.model_dump())
-
-    return StreamingResponse(content=generator, media_type="text/event-stream")
+    try:
+        async with connection_manager():
+            inject_to_metadata(request)
+            generator = await app.state.chat_handler.create_chat_completion(request)
+            if isinstance(generator, ErrorResponse):
+                connection_semaphore.release()
+                return JSONResponse(content={"detail": generator.model_dump()}, status_code=generator.code)
+            elif isinstance(generator, ChatCompletionResponse):
+                connection_semaphore.release()
+                return JSONResponse(content=generator.model_dump())
+            else:
+                wrapped_generator = wrap_streaming_generator(generator)
+                return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream")
+
+    except HTTPException as e:
+        api_server_logger.error(f"Error in chat completion: {str(e)}")
+        return JSONResponse(status_code=e.status_code, content={"detail": e.detail})


 @app.post("/v1/completions")
@@ -225,14 +280,20 @@ async def create_completion(request: CompletionRequest):
     status, msg = app.state.engine_client.is_workers_alive()
     if not status:
         return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
-
-    generator = await app.state.completion_handler.create_completion(request)
-    if isinstance(generator, ErrorResponse):
-        return JSONResponse(content=generator.model_dump(), status_code=generator.code)
-    elif isinstance(generator, CompletionResponse):
-        return JSONResponse(content=generator.model_dump())
-
-    return StreamingResponse(content=generator, media_type="text/event-stream")
+    try:
+        async with connection_manager():
+            generator = await app.state.completion_handler.create_completion(request)
+            if isinstance(generator, ErrorResponse):
+                connection_semaphore.release()
+                return JSONResponse(content=generator.model_dump(), status_code=generator.code)
+            elif isinstance(generator, CompletionResponse):
+                connection_semaphore.release()
+                return JSONResponse(content=generator.model_dump())
+            else:
+                wrapped_generator = wrap_streaming_generator(generator)
+                return StreamingResponse(content=wrapped_generator(), media_type="text/event-stream")
+    except HTTPException as e:
+        return JSONResponse(status_code=e.status_code, content={"detail": e.detail})


 @app.get("/update_model_weight")
```

fastdeploy/entrypoints/openai/serving_chat.py

Lines changed: 13 additions & 1 deletion
```diff
@@ -49,10 +49,11 @@ class OpenAIServingChat:
     OpenAI-style chat completions serving
     """

-    def __init__(self, engine_client, pid, ips):
+    def __init__(self, engine_client, pid, ips, max_waiting_time):
         self.engine_client = engine_client
         self.pid = pid
         self.master_ip = ips
+        self.max_waiting_time = max_waiting_time
         self.host_ip = get_host_ip()
         if self.master_ip is not None:
             if isinstance(self.master_ip, list):
@@ -93,6 +94,14 @@ async def create_chat_completion(self, request: ChatCompletionRequest):
             return ErrorResponse(code=400, message=str(e))

         del current_req_dict
+        try:
+            api_server_logger.debug(f"{self.engine_client.semaphore.status()}")
+            if self.max_waiting_time < 0:
+                await self.engine_client.semaphore.acquire()
+            else:
+                await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
+        except Exception:
+            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")

         if request.stream:
             return self.chat_completion_stream_generator(request, request_id, request.model, prompt_token_ids)
@@ -310,6 +319,8 @@ async def chat_completion_stream_generator(
                 yield f"data: {error_data}\n\n"
         finally:
             dealer.close()
+            self.engine_client.semaphore.release()
+            api_server_logger.info(f"release {self.engine_client.semaphore.status()}")
         yield "data: [DONE]\n\n"

     async def chat_completion_full_generator(
@@ -383,6 +394,7 @@ async def chat_completion_full_generator(
                 if task_is_finished:
                     break
         finally:
+            self.engine_client.semaphore.release()
             dealer.close()

         choices = []
```
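
This is a second, independent gate behind the API-server one: each `EngineClient` semaphore slot is held from admission until the generator's `finally` releases it, and `max_waiting_time` controls how long a request may queue for a slot before the handler returns a 408 `ErrorResponse` (negative means wait indefinitely). A minimal sketch of that wait policy, with hypothetical numbers:

```python
import asyncio

async def acquire_with_policy(semaphore: asyncio.Semaphore, max_waiting_time: float) -> bool:
    """Sketch of the wait policy: negative means wait forever, otherwise bounded."""
    try:
        if max_waiting_time < 0:
            await semaphore.acquire()  # no limit: queue until a slot frees up
        else:
            await asyncio.wait_for(semaphore.acquire(), timeout=max_waiting_time)
        return True
    except asyncio.TimeoutError:
        return False  # caller maps this to a 408 ErrorResponse

async def demo():
    sem = asyncio.Semaphore(1)
    await sem.acquire()  # the only slot is busy
    ok = await acquire_with_policy(sem, max_waiting_time=0.1)
    print(ok)  # False: timed out after 0.1 s

asyncio.run(demo())
```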

fastdeploy/entrypoints/openai/serving_completion.py

Lines changed: 12 additions & 1 deletion
```diff
@@ -40,11 +40,12 @@


 class OpenAIServingCompletion:
-    def __init__(self, engine_client, pid, ips):
+    def __init__(self, engine_client, pid, ips, max_waiting_time):
         self.engine_client = engine_client
         self.pid = pid
         self.master_ip = ips
         self.host_ip = get_host_ip()
+        self.max_waiting_time = max_waiting_time
         if self.master_ip is not None:
             if isinstance(self.master_ip, list):
                 self.master_ip = self.master_ip[0]
@@ -114,6 +115,14 @@ async def create_completion(self, request: CompletionRequest):

         del current_req_dict

+        try:
+            if self.max_waiting_time < 0:
+                await self.engine_client.semaphore.acquire()
+            else:
+                await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
+        except Exception:
+            return ErrorResponse(code=408, message=f"Request queued time exceed {self.max_waiting_time}")
+
         if request.stream:
             return self.completion_stream_generator(
                 request=request,
@@ -221,6 +230,7 @@ async def completion_full_generator(
             api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True)
             raise
         finally:
+            self.engine_client.semaphore.release()
             if dealer is not None:
                 dealer.close()

@@ -371,6 +381,7 @@ async def completion_stream_generator(
             yield f"data: {ErrorResponse(message=str(e), code=400).model_dump_json(exclude_unset=True)}\n\n"
         finally:
             del request
+            self.engine_client.semaphore.release()
             if dealer is not None:
                 dealer.close()
             yield "data: [DONE]\n\n"
```

fastdeploy/envs.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -84,6 +84,8 @@
     "FD_PLUGINS": lambda: None if "FD_PLUGINS" not in os.environ else os.environ["FD_PLUGINS"].split(","),
     # set trace attribute job_id.
     "FD_JOB_ID": lambda: os.getenv("FD_JOB_ID"),
+    # support max connections
+    "FD_SUPPORT_MAX_CONNECTIONS": lambda: 768,
 }

```
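
Entries in this registry are lazy: each value is a zero-argument lambda evaluated when the variable is read. As added here, `FD_SUPPORT_MAX_CONNECTIONS` returns a fixed 768 rather than reading `os.environ` the way its neighbors do, so this diff alone does not show it being overridable from the environment. A sketch of the registry pattern, with hypothetical `DEMO_*` names:

```python
import os

# Sketch of the lazy env-registry pattern used in fastdeploy/envs.py.
# The DEMO_* names are hypothetical, for illustration only.
environment_variables = {
    # read from the environment at access time
    "DEMO_JOB_ID": lambda: os.getenv("DEMO_JOB_ID"),
    # fixed value, not read from the environment (like FD_SUPPORT_MAX_CONNECTIONS)
    "DEMO_MAX_CONNECTIONS": lambda: 768,
}

def get_env(name: str):
    """Evaluate the registered lambda at read time."""
    return environment_variables[name]()

print(get_env("DEMO_MAX_CONNECTIONS"))  # 768
```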
