---
sidebar-title: Code Patterns
---
Code examples for common development tasks. Referenced from CLAUDE.md.
Commands live in src/aiperf/cli_commands/, one file per command. They are
lazily loaded via import strings in aiperf.cli — modules are only imported
when their command is invoked:
```python
# aiperf/cli.py — register with lazy import strings
app.command("aiperf.cli_commands.profile:app", name="profile")
```

```python
# aiperf/cli_commands/profile.py — thin command definition
from cyclopts import App

from aiperf.common.config import ServiceConfig, UserConfig

app = App(name="profile")


@app.default
def profile(user_config: UserConfig, service_config: ServiceConfig | None = None) -> None:
    """Run the Profile subcommand."""
    from aiperf.cli_runner import run_system_controller  # heavy import deferred

    run_system_controller(user_config, service_config)
```

Conventions:
- Export a single `App` named `app`.
- Hyphenate multi-word commands: `App(name="analyze-trace")`.
- Keep module-level imports minimal; heavy deps go inside the function body.
- Heavy implementation logic lives in a `cli.py` inside the owning domain package (e.g. `aiperf/plugin/cli.py`), lazily imported at call time (see the sketch below).
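Putting those conventions together, a minimal sketch for a hypothetical `analyze-trace` command; the `aiperf/trace` package and its `run_analyze_trace` function are illustrative assumptions, not real aiperf modules:

```python
# aiperf/cli_commands/analyze_trace.py — hypothetical command module
from cyclopts import App

from aiperf.common.config import UserConfig

app = App(name="analyze-trace")  # hyphenated multi-word name


@app.default
def analyze_trace(user_config: UserConfig) -> None:
    """Run the analyze-trace subcommand."""
    # Heavy logic stays in the owning domain package's cli.py and is
    # imported only when the command actually runs.
    from aiperf.trace.cli import run_analyze_trace  # hypothetical module

    run_analyze_trace(user_config)
```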
Services run in separate processes via bootstrap.py:
```python
class MyService(BaseComponentService):
    @on_message(MessageType.MY_MSG)
    async def _handle(self, msg: MyMsg) -> None:
        await self.publish(ResponseMsg(data=msg.data))
```

Register in `plugins.yaml`:
```yaml
service:
  my_service:
    class: aiperf.my_module.my_service:MyService
    description: My custom service
    metadata:
      required: true
      auto_start: true
```

Config types:
- `ServiceConfig`: infrastructure (ZMQ ports, logging level)
- `UserConfig`: benchmark params (endpoints, loadgen settings)
Use `AIPerfBaseModel` for data, `BaseConfig` for configuration:

```python
from pydantic import Field

from aiperf.common.models import AIPerfBaseModel


class Record(AIPerfBaseModel):
    ts_ns: int = Field(description="Timestamp in nanoseconds")
    value: float = Field(description="Measured value")
```
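The configuration side isn't shown above, so here is a minimal sketch of a `BaseConfig` subclass. It assumes `BaseConfig` is importable from `aiperf.common.config` alongside `ServiceConfig` and `UserConfig`; the `RetryConfig` name and its fields are illustrative only:

```python
from pydantic import Field

# Assumption: BaseConfig lives next to ServiceConfig and UserConfig;
# adjust the import path if it lives elsewhere.
from aiperf.common.config import BaseConfig


class RetryConfig(BaseConfig):  # hypothetical configuration model
    max_attempts: int = Field(default=3, description="Maximum retry attempts")
    backoff_s: float = Field(default=0.5, description="Seconds to wait between attempts")
```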
Messages require a `message_type` field and a handler decorator:

```python
from aiperf.common.messages import Message
from aiperf.common.hooks import on_message


class MyMsg(Message):
    message_type: MessageType = MessageType.MY_MSG
    data: list[Record] = Field(description="Records to process")


# In service class:
@on_message(MessageType.MY_MSG)
async def _handle(self, msg: MyMsg) -> None:
    await self.publish(OtherMsg(data=msg.data))
```

Auto-subscription happens during the `@on_init` phase.
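As a rough illustration of that phase, a sketch of an `@on_init` hook on the same service. It assumes `on_init` is exported from `aiperf.common.hooks` like `on_message` and is used as a bare decorator; both details are assumptions, and `BaseComponentService` is left unimported as in the fragments above:

```python
from aiperf.common.hooks import on_init  # assumed to sit beside on_message


class MyService(BaseComponentService):
    @on_init  # bare-decorator usage is an assumption
    async def _setup(self) -> None:
        # Runs during the init phase; @on_message handlers declared on the
        # class are auto-subscribed during this same phase.
        self._cache = {}
```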
YAML-based registry with lazy-loading:
```yaml
# plugins.yaml
endpoint:
  chat:
    class: aiperf.endpoints.openai_chat:ChatEndpoint
    description: OpenAI Chat Completions endpoint
    metadata:
      endpoint_path: /v1/chat/completions
      supports_streaming: true
      produces_tokens: true
      tokenizes_input: true
      supports_audio: true
      supports_images: true
      supports_videos: true
      metrics_title: LLM Metrics
```

```python
from aiperf.plugin import plugins
from aiperf.plugin.enums import PluginType

EndpointClass = plugins.get_class(PluginType.ENDPOINT, 'chat')
```

Log errors and publish `ErrorDetails` in messages:
```python
try:
    await risky_operation()
except Exception as e:
    self.error(f"Operation failed: {e!r}")
    await self.publish(ResultMsg(error=ErrorDetails.from_exception(e)))
```

Use lambda for expensive log messages:
```python
# Expensive - lambda defers evaluation
self.debug(lambda: f"Processing {len(self._items())} items")

# Cheap - direct string is fine
self.info("Starting service")
```
Common test patterns (async tests, parametrized cases, plugin mocking via the test harness):

```python
import pytest

from aiperf.plugin import plugins
from aiperf.plugin.enums import PluginType
from tests.harness import mock_plugin


@pytest.mark.asyncio
async def test_async_operation():
    result = await some_async_func()
    assert result.status == "ok"


@pytest.mark.parametrize("input,expected",
    [
        ("a", 1),
        ("b", 2),
    ]
)  # fmt: skip
def test_with_params(input, expected):
    assert process(input) == expected


def test_with_mock_plugin():
    with mock_plugin(PluginType.ENDPOINT, "test", MockClass):
        assert plugins.get_class(PluginType.ENDPOINT, "test") == MockClass
```

Auto-fixtures (always active): `asyncio.sleep` runs instantly, RNG seed is 42, singletons are reset.
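A minimal sketch of what the instant `asyncio.sleep` auto-fixture implies for test code; the test name and sleep duration are illustrative:

```python
import asyncio

import pytest


@pytest.mark.asyncio
async def test_long_sleep_is_instant():
    # The always-active harness fixture patches asyncio.sleep to return
    # immediately, so this completes without waiting 60 seconds.
    await asyncio.sleep(60)
```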