8 changes: 8 additions & 0 deletions src/guidellm/mock_server/__init__.py
@@ -0,0 +1,8 @@
"""
GuideLLM Mock Server for OpenAI and vLLM API compatibility.
"""

from .config import MockServerConfig
from .server import MockServer

__all__ = ["MockServer", "MockServerConfig"]
84 changes: 84 additions & 0 deletions src/guidellm/mock_server/config.py
@@ -0,0 +1,84 @@
"""
Configuration settings for the mock server component.

Provides centralized configuration management for mock server behavior including
network binding, model identification, response timing characteristics, and token
generation parameters. Supports environment variable configuration for deployment
flexibility with automatic validation through Pydantic settings.
"""

from __future__ import annotations

from pydantic import Field
from pydantic_settings import BaseSettings

__all__ = ["MockServerConfig"]


class MockServerConfig(BaseSettings):
"""
Configuration settings for mock server behavior and deployment.

Centralizes all configurable parameters for mock server operation including
network settings, model identification, response timing characteristics, and
    token generation behavior. Environment variables with the GUIDELLM_MOCK_SERVER_
    prefix override default values for deployment flexibility.

Example:
::
config = MockServerConfig(host="0.0.0.0", port=8080, model="custom-model")
# Use with environment variables:
# GUIDELLM_MOCK_SERVER_HOST=127.0.0.1 GUIDELLM_MOCK_SERVER_PORT=9000
"""

host: str = Field(
default="127.0.0.1", description="Host address to bind the server to"
)
port: int = Field(default=8000, description="Port number to bind the server to")
workers: int = Field(default=1, description="Number of worker processes to spawn")
model: str = Field(
default="llama-3.1-8b-instruct",
description="Model name to present in API responses",
)
processor: str | None = Field(
default=None,
description=(
"Processor type to use for token stats, tokenize, and detokenize. "
"If None, a mock one is created."
),
)
request_latency: float = Field(
default=3.0,
description="Base request latency in seconds for non-streaming responses",
)
request_latency_std: float = Field(
default=0.0,
description="Standard deviation for request latency variation",
)
ttft_ms: float = Field(
default=150.0,
description="Time to first token in milliseconds for streaming responses",
)
ttft_ms_std: float = Field(
default=0.0,
description="Standard deviation for time to first token variation",
)
itl_ms: float = Field(
default=10.0,
description="Inter-token latency in milliseconds for streaming responses",
)
itl_ms_std: float = Field(
default=0.0,
description="Standard deviation for inter-token latency variation",
)
output_tokens: int = Field(
default=128, description="Number of output tokens to generate in responses"
)
output_tokens_std: float = Field(
default=0.0,
description="Standard deviation for output token count variation",
)

class Config:
env_prefix = "GUIDELLM_MOCK_SERVER_"
case_sensitive = False
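For reference, a quick sketch of how these settings resolve, relying only on standard pydantic-settings behavior (defaults come from the Field declarations and are overridden by prefixed environment variables at instantiation):

import os

from guidellm.mock_server.config import MockServerConfig

config = MockServerConfig()
assert config.port == 8000 and config.ttft_ms == 150.0  # Field defaults

# Variables with the GUIDELLM_MOCK_SERVER_ prefix override the defaults;
# matching is case-insensitive because case_sensitive is False.
os.environ["GUIDELLM_MOCK_SERVER_PORT"] = "9000"
os.environ["guidellm_mock_server_ttft_ms"] = "250"
config = MockServerConfig()
assert config.port == 9000 and config.ttft_ms == 250.0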
17 changes: 17 additions & 0 deletions src/guidellm/mock_server/handlers/__init__.py
@@ -0,0 +1,17 @@
"""
HTTP request handlers for the GuideLLM mock server.

This module exposes request handlers that implement OpenAI-compatible API endpoints
for the mock server. The handlers provide realistic LLM simulation capabilities
including chat completions, legacy completions, and tokenization services with
configurable timing characteristics, token counting, and proper error handling to
support comprehensive benchmarking and testing scenarios.
"""

from __future__ import annotations

from .chat_completions import ChatCompletionsHandler
from .completions import CompletionsHandler
from .tokenizer import TokenizerHandler

__all__ = ["ChatCompletionsHandler", "CompletionsHandler", "TokenizerHandler"]
280 changes: 280 additions & 0 deletions src/guidellm/mock_server/handlers/chat_completions.py
@@ -0,0 +1,280 @@
"""
OpenAI Chat Completions API endpoint handler for the mock server.

Provides a complete implementation of the /v1/chat/completions endpoint that simulates
realistic LLM behavior with configurable timing characteristics. Supports both streaming
and non-streaming responses with proper token counting, latency simulation including
TTFT (Time To First Token) and ITL (Inter-Token Latency), and OpenAI-compatible error
handling for comprehensive benchmarking scenarios.
"""

from __future__ import annotations

import asyncio
import json
import math
import time
import uuid

from pydantic import ValidationError
from sanic import response
from sanic.request import Request
from sanic.response import HTTPResponse, ResponseStream
from transformers import AutoTokenizer

from guidellm.mock_server.config import MockServerConfig
from guidellm.mock_server.models import (
ChatCompletionChoice,
ChatCompletionsRequest,
ChatCompletionsResponse,
ChatMessage,
ErrorDetail,
ErrorResponse,
Usage,
)
from guidellm.mock_server.utils import (
MockTokenizer,
create_fake_text,
create_fake_tokens_str,
sample_number,
times_generator,
)

__all__ = ["ChatCompletionsHandler"]


class ChatCompletionsHandler:
"""
Handles OpenAI Chat Completions API requests with realistic LLM simulation.

Implements the /v1/chat/completions endpoint behavior including request validation,
response generation, and timing simulation. Supports both streaming and
non-streaming modes with configurable latency characteristics for comprehensive
benchmarking. Uses either a mock tokenizer or a real tokenizer for accurate token
counting and realistic text generation.

Example:
::
config = MockServerConfig(ttft_ms=100, itl_ms=50)
handler = ChatCompletionsHandler(config)
response = await handler.handle(request)
"""

def __init__(self, config: MockServerConfig) -> None:
"""
Initialize the Chat Completions handler with server configuration.

:param config: Mock server configuration containing timing and behavior settings
"""
self.config = config
self.tokenizer = (
MockTokenizer()
if config.processor is None
            else AutoTokenizer.from_pretrained(config.processor)
)

async def handle(self, request: Request) -> HTTPResponse:
"""
Process incoming chat completion requests with validation and routing.

Validates the request payload, handles errors gracefully, and routes to
appropriate streaming or non-streaming response handlers based on the
request configuration.

:param request: Sanic HTTP request containing chat completion parameters
:return: HTTP response with completion data or error information
:raises ValidationError: When request payload fails validation
:raises JSONDecodeError: When request contains invalid JSON
"""
try:
# Parse and validate request
req_data = ChatCompletionsRequest(**request.json)
except ValidationError as exc:
return response.json(
ErrorResponse(
error=ErrorDetail(
message=f"Invalid request: {str(exc)}",
type="invalid_request_error",
code="invalid_request",
)
).model_dump(),
status=400,
)
except (json.JSONDecodeError, TypeError):
return response.json(
ErrorResponse(
error=ErrorDetail(
message="Invalid JSON in request body",
type="invalid_request_error",
code="invalid_json",
)
).model_dump(),
status=400,
)

# Handle streaming vs non-streaming
if req_data.stream:
return await self._handle_stream(req_data)
else:
return await self._handle_non_stream(req_data)

async def _handle_non_stream(self, req: ChatCompletionsRequest) -> HTTPResponse:
"""
Generate complete non-streaming chat completion response.

Simulates realistic LLM behavior with TTFT and ITL delays, generates
appropriate token counts, and returns a complete response with usage
statistics and generated content.

:param req: Validated chat completion request parameters
:return: Complete HTTP response with generated completion data
"""
# TTFT delay
await asyncio.sleep(
sample_number(self.config.ttft_ms, self.config.ttft_ms_std) / 1000.0
)

# Token counts
prompt_text = self.tokenizer.apply_chat_template(req.messages)
prompt_tokens = len(self.tokenizer(prompt_text))
max_tokens = req.max_completion_tokens or req.max_tokens or math.inf
completion_tokens_count = min(
sample_number(self.config.output_tokens, self.config.output_tokens_std),
max_tokens,
)

# ITL delay
itl_delay = 0.0
delays_iter = iter(times_generator(self.config.itl_ms, self.config.itl_ms_std))
for _ in range(int(completion_tokens_count) - 1):
itl_delay += next(delays_iter)
await asyncio.sleep(itl_delay / 1000.0)

# Response
chat_response = ChatCompletionsResponse(
id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
model=req.model,
choices=[
ChatCompletionChoice(
index=0,
message=ChatMessage(
role="assistant",
content=create_fake_text(
int(completion_tokens_count), self.tokenizer
),
),
finish_reason="stop",
)
],
usage=Usage(
prompt_tokens=prompt_tokens,
completion_tokens=int(completion_tokens_count),
),
system_fingerprint=f"fp_{uuid.uuid4().hex[:10]}",
)

return response.json(chat_response.model_dump())

async def _handle_stream(self, req: ChatCompletionsRequest) -> HTTPResponse:
"""
Generate streaming chat completion response with real-time token delivery.

Creates a streaming response that delivers tokens incrementally with
realistic timing delays. Supports optional usage statistics in the final
stream chunk when requested via stream_options.

:param req: Validated chat completion request with streaming enabled
:return: Streaming HTTP response delivering tokens with proper timing
"""

async def generate_stream(stream_response):
completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"

# TTFT delay
await asyncio.sleep(
sample_number(self.config.ttft_ms, self.config.ttft_ms_std) / 1000.0
)

# Token counts
prompt_text = self.tokenizer.apply_chat_template(req.messages)
prompt_tokens = len(self.tokenizer(prompt_text))
max_tokens = req.max_completion_tokens or req.max_tokens or math.inf
completion_tokens_count = int(
min(
sample_number(
self.config.output_tokens, self.config.output_tokens_std
),
max_tokens,
)
)

# Send tokens
tokens = create_fake_tokens_str(completion_tokens_count, self.tokenizer)
delays_iter = iter(
times_generator(self.config.itl_ms, self.config.itl_ms_std)
)

for index, token in enumerate(tokens):
if index > 0:
itl_delay = next(delays_iter)
await asyncio.sleep(itl_delay / 1000.0)

chunk_data = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": req.model,
"choices": [
{
"index": 0,
"delta": {"content": token},
"finish_reason": None,
}
],
}
await stream_response.write(f"data: {json.dumps(chunk_data)}\n\n")

# Send final chunk with finish reason
final_chunk = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": req.model,
"choices": [
{
"index": 0,
"delta": {},
"finish_reason": "stop",
}
],
}
await stream_response.write(f"data: {json.dumps(final_chunk)}\n\n")

# Send usage if requested
if req.stream_options and req.stream_options.include_usage:
usage_chunk = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": req.model,
"choices": [],
"usage": {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens_count,
"total_tokens": prompt_tokens + completion_tokens_count,
},
}
await stream_response.write(f"data: {json.dumps(usage_chunk)}\n\n")

# End stream
await stream_response.write("data: [DONE]\n\n")

return ResponseStream( # type: ignore[return-value]
generate_stream,
content_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
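The timing model in both paths is the same: one TTFT delay, then one ITL delay per generated token after the first (applied incrementally when streaming, summed into a single sleep otherwise). A small illustrative model of that arithmetic, assuming sample_number and times_generator draw from a normal distribution around the configured means (their implementations live in utils and are not shown here):

import random


def simulated_completion_latency_ms(
    ttft_ms: float, ttft_std: float, itl_ms: float, itl_std: float, tokens: int
) -> float:
    # One TTFT delay, then an ITL delay before each token after the first.
    total = max(random.gauss(ttft_ms, ttft_std), 0.0)
    for _ in range(max(tokens - 1, 0)):
        total += max(random.gauss(itl_ms, itl_std), 0.0)
    return total


# With the config defaults (ttft_ms=150, itl_ms=10, output_tokens=128, std=0):
# 150 + 127 * 10 = 1420 ms of simulated latency per completion.
print(simulated_completion_latency_ms(150.0, 0.0, 10.0, 0.0, 128))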