Skip to content

Commit a3b303a

Browse files
committed
Update main.py
1 parent 06a4af6 commit a3b303a

File tree

1 file changed

+4
-129
lines changed

1 file changed

+4
-129
lines changed

app/main.py

Lines changed: 4 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import time
44
import uuid
55
import logging
6-
import threading
7-
import multiprocessing
86
import asyncio
97
from typing import List, Optional, Dict, Any, Union, AsyncGenerator, Tuple
108
from contextlib import asynccontextmanager
@@ -29,7 +27,6 @@
2927
SILICON_FLOW_BASE_URL = os.getenv(
3028
"SILICON_FLOW_BASE_URL", "https://api.siliconflow.cn/v1"
3129
)
32-
_MOCK_ENV_API_KEY = os.getenv("SILICON_FLOW_API_KEY")
3330

3431

3532
@dataclass
@@ -74,11 +71,8 @@ class ServerState:
7471
_instance = None
7572

7673
def __init__(self):
77-
self.request_queue: Optional[multiprocessing.Queue] = None
78-
self.response_queue: Optional[multiprocessing.Queue] = None
7974
self.active_requests: int = 0
8075
self._lock: Optional[asyncio.Lock] = None
81-
self.is_mock_mode = False
8276

8377
@classmethod
8478
def get_instance(cls):
@@ -93,12 +87,6 @@ def lock(self) -> asyncio.Lock:
9387
self._lock = asyncio.Lock()
9488
return self._lock
9589

96-
def set_queues(self, req_q, res_q):
97-
self.request_queue = req_q
98-
self.response_queue = res_q
99-
self.is_mock_mode = True
100-
logger.warning("!!! Server running in MOCK MODE via Queue Injection !!!")
101-
10290
async def increment_request(self):
10391
async with self.lock:
10492
self.active_requests += 1
@@ -112,34 +100,8 @@ async def get_snapshot(self) -> Dict[str, Any]:
112100
async with self.lock:
113101
return {
114102
"active_requests": self.active_requests,
115-
"is_mock_mode": self.is_mock_mode,
116103
}
117104

118-
async def mock_request_interaction(
119-
self, request_body: Dict[str, Any], timeout: float = 10.0
120-
) -> Dict[str, Any]:
121-
"""
122-
Handles the interaction with the multiprocessing Queue in a non-blocking way.
123-
This fixes the blocking issue by offloading queue.get/put to the executor.
124-
"""
125-
loop = asyncio.get_running_loop()
126-
127-
# 1. Offload the blocking put operation
128-
await loop.run_in_executor(None, self.request_queue.put, request_body)
129-
130-
# 2. Offload the blocking get operation
131-
# We use a lambda to pass the timeout to the queue.get method
132-
def blocking_get():
133-
return self.response_queue.get(timeout=timeout)
134-
135-
response_data = await loop.run_in_executor(None, blocking_get)
136-
137-
return (
138-
json.loads(response_data)
139-
if isinstance(response_data, str)
140-
else response_data
141-
)
142-
143105

144106
SERVER_STATE = ServerState.get_instance()
145107

@@ -974,11 +936,8 @@ async def epoch_clock():
974936
while True:
975937
await asyncio.sleep(5)
976938
state = await SERVER_STATE.get_snapshot()
977-
if state["active_requests"] > 0 or state["is_mock_mode"]:
978-
logger.info(
979-
f"[Monitor] Active: {state['active_requests']} | "
980-
f"Mode: {'MOCK' if state['is_mock_mode'] else 'PRODUCTION'}"
981-
)
939+
if state["active_requests"] > 0:
940+
logger.info(f"[Monitor] Active: {state['active_requests']}")
982941

