"""
OpenAI Chat Completions API endpoint handler for the mock server.

Provides a complete implementation of the /v1/chat/completions endpoint that simulates
realistic LLM behavior with configurable timing characteristics. Supports both streaming
and non-streaming responses with proper token counting, latency simulation including
TTFT (Time To First Token) and ITL (Inter-Token Latency), and OpenAI-compatible error
handling for comprehensive benchmarking scenarios.
"""
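# Requests follow the OpenAI Chat Completions schema. A minimal illustrative
# payload (exact field support is defined by ChatCompletionsRequest in
# guidellm.mock_server.models):
#
#   {
#       "model": "mock-model",
#       "messages": [{"role": "user", "content": "Hello"}],
#       "max_tokens": 128,
#       "stream": true,
#       "stream_options": {"include_usage": true}
#   }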

from __future__ import annotations

import asyncio
import json
import math
import time
import uuid

from pydantic import ValidationError
from sanic import response
from sanic.request import Request
from sanic.response import HTTPResponse, ResponseStream
from transformers import PreTrainedTokenizer

from guidellm.mock_server.config import MockServerConfig
from guidellm.mock_server.models import (
    ChatCompletionChoice,
    ChatCompletionsRequest,
    ChatCompletionsResponse,
    ChatMessage,
    ErrorDetail,
    ErrorResponse,
    Usage,
)
from guidellm.mock_server.utils import (
    MockTokenizer,
    create_fake_text,
    create_fake_tokens_str,
    sample_number,
    times_generator,
)

__all__ = ["ChatCompletionsHandler"]


class ChatCompletionsHandler:
    """
    Handles OpenAI Chat Completions API requests with realistic LLM simulation.

    Implements the /v1/chat/completions endpoint behavior including request validation,
    response generation, and timing simulation. Supports both streaming and
    non-streaming modes with configurable latency characteristics for comprehensive
    benchmarking. Uses either a mock tokenizer or a real tokenizer for accurate token
    counting and realistic text generation.

    Example:
    ::
        config = MockServerConfig(ttft_ms=100, itl_ms=50)
        handler = ChatCompletionsHandler(config)
        response = await handler.handle(request)
    """

    def __init__(self, config: MockServerConfig) -> None:
        """
        Initialize the Chat Completions handler with server configuration.

        :param config: Mock server configuration containing timing and behavior settings
        """
        self.config = config
        self.tokenizer = (
            MockTokenizer()
            if config.processor is None
            else PreTrainedTokenizer.from_pretrained(config.processor)
        )

    async def handle(self, request: Request) -> HTTPResponse:
        """
        Process incoming chat completion requests with validation and routing.

        Validates the request payload, handles errors gracefully, and routes to
        appropriate streaming or non-streaming response handlers based on the
        request configuration.

        :param request: Sanic HTTP request containing chat completion parameters
        :return: HTTP response with completion data or error information
        :raises ValidationError: When request payload fails validation
        :raises JSONDecodeError: When request contains invalid JSON
        """
        try:
            # Parse and validate request
            req_data = ChatCompletionsRequest(**request.json)
        except ValidationError as exc:
            return response.json(
                ErrorResponse(
                    error=ErrorDetail(
                        message=f"Invalid request: {str(exc)}",
                        type="invalid_request_error",
                        code="invalid_request",
                    )
                ).model_dump(),
                status=400,
            )
        except (json.JSONDecodeError, TypeError):
            return response.json(
                ErrorResponse(
                    error=ErrorDetail(
                        message="Invalid JSON in request body",
                        type="invalid_request_error",
                        code="invalid_json",
                    )
                ).model_dump(),
                status=400,
            )

        # Handle streaming vs non-streaming
        if req_data.stream:
            return await self._handle_stream(req_data)
        else:
            return await self._handle_non_stream(req_data)

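    # The error branches above serialize to an OpenAI-style error envelope; an
    # illustrative payload (exact fields come from ErrorResponse / ErrorDetail):
    #
    #   {"error": {"message": "Invalid JSON in request body",
    #              "type": "invalid_request_error",
    #              "code": "invalid_json"}}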
    async def _handle_non_stream(self, req: ChatCompletionsRequest) -> HTTPResponse:
        """
        Generate complete non-streaming chat completion response.

        Simulates realistic LLM behavior with TTFT and ITL delays, generates
        appropriate token counts, and returns a complete response with usage
        statistics and generated content.

        :param req: Validated chat completion request parameters
        :return: Complete HTTP response with generated completion data
        """
        # TTFT delay
        await asyncio.sleep(
            sample_number(self.config.ttft_ms, self.config.ttft_ms_std) / 1000.0
        )

        # Token counts: prompt tokens from the tokenizer, sampled output length
        # capped at the requested maximum
        prompt_text = self.tokenizer.apply_chat_template(req.messages)
        prompt_tokens = len(self.tokenizer(prompt_text))
        max_tokens = req.max_completion_tokens or req.max_tokens or math.inf
        completion_tokens_count = min(
            sample_number(self.config.output_tokens, self.config.output_tokens_std),
            max_tokens,
        )

        # ITL delay: TTFT covers the first token, so sum the inter-token latencies
        # for the remaining tokens and sleep once for the total
        itl_delay = 0.0
        delays_iter = iter(times_generator(self.config.itl_ms, self.config.itl_ms_std))
        for _ in range(int(completion_tokens_count) - 1):
            itl_delay += next(delays_iter)
        await asyncio.sleep(itl_delay / 1000.0)

        # Response
        chat_response = ChatCompletionsResponse(
            id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
            model=req.model,
            choices=[
                ChatCompletionChoice(
                    index=0,
                    message=ChatMessage(
                        role="assistant",
                        content=create_fake_text(
                            int(completion_tokens_count), self.tokenizer
                        ),
                    ),
                    finish_reason="stop",
                )
            ],
            usage=Usage(
                prompt_tokens=prompt_tokens,
                completion_tokens=int(completion_tokens_count),
            ),
            system_fingerprint=f"fp_{uuid.uuid4().hex[:10]}",
        )

        return response.json(chat_response.model_dump())

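    # The non-streaming response built above serializes to the OpenAI chat
    # completion shape; an illustrative excerpt (exact fields come from
    # ChatCompletionsResponse and Usage):
    #
    #   {"id": "chatcmpl-...", "model": "...",
    #    "choices": [{"index": 0,
    #                 "message": {"role": "assistant", "content": "..."},
    #                 "finish_reason": "stop"}],
    #    "usage": {"prompt_tokens": 12, "completion_tokens": 64}}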
    async def _handle_stream(self, req: ChatCompletionsRequest) -> HTTPResponse:
        """
        Generate streaming chat completion response with real-time token delivery.

        Creates a streaming response that delivers tokens incrementally with
        realistic timing delays. Supports optional usage statistics in the final
        stream chunk when requested via stream_options.

        :param req: Validated chat completion request with streaming enabled
        :return: Streaming HTTP response delivering tokens with proper timing
        """

        async def generate_stream(stream_response):
            completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"

            # TTFT delay
            await asyncio.sleep(
                sample_number(self.config.ttft_ms, self.config.ttft_ms_std) / 1000.0
            )

            # Token counts
            prompt_text = self.tokenizer.apply_chat_template(req.messages)
            prompt_tokens = len(self.tokenizer(prompt_text))
            max_tokens = req.max_completion_tokens or req.max_tokens or math.inf
            completion_tokens_count = int(
                min(
                    sample_number(
                        self.config.output_tokens, self.config.output_tokens_std
                    ),
                    max_tokens,
                )
            )

            # Send tokens, applying ITL before every token after the first
            # (the first token's latency is covered by TTFT)
            tokens = create_fake_tokens_str(completion_tokens_count, self.tokenizer)
            delays_iter = iter(
                times_generator(self.config.itl_ms, self.config.itl_ms_std)
            )

            for index, token in enumerate(tokens):
                if index > 0:
                    itl_delay = next(delays_iter)
                    await asyncio.sleep(itl_delay / 1000.0)

                chunk_data = {
                    "id": completion_id,
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": req.model,
                    "choices": [
                        {
                            "index": 0,
                            "delta": {"content": token},
                            "finish_reason": None,
                        }
                    ],
                }
                await stream_response.write(f"data: {json.dumps(chunk_data)}\n\n")

            # Send final chunk with finish reason
            final_chunk = {
                "id": completion_id,
                "object": "chat.completion.chunk",
                "created": int(time.time()),
                "model": req.model,
                "choices": [
                    {
                        "index": 0,
                        "delta": {},
                        "finish_reason": "stop",
                    }
                ],
            }
            await stream_response.write(f"data: {json.dumps(final_chunk)}\n\n")

            # Send usage if requested via stream_options.include_usage
            if req.stream_options and req.stream_options.include_usage:
                usage_chunk = {
                    "id": completion_id,
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": req.model,
                    "choices": [],
                    "usage": {
                        "prompt_tokens": prompt_tokens,
                        "completion_tokens": completion_tokens_count,
                        "total_tokens": prompt_tokens + completion_tokens_count,
                    },
                }
                await stream_response.write(f"data: {json.dumps(usage_chunk)}\n\n")

            # End stream with the SSE terminator
            await stream_response.write("data: [DONE]\n\n")

        return ResponseStream(  # type: ignore[return-value]
            generate_stream,
            content_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
            },
        )
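
# Example wiring into a Sanic app (a minimal sketch; the real route registration
# lives elsewhere in the mock server package, so the app and route names below
# are illustrative only):
#
#   from sanic import Sanic
#
#   app = Sanic("guidellm-mock-server")
#   handler = ChatCompletionsHandler(MockServerConfig())
#
#   @app.post("/v1/chat/completions")
#   async def chat_completions(request: Request) -> HTTPResponse:
#       return await handler.handle(request)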