Commit 08476d7

feat: add metrics_aggregator service for real-time metric computation (#188)

* feat: add metrics_aggregator service for real-time metric computation
  - MetricsAggregator: subscribes to ZMQ events, computes QPS/latency/TTFT/TPOT
  - MetricsTable: columnar storage for efficient percentile calculations
  - TokenMetrics: ISL/OSL token-level metrics from PromptData
  - MetricsEmitter: periodic metric publishing
  - Service entry point with CLI interface
  - Comprehensive unit and e2e tests
* Add design docs for subscriber services
* Remove 'errors-only after close' behavior for EventLogger
* Move thread-local storage inside TokenizePool instance; remove unused .tokenize call (only .token_count is used)
* Improve metrics aggregator design for performance and handling start/stop tracking events for warmup+accuracy
* Improve upon aggregator by specifically distinguishing metrics done in streaming mode; fix race condition in TPOT as part of this change
* Minor fixes, comments
* Fixes for ManagedZMQContext socket_dir - make the 'Managed' part more 'Manage-y'
* Add fix for unit test failure
* Replace `...` in abstractmethod with `raise`
* Use mock objects to prevent 'missing super().__init__()' code review rule from triggering
* Update design.md with aggregator changes

Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>

1 parent 0a20748 commit 08476d7

File tree

28 files changed: +4141 −333 lines changed

docs/async_utils/services/design.md

Lines changed: 323 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 218 additions & 0 deletions
# Event Logger Service — Design Document

## Overview

The event logger is a ZMQ subscriber service that consumes `EventRecord` messages
from the pub/sub event bus and persists them to one or more storage backends.

It runs as an independent subprocess with its own event loop, connected to the
same ZMQ PUB socket as other subscriber services (e.g. metrics aggregator).

```
               ZMQ PUB (ipc://)
                      │
       ┌──────────────┼──────────────┐
       ▼              ▼              ▼
  EventLogger  MetricsAggregator  (future subscribers)
  (JSONL/SQL)  (real-time metrics)
```

## Module Layout

```
event_logger/
├── __main__.py      # CLI entry point + EventLoggerService
├── writer.py        # RecordWriter ABC (flush interval support)
├── file_writer.py   # JSONLWriter (msgspec-based JSONL output)
└── sql_writer.py    # SQLWriter (SQLAlchemy, default sqlite)
```

## Subscribed Events

The event logger subscribes to **all topics** (`topics=None`) so that every
published `EventRecord` is persisted. It does not interpret event semantics —
it writes records verbatim to all configured writers.

The only event type with special handling is `SessionEventType.ENDED`, which
triggers shutdown (see [Lifecycle](#lifecycle)).
## Writer System

### RecordWriter ABC

`RecordWriter` defines the interface for all writer backends:

```python
class RecordWriter(ABC):
    def write(self, record: EventRecord) -> None: ...          # calls _write_record + auto-flush
    def _write_record(self, record: EventRecord) -> None: ...  # abstract
    def flush(self) -> None: ...
    def close(self) -> None: ...                               # abstract
```

`write()` calls the subclass's `_write_record()`, increments a counter, and
auto-flushes when the counter reaches `flush_interval`. This keeps the flush
logic in one place regardless of backend.
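The write/auto-flush split can be sketched as follows. This is a minimal sketch: only the four methods and `flush_interval` come from the text above; the counter name and default interval are assumptions.

```python
from abc import ABC, abstractmethod


class RecordWriter(ABC):
    """Sketch of the shared write/auto-flush logic described above."""

    def __init__(self, flush_interval: int = 100) -> None:
        self.flush_interval = flush_interval
        self._writes_since_flush = 0  # assumed counter name

    def write(self, record) -> None:
        # Delegate the actual I/O to the backend, then handle flush
        # bookkeeping here so every backend shares the same policy.
        self._write_record(record)
        self._writes_since_flush += 1
        if self._writes_since_flush >= self.flush_interval:
            self.flush()

    def flush(self) -> None:
        # Subclasses flush their backend, then reset the counter.
        self._writes_since_flush = 0

    @abstractmethod
    def _write_record(self, record) -> None:
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        raise NotImplementedError
```

A backend only implements `_write_record()` and `close()`; flushing cadence stays centralized in the base class.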
### Writer Registry

The CLI maps writer names to classes via `_WRITER_REGISTRY`:

| Name    | Class         | Output File    |
| ------- | ------------- | -------------- |
| `jsonl` | `JSONLWriter` | `events.jsonl` |
| `sql`   | `SQLWriter`   | `events.db`    |

Multiple writers can be active simultaneously (e.g. `--writers jsonl sql`).
Each record is written to every configured writer.
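The registry-and-fan-out pattern can be sketched as below; the writer classes here are trivial stand-ins for the real ones in `file_writer.py` / `sql_writer.py`.

```python
from pathlib import Path

# Stand-ins for the real writer classes; only the output paths are modeled.
class JSONLWriter:
    def __init__(self, log_dir: Path) -> None:
        self.path = log_dir / "events.jsonl"

class SQLWriter:
    def __init__(self, log_dir: Path) -> None:
        self.path = log_dir / "events.db"

# Name -> class mapping, mirroring the table above.
_WRITER_REGISTRY = {"jsonl": JSONLWriter, "sql": SQLWriter}

def make_writers(names: list[str], log_dir: Path) -> list:
    """Instantiate one writer per --writers entry; records fan out to all."""
    return [_WRITER_REGISTRY[name](log_dir) for name in names]
```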
## JSONL Writer

`JSONLWriter` writes one JSON line per `EventRecord` using `msgspec.json.Encoder`
with a custom `enc_hook` that serializes `EventType` enum members as their topic
strings (e.g. `"session.ended"`, `"sample.complete"`).

Output path: `{log_dir}/events.jsonl`

Each line is a complete JSON object representing one `EventRecord`:

```json
{
  "event_type": "sample.issued",
  "timestamp_ns": 1234567890,
  "sample_uuid": "abc-123",
  "data": null
}
```
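The enum-to-topic-string serialization can be illustrated with the stdlib `json` module's `default` hook, which plays the same role as `msgspec`'s `enc_hook`. The `SampleEventType` enum here is a hypothetical stand-in for the real `EventType` members:

```python
import json
from dataclasses import asdict, dataclass
from enum import Enum

# Hypothetical enum; real EventType members carry topic strings as values.
class SampleEventType(Enum):
    ISSUED = "sample.issued"
    COMPLETE = "sample.complete"

@dataclass
class EventRecord:
    event_type: SampleEventType
    timestamp_ns: int
    sample_uuid: str
    data: object = None

def enc_hook(obj):
    # Serialize enum members as their topic string, per the design above.
    if isinstance(obj, Enum):
        return obj.value
    raise TypeError(f"unsupported type: {type(obj)!r}")

def to_jsonl_line(record: EventRecord) -> str:
    # One complete JSON object per line, as in the example above.
    return json.dumps(asdict(record), default=enc_hook)
```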
## SQL Writer

`SQLWriter` uses SQLAlchemy (default: sqlite) and maps each `EventRecord` to an
`EventRowModel` row:

| Column         | Type         | Content                                    |
| -------------- | ------------ | ------------------------------------------ |
| `id`           | Integer (PK) | Auto-increment                             |
| `sample_uuid`  | String       | Sample UUID                                |
| `event_type`   | String       | Topic string (e.g. `"sample.complete"`)    |
| `timestamp_ns` | BigInteger   | Monotonic nanosecond timestamp             |
| `data`         | LargeBinary  | `msgspec.json.encode(record.data)` (bytes) |

Output path: `sqlite:///{log_dir}/events.db`

The backend is swappable via the `url` parameter (e.g. PostgreSQL), though the
CLI currently only supports the default sqlite path.
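The table above maps to plain SQL as follows. This is a stdlib `sqlite3` sketch of the equivalent schema; the real `SQLWriter` declares it through SQLAlchemy, and the table name here is an assumption.

```python
import sqlite3

# sqlite INTEGER columns are 64-bit, so they cover BigInteger timestamps.
SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    sample_uuid TEXT,
    event_type TEXT,
    timestamp_ns INTEGER,
    data BLOB
)
"""

def insert_event(conn, sample_uuid, event_type, timestamp_ns, data: bytes):
    # data holds the msgspec-encoded JSON bytes of record.data.
    conn.execute(
        "INSERT INTO events (sample_uuid, event_type, timestamp_ns, data) "
        "VALUES (?, ?, ?, ?)",
        (sample_uuid, event_type, timestamp_ns, data),
    )
```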
## Lifecycle

### Startup

```
python -m inference_endpoint.async_utils.services.event_logger \
    --log-dir /path/to/logs \
    --socket-dir /path/to/socket_dir \
    --socket-name ev_pub_abc123 \
    --writers jsonl sql
```

1. Parse CLI arguments.
2. Create writer instances (one per `--writers` entry), writing to `{log_dir}/events.*`.
3. Create `ManagedZMQContext.scoped(socket_dir=args.socket_dir)` with the publisher's socket directory.
4. Create `EventLoggerService` (extends `ZmqEventRecordSubscriber`), which connects
   to the publisher via `ctx.connect(socket, socket_name)`.
5. Call `logger.start()` which registers `add_reader` on the subscriber's event loop.
6. `await shutdown_event.wait()` blocks until shutdown is signalled.
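Steps 5 and 6 follow the usual asyncio start-then-wait pattern, sketched below with stand-in names. `EventLoggerServiceSketch` is hypothetical; only `start()`, `_request_stop()`, and `shutdown_event` come from the text.

```python
import asyncio

class EventLoggerServiceSketch:
    """Hypothetical stand-in for EventLoggerService."""

    def __init__(self) -> None:
        # Set by _request_stop(); the main coroutine blocks on it (step 6).
        self.shutdown_event = asyncio.Event()

    def start(self) -> None:
        # Step 5: the real service registers loop.add_reader(fd, _on_readable)
        # so the loop wakes whenever the ZMQ SUB socket has data.
        pass

    def _request_stop(self) -> None:
        # Scheduled on the loop once ENDED has been processed.
        self.shutdown_event.set()

async def main() -> "EventLoggerServiceSketch":
    service = EventLoggerServiceSketch()
    service.start()
    # Simulate ENDED arriving shortly after startup; the real trigger
    # is the processing loop seeing SessionEventType.ENDED.
    asyncio.get_running_loop().call_later(0.01, service._request_stop)
    await service.shutdown_event.wait()  # step 6: block until shutdown
    return service
```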
### Processing

Each batch of records decoded from the ZMQ socket is passed to `process(records)`:

```
for record in records:
    if _shutdown_received:
        skip
    if record is ENDED:
        set _shutdown_received
    write to all writers
if saw ENDED:
    flush + close writers, request stop
```

All records up to and including `SessionEventType.ENDED` are written. All records
after ENDED (in the same or subsequent batches) are dropped regardless of type.
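A runnable rendering of the pseudocode above, with list-backed writers standing in for real `RecordWriter` instances and a string sentinel standing in for `SessionEventType.ENDED`:

```python
# Stand-in sentinel for SessionEventType.ENDED.
ENDED = "session.ended"

class ProcessingSketch:
    def __init__(self, writers: list[list]) -> None:
        self.writers = writers
        self._shutdown_received = False

    def process(self, records: list) -> bool:
        """Write a decoded batch; return True once ENDED has been handled."""
        for record in records:
            if self._shutdown_received:
                continue  # everything after ENDED is dropped
            if record == ENDED:
                self._shutdown_received = True  # ENDED itself is still written
            for writer in self.writers:
                writer.append(record)
        # Caller flushes + closes writers and requests stop when this is True.
        return self._shutdown_received
```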
### Shutdown

When `SessionEventType.ENDED` is received:

1. The ENDED record is written to all writers.
2. All remaining records in the batch are dropped.
3. Writers are flushed and closed; the writer list is cleared.
4. `_request_stop()` is scheduled on the event loop:
   - Removes the socket reader (via `close()`)
   - Sets the `shutdown_event` so the process can exit

```mermaid
stateDiagram-v2
    [*] --> Created: __init__
    Created --> Listening: start
    Listening --> Processing: _on_readable
    Processing --> Listening: batch done, no ENDED
    Listening --> ShuttingDown: ENDED in batch
    Processing --> ShuttingDown: ENDED in batch
    ShuttingDown --> [*]: writers closed, loop stopped
```
## Data Flow

```mermaid
flowchart TB
    subgraph Sub["EventLoggerService"]
        A[ZMQ SUB socket]
        B[_on_readable]
        C[decode EventRecord]
        D[process batch]
        E{shutdown_received?}
        F[write to all writers]
        G{ENDED?}
        H[set shutdown_received]
        I[flush + close writers]
        J[_request_stop]
        A --> B --> C --> D --> E
        E -->|no| F
        E -->|yes| D
        F --> G
        G -->|no| D
        G -->|yes| H --> F --> I --> J
    end
    subgraph Writers["Writers"]
        W1[JSONLWriter]
        W2[SQLWriter]
    end
    F --> W1
    F --> W2
```
## CLI Interface

```
usage: python -m inference_endpoint.async_utils.services.event_logger
    --log-dir LOG_DIR
    --socket-dir SOCKET_DIR
    --socket-name SOCKET_NAME
    [--writers WRITER [WRITER ...]]
```

| Argument        | Required | Default | Description                                                |
| --------------- | -------- | ------- | ---------------------------------------------------------- |
| `--log-dir`     | Yes      |         | Directory for log output files                             |
| `--socket-dir`  | Yes      |         | Directory containing ZMQ IPC sockets (must already exist)  |
| `--socket-name` | Yes      |         | Socket name within socket-dir                              |
| `--writers`     | No       | `jsonl` | Writer backends: `jsonl`, `sql`, or both                   |
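The table maps onto an `argparse` parser roughly as follows. This is a sketch: the actual parser lives in `__main__.py`, and the help strings and `choices` restriction are assumptions.

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog="python -m inference_endpoint.async_utils.services.event_logger"
    )
    parser.add_argument("--log-dir", required=True,
                        help="Directory for log output files")
    parser.add_argument("--socket-dir", required=True,
                        help="Directory containing ZMQ IPC sockets (must exist)")
    parser.add_argument("--socket-name", required=True,
                        help="Socket name within socket-dir")
    # nargs="+" lets multiple backends be given, e.g. --writers jsonl sql.
    parser.add_argument("--writers", nargs="+", default=["jsonl"],
                        choices=["jsonl", "sql"],
                        help="Writer backends")
    return parser
```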
## Not Yet Wired

The EventRecord pub/sub infrastructure is ready, but actual `publish(EventRecord(...))`
calls have not been connected in the load generator or worker processes. Once wired,
the event logger will receive and persist all session/sample/error events published
during a benchmark run.