983942
monitor_task = asyncio.create_task(epoch_clock())
984943
yield
@@ -997,10 +956,7 @@ def _prepare_proxy_and_headers(
997956

998957
x_api_key = request.headers.get("x-api-key")
999958

1000-
if SERVER_STATE.is_mock_mode:
1001-
api_key = _MOCK_ENV_API_KEY or "mock-key"
1002-
1003-
elif x_api_key:
959+
if x_api_key:
1004960
api_key = x_api_key
1005961
logger.debug(
1006962
f"Request {request_id}: Using x-api-key for authorization override."
@@ -1198,7 +1154,7 @@ async def health_check():
11981154
content={
11991155
"status": "healthy",
12001156
"service": "DeepSeek-Proxy",
1201-
"mode": "mock" if SERVER_STATE.is_mock_mode else "production",
1157+
"mode": "production",
12021158
},
12031159
)
12041160

@@ -1229,26 +1185,6 @@ async def generation(
12291185
)
12301186
force_stream = True
12311187

1232-
if SERVER_STATE.is_mock_mode:
1233-
if body:
1234-
try:
1235-
if _MOCK_ENV_API_KEY:
1236-
# Just to ensure init consistency even if not used
1237-
_ = DeepSeekProxy(api_key=_MOCK_ENV_API_KEY)
1238-
except Exception:
1239-
pass
1240-
1241-
try:
1242-
raw_body = await request.json()
1243-
# Use the new async-safe mock interaction
1244-
response_json = await SERVER_STATE.mock_request_interaction(raw_body)
1245-
status_code = response_json.pop("status_code", 200)
1246-
return JSONResponse(content=response_json, status_code=status_code)
1247-
except Exception as e:
1248-
return JSONResponse(
1249-
status_code=500, content={"code": "MockError", "message": str(e)}
1250-
)
1251-
12521188
return await proxy.generate(body, request_id, force_stream=force_stream)
12531189

12541190
@app.post("/siliconflow/models/{model_path:path}")
@@ -1283,32 +1219,6 @@ async def dynamic_path_generation(
12831219

12841220
@app.api_route("/{path_name:path}", methods=["GET", "POST", "DELETE", "PUT"])
12851221
async def catch_all(path_name: str, request: Request):
1286-
if SERVER_STATE.is_mock_mode:
1287-
try:
1288-
body = None
1289-
if request.method in ["POST", "PUT"]:
1290-
try:
1291-
body = await request.json()
1292-
except:
1293-
pass
1294-
req_record = {
1295-
"path": f"/{path_name}",
1296-
"method": request.method,
1297-
"headers": dict(request.headers),
1298-
"body": body,
1299-
}
1300-
# Use the new async-safe mock interaction
1301-
response_json = await SERVER_STATE.mock_request_interaction(
1302-
req_record, timeout=5.0
1303-
)
1304-
status_code = response_json.pop("status_code", 200)
1305-
return JSONResponse(content=response_json, status_code=status_code)
1306-
except Exception as e:
1307-
return JSONResponse(
1308-
status_code=500,
1309-
content={"error": "Mock Catch-All Error", "detail": str(e)},
1310-
)
1311-
13121222
return JSONResponse(
13131223
status_code=404,
13141224
content={"code": "NotFound", "message": "Proxy endpoint not implemented"},
@@ -1317,41 +1227,6 @@ async def catch_all(path_name: str, request: Request):
13171227
return app
13181228

13191229

1320-
def run_server_process(req_q, res_q, host="0.0.0.0", port=8000):
1321-
if req_q and res_q:
1322-
SERVER_STATE.set_queues(req_q, res_q)
1323-
app = create_app()
1324-
uvicorn.run(app, host=host, port=port, log_level="info")
1325-
1326-
1327-
class MockServer:
1328-
def __init__(self) -> None:
1329-
self.requests = multiprocessing.Queue()
1330-
self.responses = multiprocessing.Queue()
1331-
self.proc = None
1332-
1333-
1334-
def create_mock_server(*args, **kwargs):
1335-
mock_server = MockServer()
1336-
proc = multiprocessing.Process(
1337-
target=run_server_process,
1338-
args=(mock_server.requests, mock_server.responses, "0.0.0.0", 8089),
1339-
)
1340-
proc.start()
1341-
mock_server.proc = proc
1342-
time.sleep(1.5)
1343-
logger.info("Mock Server started on port 8089")
1344-
if args and hasattr(args[0], "addfinalizer"):
1345-
1346-
def stop_server():
1347-
if proc.is_alive():
1348-
proc.terminate()
1349-
proc.join()
1350-
1351-
args[0].addfinalizer(stop_server)
1352-
return mock_server
1353-
1354-
13551230
def run_server(host="0.0.0.0", port=8000):
13561231
app = create_app()
13571232
uvicorn.run(app, host=host, port=port)

0 commit comments

Comments
 (0)