
Commit f6385b3

sanic server script
Signed-off-by: guangli.bao <[email protected]>
1 parent 59c1be8 commit f6385b3

File tree

1 file changed: +244 -0 lines changed


tests/unit/sanic_server.py

Lines changed: 244 additions & 0 deletions
#!/usr/bin/env python3
"""
sanic_server.py

Simple Sanic-based mock server that implements common OpenAI / vLLM-compatible routes:
- GET / : health
- GET /v1/models : list models
- POST /v1/chat/completions : chat completions (supports streaming via ?stream=true)
- POST /v1/completions : classic completions
- POST /v1/embeddings : fake embeddings
- POST /v1/moderations : fake moderation

Usage:
    pip install "sanic>=25.3.0"
Command:
    python sanic_server.py
    # or, with explicit options:
    python sanic_server.py --host=0.0.0.0 --port=8000 --workers=1 --debug
"""

import argparse
import asyncio
import json
import random
import time

from sanic import Sanic
from sanic.request import Request
from sanic.response import ResponseStream
from sanic.response import json as sjson

app = Sanic("sanic_server")


# ---------- utils ----------


def fake_tokenize(text: str) -> list[str]:
    # crude whitespace tokenizer for token counting
    return text.strip().split()


def make_choice_text(prompt: str) -> str:
    # Very simple deterministic reply generator:
    # echo a truncated summary of the prompt for testing.
    tail = prompt.strip()[:120]
    return f"Mock reply summarizing: {tail}"


def now_ms() -> int:
    # wall-clock time in milliseconds (the event-loop clock is not epoch-based)
    return int(time.time() * 1000)


# ---------- routes ----------


@app.get("/")
async def health(request: Request):
    return sjson({"ok": True, "msg": "mock openai/vllm server"})


@app.get("/v1/models")
async def list_models(request: Request):
    # minimal model list
    models = [
        {"id": "mock-qwen-2.5", "object": "model", "owned_by": "mock"},
        {"id": "facebook/opt-125m", "object": "model", "owned_by": "mock"},
    ]
    return sjson({"object": "list", "data": models})


@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """
    Supports:
    - JSON body with 'messages' (OpenAI format)
    - query param stream=true or JSON {'stream': true}
      => responds with text/event-stream chunks containing 'data: {json}\n\n'
    """
    body = request.json or {}
    stream_mode = False
    if request.args.get("stream", "false").lower() == "true":
        stream_mode = True
    if isinstance(body.get("stream"), bool):
        stream_mode = body.get("stream")

    messages = body.get("messages", [])
    prompt_text = ""
    if isinstance(messages, list) and messages:
        # approximate the prompt as the concatenation of the user messages
        for m in messages:
            role = m.get("role", "")
            content = m.get("content", "")
            if role == "user":
                prompt_text += content + " "

    # build a deterministic reply
    reply = make_choice_text(prompt_text or "hello")
    prompt_tokens = len(fake_tokenize(prompt_text))
    completion_tokens = len(fake_tokenize(reply))

    # create response object (non-streaming)
    def make_response_obj():
        return {
            "id": f"cmpl-mock-{random.randint(1000, 9999)}",
            "object": "chat.completion",
            "created": now_ms(),
            "model": body.get("model", "mock-qwen-2.5"),
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
            "choices": [
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": reply},
                    "finish_reason": "stop",
                }
            ],
        }

    if not stream_mode:
        return sjson(make_response_obj())

    # streaming mode: SSE-style chunks with 'data: <json>\n\n'
    async def streaming_fn(resp):
        # send an initial "response.start"-like chunk
        start_payload = {"type": "response.start", "created": now_ms()}
        await resp.write(f"data: {json.dumps(start_payload)}\n\n")

        # simulate token-by-token streaming
        tokens = fake_tokenize(reply)
        chunk_text = ""
        for i, tk in enumerate(tokens):
            chunk_text += tk + (" " if i < len(tokens) - 1 else "")
            chunk_payload = {
                "id": f"cmpl-mock-{random.randint(1000, 9999)}",
                "object": "chat.completion.chunk",
                "created": now_ms(),
                "model": body.get("model", "mock-qwen-2.5"),
                "choices": [
                    {
                        "delta": {"content": tk + (" " if i < len(tokens) - 1 else "")},
                        "index": 0,
                        "finish_reason": None,
                    }
                ],
            }
            # write chunk
            await resp.write(f"data: {json.dumps(chunk_payload)}\n\n")
            # small delay between tokens
            await asyncio.sleep(0.03)
        # final done event
        done_payload = {"type": "response.done", "created": now_ms()}
        await resp.write(f"data: {json.dumps(done_payload)}\n\n")

    headers = {"Content-Type": "text/event-stream", "Cache-Control": "no-cache"}
    return ResponseStream(streaming_fn, headers=headers)


@app.post("/v1/completions")
163+
async def completions(request: Request):
164+
body = request.json or {}
165+
prompt = body.get("prompt") or (
166+
body.get("messages")
167+
and " ".join([m.get("content", "") for m in body.get("messages", [])])
168+
)
169+
if not prompt:
170+
prompt = "hello"
171+
# optional max_tokens
172+
max_tokens = int(body.get("max_tokens", 64))
173+
reply = make_choice_text(prompt)
174+
tokenized = fake_tokenize(reply)[:max_tokens]
175+
text_out = " ".join(tokenized)
176+
177+
prompt_tokens = len(fake_tokenize(prompt))
178+
completion_tokens = len(tokenized)
179+
180+
resp = {
181+
"id": f"cmpl-mock-{random.randint(1000, 9999)}",
182+
"object": "text_completion",
183+
"created": now_ms(),
184+
"model": body.get("model", "mock-qwen-2.5"),
185+
"choices": [{"text": text_out, "index": 0, "finish_reason": "stop"}],
186+
"usage": {
187+
"prompt_tokens": prompt_tokens,
188+
"completion_tokens": completion_tokens,
189+
"total_tokens": prompt_tokens + completion_tokens,
190+
},
191+
}
192+
# simulate a small server-side latency
193+
await asyncio.sleep(0.01)
194+
return sjson(resp)
195+
196+
197+
@app.post("/v1/embeddings")
198+
async def embeddings(request: Request):
199+
body = request.json or {}
200+
inputs = body.get("input") or body.get("inputs") or []
201+
if isinstance(inputs, str):
202+
inputs = [inputs]
203+
# produce deterministic embedding length 16
204+
dim = int(request.args.get("dim", body.get("dim", 16)))
205+
out = []
206+
for i, txt in enumerate(inputs):
207+
# make pseudo-random but deterministic numbers based on hash
208+
seed = abs(hash(txt)) % (10**8)
209+
random.seed(seed)
210+
vec = [round((random.random() - 0.5), 6) for _ in range(dim)]
211+
out.append({"object": "embedding", "embedding": vec, "index": i})
212+
return sjson({"data": out, "model": body.get("model", "mock-embed-1")})
213+
214+
215+
@app.post("/v1/moderations")
216+
async def moderations(request: Request):
217+
body = request.json or {}
218+
input_text = body.get("input") or ""
219+
# super naive: classify as 'flagged' if contains "bad"
220+
flagged = "bad" in input_text.lower()
221+
return sjson(
222+
{
223+
"id": "mod-mock-1",
224+
"model": body.get("model", "mock-moderation"),
225+
"results": [{"flagged": flagged}],
226+
}
227+
)
228+
229+
230+
if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog="sanic_server")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", default=8000, type=int)
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--workers", default=1, type=int)
    args = parser.parse_args()

    app.run(
        host=args.host,
        port=args.port,
        debug=args.debug,
        workers=args.workers,
        access_log=False,
    )
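
For reference, a minimal client sketch that exercises the routes above. It is not part of this commit: it assumes the server is running locally on port 8000 and that the `requests` package is available.

# client_example.py -- hypothetical usage sketch, not part of this commit
import json

import requests

BASE = "http://127.0.0.1:8000"

# non-streaming chat completion
resp = requests.post(
    f"{BASE}/v1/chat/completions",
    json={
        "model": "mock-qwen-2.5",
        "messages": [{"role": "user", "content": "Say hello"}],
    },
)
print(resp.json()["choices"][0]["message"]["content"])

# streaming chat completion: the server emits SSE lines of the form "data: {...}"
with requests.post(
    f"{BASE}/v1/chat/completions",
    params={"stream": "true"},
    json={"messages": [{"role": "user", "content": "Say hello"}]},
    stream=True,
) as r:
    for line in r.iter_lines():
        if line.startswith(b"data: "):
            print(json.loads(line[len(b"data: "):]))

# embeddings with a custom dimension
emb = requests.post(f"{BASE}/v1/embeddings", json={"input": "hello", "dim": 8})
print(len(emb.json()["data"][0]["embedding"]))  # -> 8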
