Unified Record Handler Architecture #610

@ajcasagrande


This document describes the unified record handler architecture, a refactor that introduces generic and extensible design patterns for record processing in AIPerf.

Overview

The records system is a protocol-based, factory-registered architecture for handling three distinct data streams in the AIPerf benchmarking pipeline:

| Data Stream | Source | Message Type | Purpose |
|---|---|---|---|
| Metric Records | RecordProcessor | METRIC_RECORDS | Inference metrics from LLM API calls |
| Telemetry Records | TelemetryManager | TELEMETRY_RECORDS | GPU metrics from DCGM |
| Server Metrics Records | ServerMetricsManager | SERVER_METRICS_RECORDS | Prometheus server metrics |

Architecture Comparison

This comparison is based on the origin/main branch.

Current Architecture (origin/main)

The current system on origin/main uses a single factory with multiple separate protocols:

┌─────────────────────────────────────────────────────────────┐
│                    CURRENT (origin/main)                     │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  RecordsManager                                              │
│    │                                                         │
│    ├── ResultsProcessorFactory (single factory)              │
│    │     └── ResultsProcessorType enum (7 types, mixed)      │
│    │                                                         │
│    ├── _metric_results_processors: list[ResultsProcessor]    │
│    ├── _gpu_telemetry_processors: list[GPUTelemetryProc]     │
│    ├── _server_metrics_processors: list[ServerMetricsProc]   │
│    │     └── 3 separate protocols, different method names    │
│    │                                                         │
│    ├── _gpu_telemetry_accumulator: GPUTelemetryAccumulator   │
│    ├── _server_metrics_accumulator: ServerMetricsAccumulator │
│    │     └── Special accumulator references for queries      │
│    │                                                         │
│    ├── RecordsTracker (helper class)                         │
│    └── ErrorTracker (helper class)                           │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Current Code Structure (origin/main):

# Current: Single factory, single enum mixing all processor types
class ResultsProcessorType(CaseInsensitiveStrEnum):
    METRIC_RESULTS = "metric_results"              # Accumulator
    RECORD_EXPORT = "record_export"                # Exporter
    GPU_TELEMETRY_ACCUMULATOR = "gpu_telemetry_accumulator"      # Accumulator
    GPU_TELEMETRY_JSONL_WRITER = "gpu_telemetry_jsonl_writer"    # Exporter
    SERVER_METRICS_ACCUMULATOR = "server_metrics_accumulator"    # Accumulator
    SERVER_METRICS_JSONL_WRITER = "server_metrics_jsonl_writer"  # Exporter
    TIMESLICE = "timeslice"                        # Sub-processor

# Current: Multiple separate protocols with different method signatures
class ResultsProcessorProtocol(Protocol):
    async def process_result(self, record_data: MetricRecordsData) -> None: ...

class GPUTelemetryProcessorProtocol(Protocol):
    async def process_telemetry_record(self, record: TelemetryRecord) -> None: ...

class ServerMetricsProcessorProtocol(Protocol):
    async def process_server_metrics_record(self, record: ServerMetricsRecord) -> None: ...

# Current: Multiple handler lists with isinstance-based routing
self._metric_results_processors: list[ResultsProcessorProtocol] = []
self._gpu_telemetry_processors: list[GPUTelemetryProcessorProtocol] = []
self._server_metrics_processors: list[ServerMetricsProcessorProtocol] = []
self._gpu_telemetry_accumulator: GPUTelemetryAccumulatorProtocol | None = None
self._server_metrics_accumulator: ServerMetricsAccumulatorProtocol | None = None

for results_processor_type in ResultsProcessorFactory.get_all_class_types():
    processor = ResultsProcessorFactory.create_instance(...)
    # Multiple isinstance checks for routing
    if isinstance(processor, GPUTelemetryAccumulatorProtocol):
        self._gpu_telemetry_accumulator = processor
    if isinstance(processor, GPUTelemetryProcessorProtocol):
        self._gpu_telemetry_processors.append(processor)
    if isinstance(processor, ServerMetricsAccumulatorProtocol):
        self._server_metrics_accumulator = processor
    # ... more isinstance checks

Current Limitations (origin/main):

  1. Mixed concerns in single enum - ResultsProcessorType combines accumulators (3), exporters (3), and sub-processors (1)
  2. Split protocols - 3 separate protocols with different method names (process_result(), process_telemetry_record(), process_server_metrics_record())
  3. isinstance-based dispatch - Runtime type checking to route processors to 5+ handler lists
  4. No unified dispatch - Each record type needs separate handling code
  5. Accumulator special-casing - Accumulators stored separately from processor lists for query access
  6. Hard to extend - Adding a new record type requires modifying RecordsManager with new lists and isinstance checks

Refactored Architecture (This Branch)

This refactor introduces a unified protocol-based design with separated factories:

┌──────────────────────────────────────────────────────────────┐
│                         NEW PATTERN                          │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  RecordsManager                                              │
│    │                                                         │
│    ├── RecordRouter (unified dispatch)                       │
│    │     │                                                   │
│    │     ├── AccumulatorFactory (_create_record_handlers)    │
│    │     │     └── Protocol-based accumulators               │
│    │     │                                                   │
│    │     ├── SubProcessorFactory (_create_record_handlers)   │
│    │     │     └── Protocol-based sub-processors             │
│    │     │                                                   │
│    │     └── StreamExporterFactory (_create_record_handlers) │
│    │           └── Protocol-based exporters                  │
│                                                              │
└──────────────────────────────────────────────────────────────┘

Core Components

1. RecordRouter

The RecordRouter provides O(1) dispatch to all registered handlers based on record type.

Location: src/aiperf/records/record_router.py

class RecordRouter(AIPerfLoggerMixin):
    """High-performance dispatcher for routing records to handlers."""

    def __init__(
        self,
        accumulators: dict[AccumulatorType, AccumulatorProtocol] | None = None,
        sub_processors: dict[SubProcessorType, SubProcessorProtocol] | None = None,
        stream_exporters: dict[StreamExporterType, StreamExporterProtocol] | None = None,
    ):
        # Build unified type → handler lookup table
        self._dispatch: dict[type, list[RecordHandlerProtocol]] = defaultdict(list)

        # Populate from all handler sources
        for acc in (accumulators or {}).values():
            for record_type in acc.supported_record_types:
                self._dispatch[record_type].append(acc)
        # ... same for sub_processors and stream_exporters

    async def dispatch(self, record: Any) -> None:
        """Dispatch to all matching handlers in parallel."""
        handlers = self._dispatch.get(type(record), [])
        if handlers:
            await asyncio.gather(
                *[handler.process_record(record) for handler in handlers],
                return_exceptions=True,
            )

Key Benefits:

  • Single dispatch point for all record types
  • O(1) lookup via pre-built dispatch table
  • Parallel execution of all matching handlers
  • No ownership of handler lifecycle (RecordsManager manages lifecycle)
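
The dispatch-table pattern can be sketched in isolation. Everything below is a toy (the record types, handler, and router are illustrative stand-ins, not the actual AIPerf classes):

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class MetricRecord:        # stand-in record type
    value: float


@dataclass
class TelemetryRecord:     # stand-in record type
    gpu_util: float


class CountingHandler:
    """Toy handler: counts every record of the types it supports."""

    def __init__(self, supported_record_types: tuple[type, ...]):
        self.supported_record_types = supported_record_types
        self.count = 0

    async def process_record(self, record) -> None:
        self.count += 1


class ToyRouter:
    def __init__(self, handlers):
        # Pre-build the type -> handlers table once; dispatch is then O(1).
        self._dispatch: dict[type, list] = defaultdict(list)
        for handler in handlers:
            for record_type in handler.supported_record_types:
                self._dispatch[record_type].append(handler)

    async def dispatch(self, record) -> None:
        handlers = self._dispatch.get(type(record), [])
        if handlers:
            # All matching handlers run concurrently; one failure does not
            # cancel the others because of return_exceptions=True.
            await asyncio.gather(
                *[h.process_record(record) for h in handlers],
                return_exceptions=True,
            )


async def demo() -> tuple[int, int]:
    metrics_only = CountingHandler((MetricRecord,))
    both = CountingHandler((MetricRecord, TelemetryRecord))
    router = ToyRouter([metrics_only, both])
    await router.dispatch(MetricRecord(1.0))
    await router.dispatch(TelemetryRecord(0.9))
    return metrics_only.count, both.count


print(asyncio.run(demo()))  # (1, 2)
```

The MetricRecord reaches both handlers; the TelemetryRecord reaches only the handler that declared it in supported_record_types.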

2. Protocol Hierarchy

All handlers implement RecordHandlerProtocol, enabling unified dispatch:

RecordHandlerProtocol (base)
    │
    ├── AccumulatorProtocol
    │     - Stateful data stores
    │     - Time-range queries
    │     - Summarization
    │
    ├── SubProcessorProtocol
    │     - Query accumulators
    │     - Enhanced processing
    │     - Declare dependencies
    │
    └── StreamExporterProtocol
        - Real-time file output
        - JSONL streaming
        - Finalization hooks

Location: src/aiperf/common/protocols.py

RecordHandlerProtocol (Base)

@runtime_checkable
class RecordHandlerProtocol(Protocol):
    """Base protocol for any handler that processes records."""

    supported_record_types: tuple[type, ...]

    async def process_record(self, record: Any) -> None:
        """Process an incoming record."""
        ...
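
Because the protocol is @runtime_checkable and purely structural, any class with the right members satisfies it without inheriting from it. A quick self-contained illustration (JsonlWriter is a hypothetical class, not from the codebase; the protocol is restated here so the snippet runs on its own):

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class RecordHandlerProtocol(Protocol):
    supported_record_types: tuple[type, ...]

    async def process_record(self, record: Any) -> None: ...


class JsonlWriter:  # note: no explicit inheritance from the protocol
    supported_record_types = (dict,)

    async def process_record(self, record: dict) -> None:
        pass


# isinstance() checks member presence only, which is what lets the
# dispatch table treat all handler kinds uniformly.
print(isinstance(JsonlWriter(), RecordHandlerProtocol))  # True
```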

AccumulatorProtocol

@runtime_checkable
class AccumulatorProtocol(RecordHandlerProtocol, AIPerfLifecycleProtocol, Protocol):
    """Stateful data stores that receive and store records."""

    accumulator_type: AccumulatorType

    def query_time_range(self, start_ns: int | None, end_ns: int | None) -> HierarchyT:
        """Filter data within a time range."""
        ...

    def get_hierarchy(self) -> HierarchyT:
        """Get complete hierarchical data structure."""
        ...

    async def summarize(self) -> AccumulatorSummary:
        """Summarize accumulated data into results."""
        ...

    def to_json_export(self) -> dict[str, Any] | None:
        """Export as JSON for profile_export.json."""
        ...

    def to_csv_rows(self) -> list[dict[str, Any]] | None:
        """Export as CSV rows."""
        ...

SubProcessorProtocol

@runtime_checkable
class SubProcessorProtocol(RecordHandlerProtocol, AIPerfLifecycleProtocol, Protocol):
    """Processors that query accumulators for enhanced processing."""

    processor_type: SubProcessorType
    required_accumulators: set[AccumulatorType]

    async def summarize(self) -> ProcessorSummary:
        """Summarize processor results."""
        ...

StreamExporterProtocol

@runtime_checkable
class StreamExporterProtocol(RecordHandlerProtocol, AIPerfLifecycleProtocol, Protocol):
    """Exporters that write records in real-time to JSONL files."""

    exporter_type: StreamExporterType
    required_accumulators: set[AccumulatorType]

    async def finalize(self) -> ExportSummary:
        """Finalize export after all records processed."""
        ...

3. Factory Registration

Each handler type has a dedicated factory for runtime registration:

Location: src/aiperf/common/factories.py

class AccumulatorFactory(AIPerfFactory[AccumulatorType, AccumulatorProtocol]):
    """Factory for creating AccumulatorProtocol instances."""
    pass

class SubProcessorFactory(AIPerfFactory[SubProcessorType, SubProcessorProtocol]):
    """Factory for creating SubProcessorProtocol instances."""
    pass

class StreamExporterFactory(AIPerfFactory[StreamExporterType, StreamExporterProtocol]):
    """Factory for creating StreamExporterProtocol instances."""
    pass
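
AIPerfFactory itself is not shown in this document, so the registry below is a guess at the pattern rather than its real API; method names mirror the ones used elsewhere in this doc (register, create_instance, get_all_class_types):

```python
class ToyFactory:
    """Decorator-based registry; each subclass gets its own registry."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls._registry = {}  # fresh dict per factory subclass

    @classmethod
    def register(cls, key):
        def decorator(handler_cls):
            cls._registry[key] = handler_cls
            return handler_cls
        return decorator

    @classmethod
    def create_instance(cls, key, **kwargs):
        return cls._registry[key](**kwargs)

    @classmethod
    def get_all_class_types(cls):
        return list(cls._registry)


class AccumulatorFactory(ToyFactory):
    pass


class StreamExporterFactory(ToyFactory):
    pass


@AccumulatorFactory.register("metrics")
class MetricsAccumulator:
    pass


print(AccumulatorFactory.get_all_class_types())     # ['metrics']
print(StreamExporterFactory.get_all_class_types())  # [] -- registries are independent
```

Separate factory subclasses keep the registries independent, which is the point of splitting the single ResultsProcessorFactory into three.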

4. Type Enums

Location: src/aiperf/common/enums/post_processor_enums.py

class AccumulatorType(CaseInsensitiveStrEnum):
    METRICS = "metrics"              # Inference metrics
    TELEMETRY = "telemetry"          # GPU telemetry
    SERVER_METRICS = "server_metrics"  # Prometheus metrics

class SubProcessorType(CaseInsensitiveStrEnum):
    TIMESLICE = "timeslice"          # Time-window partitioning

class StreamExporterType(CaseInsensitiveStrEnum):
    RECORDS_JSONL = "records_jsonl"
    TELEMETRY_JSONL = "telemetry_jsonl"
    SERVER_METRICS_JSONL = "server_metrics_jsonl"
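
CaseInsensitiveStrEnum is AIPerf's own base class and is not defined in this document; here is a plausible sketch of its behavior using the standard-library _missing_ hook:

```python
from enum import Enum


class CaseInsensitiveStrEnum(str, Enum):
    """str-valued enum whose lookup by value ignores case (sketch)."""

    @classmethod
    def _missing_(cls, value):
        # Called when plain value lookup fails; retry case-insensitively.
        if isinstance(value, str):
            lowered = value.lower()
            for member in cls:
                if member.value.lower() == lowered:
                    return member
        return None


class AccumulatorType(CaseInsensitiveStrEnum):
    METRICS = "metrics"
    TELEMETRY = "telemetry"
    SERVER_METRICS = "server_metrics"


print(AccumulatorType("TELEMETRY") is AccumulatorType.TELEMETRY)  # True
```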

Data Flow

┌─────────────────────────────────────────────────────────────────────────┐
│                          UNIFIED RECORD FLOW                             │
└─────────────────────────────────────────────────────────────────────────┘

                    ┌──────────────────┐
                    │   Data Sources   │
                    └────────┬─────────┘
                             │
        ┌────────────────────┼────────────────────┐
        ▼                    ▼                    ▼
┌───────────────┐   ┌───────────────┐   ┌────────────────┐
│RecordProcessor│   │ TelemetryMgr  │   │ServerMetricsMgr│
│ (inference)   │   │    (DCGM)     │   │  (Prometheus)  │
└───────┬───────┘   └───────┬───────┘   └───────┬────────┘
        │                   │                   │
        ▼                   ▼                   ▼
┌───────────────┐   ┌───────────────┐   ┌────────────────┐
│ MetricRecords │   │TelemetryRecs  │   │ ServerMetrics  │
│   Message     │   │   Message     │   │  RecsMessage   │
└───────┬───────┘   └───────┬───────┘   └───────┬────────┘
        │                   │                   │
        └───────────────────┼───────────────────┘
                            ▼
                ┌───────────────────────┐
                │    RecordsManager     │
                │  ┌─────────────────┐  │
                │  │  RecordRouter   │  │
                │  │  (dispatch)     │  │
                │  └────────┬────────┘  │
                └───────────┼───────────┘
                            │
        ┌───────────────────┼───────────────────┐
        ▼                   ▼                   ▼
┌───────────────┐   ┌───────────────┐   ┌───────────────┐
│ Accumulators  │   │Sub-Processors │   │Stream Exporters│
│ (store)       │   │ (enhance)     │   │ (write JSONL) │
└───────────────┘   └───────────────┘   └───────────────┘
        │                   │                   │
        └───────────────────┼───────────────────┘
                            ▼
               ┌──────────────────────────┐
               │ summarize() / finalize() │
               └──────────────────────────┘
                            │
                            ▼
                ┌───────────────────────┐
                │  ProcessResults       │
                │  Messages             │
                └───────────────────────┘

Registered Implementations

Accumulators

| Type | Class | Supported Records | Purpose |
|---|---|---|---|
| METRICS | MetricResultsProcessor | MetricRecordsData | Inference metrics accumulation |
| TELEMETRY | TelemetryResultsProcessor | TelemetryRecord | GPU telemetry accumulation |
| SERVER_METRICS | ServerMetricsResultsProcessor | ServerMetricsRecord | Server metrics accumulation |

Sub-Processors

| Type | Class | Required Accumulators | Purpose |
|---|---|---|---|
| TIMESLICE | TimesliceMetricResultsProcessor | {METRICS} | Time-window partitioned metrics |

Stream Exporters

| Type | Class | Output File | Purpose |
|---|---|---|---|
| RECORDS_JSONL | RecordExportResultsProcessor | records.jsonl | Inference metrics export |
| TELEMETRY_JSONL | TelemetryExportResultsProcessor | telemetry.jsonl | GPU telemetry export |
| SERVER_METRICS_JSONL | ServerMetricsJSONLWriter | server_metrics.jsonl | Server metrics export |

Extensibility Guide

Adding a New Accumulator

  1. Add enum value in post_processor_enums.py:

    class AccumulatorType(CaseInsensitiveStrEnum):
        # existing...
        MY_NEW_TYPE = "my_new_type"
  2. Implement the protocol:

    @AccumulatorFactory.register(AccumulatorType.MY_NEW_TYPE)
    class MyNewAccumulator:
        accumulator_type = AccumulatorType.MY_NEW_TYPE
        supported_record_types = (MyRecordData,)
    
        async def process_record(self, record: MyRecordData) -> None:
            # Store the record
            ...
    
        def query_time_range(self, start_ns: int | None, end_ns: int | None):
            # Filter by time
            ...
    
        def get_hierarchy(self):
            # Return hierarchical data
            ...
    
        async def summarize(self) -> AccumulatorSummary:
            # Compute summary statistics
            ...
  3. Done! The accumulator is automatically:

    • Instantiated by RecordsManager
    • Added to RecordRouter's dispatch table
    • Called for matching record types

Adding a New Sub-Processor

@SubProcessorFactory.register(SubProcessorType.MY_PROCESSOR)
class MySubProcessor:
    processor_type = SubProcessorType.MY_PROCESSOR
    required_accumulators = {AccumulatorType.METRICS}  # Dependencies
    supported_record_types = (MetricRecordsData,)

    def __init__(self, accumulators: dict[AccumulatorType, AccumulatorProtocol], ...):
        # Access to accumulators for querying
        self._metrics = accumulators[AccumulatorType.METRICS]

    async def process_record(self, record: MetricRecordsData) -> None:
        # Enhanced processing with accumulator access
        historical = self._metrics.query_time_range(start, end)
        ...

    async def summarize(self) -> ProcessorSummary:
        ...

Adding a New Stream Exporter

@StreamExporterFactory.register(StreamExporterType.MY_EXPORTER)
class MyExporter:
    exporter_type = StreamExporterType.MY_EXPORTER
    required_accumulators = set()  # Usually none needed
    supported_record_types = (MyRecordData,)

    async def process_record(self, record: MyRecordData) -> None:
        # Write to output file in real-time
        await self._write_line(record.model_dump_json())

    async def finalize(self) -> ExportSummary:
        # Flush buffers, return summary
        ...
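
Filled in with an in-memory stream, the exporter skeleton above becomes runnable. The _write_line helper, the io.StringIO target, and the plain-integer summary are simplifications for this sketch (a real exporter writes a *.jsonl file and returns an ExportSummary):

```python
import asyncio
import io
import json


class ToyJsonlExporter:
    supported_record_types = (dict,)

    def __init__(self, stream=None):
        # StringIO keeps the sketch self-contained; swap in a file handle
        # for real JSONL output.
        self._stream = stream or io.StringIO()
        self._count = 0

    async def process_record(self, record: dict) -> None:
        self._write_line(json.dumps(record))
        self._count += 1

    def _write_line(self, line: str) -> None:
        self._stream.write(line + "\n")

    async def finalize(self) -> int:
        # A real implementation would flush/close and return a summary object.
        return self._count


async def demo():
    exporter = ToyJsonlExporter()
    await exporter.process_record({"latency_ms": 12})
    await exporter.process_record({"latency_ms": 15})
    return await exporter.finalize(), exporter._stream.getvalue().count("\n")


print(asyncio.run(demo()))  # (2, 2)
```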

Key Benefits Summary

Comparison between origin/main and this refactored branch:

| Aspect | origin/main | Refactored (This Branch) |
|---|---|---|
| Factories | 1 (ResultsProcessorFactory) | 3 (AccumulatorFactory, SubProcessorFactory, StreamExporterFactory) |
| Enums | 1 mixed enum (ResultsProcessorType with 7 types) | 3 specialized enums (AccumulatorType, SubProcessorType, StreamExporterType) |
| Protocols | 3 split protocols with different methods (process_result(), process_telemetry_record(), process_server_metrics_record()) | 1 unified base (RecordHandlerProtocol.process_record()) + 3 specialized |
| Dispatch | isinstance() checks + 5 handler lists | RecordRouter with O(1) type-based lookup |
| Handler Lists | 5 separate lists + 2 accumulator refs | Single unified dispatch table |
| Querying | Accumulator refs stored separately | query_time_range() on all accumulators via router |
| Dependencies | Implicit (hidden in RecordsManager) | Explicit via required_accumulators |
| Adding New Type | Modify RecordsManager + add isinstance() checks + new list | Register with decorator, auto-discovered |
| Helper Classes | RecordsTracker, ErrorTracker | Integrated into RecordsManager |

Side-by-Side Code Comparison

Initialization

origin/main:

# Multiple handler lists, manual isinstance() routing
self._metric_results_processors: list[ResultsProcessorProtocol] = []
self._gpu_telemetry_processors: list[GPUTelemetryProcessorProtocol] = []
self._server_metrics_processors: list[ServerMetricsProcessorProtocol] = []
self._gpu_telemetry_accumulator: GPUTelemetryAccumulatorProtocol | None = None
self._server_metrics_accumulator: ServerMetricsAccumulatorProtocol | None = None

for results_processor_type in ResultsProcessorFactory.get_all_class_types():
    processor = ResultsProcessorFactory.create_instance(...)
    # Multiple isinstance checks for routing to different lists
    if isinstance(processor, GPUTelemetryAccumulatorProtocol):
        self._gpu_telemetry_accumulator = processor
    if isinstance(processor, GPUTelemetryProcessorProtocol):
        self._gpu_telemetry_processors.append(processor)
    if isinstance(processor, ServerMetricsAccumulatorProtocol):
        self._server_metrics_accumulator = processor
    if isinstance(processor, ServerMetricsProcessorProtocol):
        self._server_metrics_processors.append(processor)
    # ... more isinstance checks for metric processors

Refactored (this branch):

# Three factories, unified via RecordRouter
self._accumulators: dict[AccumulatorType, AccumulatorProtocol] = (
    self._create_record_handlers(AccumulatorFactory)
)
self._sub_processors: dict[SubProcessorType, SubProcessorProtocol] = (
    self._create_record_handlers(SubProcessorFactory)
)
self._stream_exporters: dict[StreamExporterType, StreamExporterProtocol] = (
    self._create_record_handlers(StreamExporterFactory)
)

# Unified router - O(1) dispatch based on record type
self._router = RecordRouter(
    accumulators=self._accumulators,
    sub_processors=self._sub_processors,
    stream_exporters=self._stream_exporters,
)

Record Dispatch

origin/main:

# Separate processor lists, separate dispatch per record type
# In message handlers:
for processor in self._metric_results_processors:
    await processor.process_result(record_data)

for processor in self._gpu_telemetry_processors:
    await processor.process_telemetry_record(record)

for processor in self._server_metrics_processors:
    await processor.process_server_metrics_record(record)

Refactored (this branch):

# Unified dispatch - same call for all record types
await self._router.dispatch(record_data)      # MetricRecordsData
await self._router.dispatch(record)           # TelemetryRecord
await self._router.dispatch(server_record)    # ServerMetricsRecord

Protocol Methods

origin/main:

# Different method names per protocol
class ResultsProcessorProtocol(Protocol):
    async def process_result(self, record_data: MetricRecordsData) -> None: ...

class GPUTelemetryProcessorProtocol(Protocol):
    async def process_telemetry_record(self, record: TelemetryRecord) -> None: ...

class ServerMetricsProcessorProtocol(Protocol):
    async def process_server_metrics_record(self, record: ServerMetricsRecord) -> None: ...

Refactored (this branch):

# Unified base protocol
class RecordHandlerProtocol(Protocol):
    supported_record_types: tuple[type, ...]
    async def process_record(self, record: Any) -> None: ...

# All specialized protocols inherit from base
class AccumulatorProtocol(RecordHandlerProtocol, ...): ...
class SubProcessorProtocol(RecordHandlerProtocol, ...): ...
class StreamExporterProtocol(RecordHandlerProtocol, ...): ...

Adding a New Record Type

origin/main:

  1. Add new enum value to ResultsProcessorType
  2. Create new protocol (e.g., NewTypeProcessorProtocol)
  3. Modify RecordsManager.__init__ to add new handler list
  4. Add multiple isinstance() checks for routing
  5. Optionally add accumulator reference if queries needed
  6. Add dispatch logic in message handler

Refactored (this branch):

  1. Add new enum value to AccumulatorType (or StreamExporterType)
  2. Implement class with @AccumulatorFactory.register() decorator
  3. Set supported_record_types = (MyNewRecord,)
  4. Done! Router auto-discovers via supported_record_types

File Reference

| File | Purpose |
|---|---|
| src/aiperf/records/records_manager.py | Central orchestration service |
| src/aiperf/records/record_router.py | O(1) dispatch logic |
| src/aiperf/records/record_processor_service.py | Distributed metric computation |
| src/aiperf/common/protocols.py | Protocol definitions |
| src/aiperf/common/factories.py | Factory definitions |
| src/aiperf/common/enums/post_processor_enums.py | Type enums |
| src/aiperf/post_processors/metric_results_processor.py | Metrics accumulator |
| src/aiperf/post_processors/timeslice_metric_results_processor.py | Timeslice sub-processor |
| src/aiperf/post_processors/telemetry_results_processor.py | Telemetry accumulator |
| src/aiperf/server_metrics/results_processor.py | Server metrics accumulator |
| src/aiperf/server_metrics/jsonl_exporter.py | Server metrics JSONL writer (ServerMetricsJSONLWriter) |
