Commit 86a0dfe

Browse files
Authored by simonrosenberg, claude, and openhands-agent
Add event-sourcing system benchmarks (#2032)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: openhands <openhands@all-hands.dev>
1 parent 21d1b67 · 5 files changed: +825 -0
Lines changed: 132 additions & 0 deletions
# Event-Sourced State: Systems Metrics

We report four SDK-attributable systems metrics for the event-sourced state management design described in Section 4.2, including its persistence and crash recovery paths. We extract real event payloads from 433 SWE-Bench Verified evaluation conversations (39,870 total events) and replay them through the SDK's production I/O code path on a local machine. The SDK does not instrument persist or replay timing internally, so storage metrics are measured directly from the traces while latency metrics are obtained by re-executing the same `LocalFileStore` lock-and-write path with the original payloads under a fixed deployment configuration.

## Metrics

1. **Persist latency per event / action cycle.** The wall-clock time to durably append a single event to the log. Each append acquires a file lock, serializes the event to JSON, and writes a new file. An action cycle comprises one ActionEvent write followed by one ObservationEvent write — the two persists that bracket every tool invocation.

2. **Replay time vs. log size.** The time to reconstruct in-memory state from the on-disk event log. This has two phases: index rebuild (listing the events directory and parsing filenames via regex) and full replay (reading and deserializing every event file). This cost is paid once on process startup or after a crash.

3. **Storage growth.** The cumulative on-disk footprint of the event log as a function of conversation length, broken down by event type. Since each event is an independent JSON file, total storage grows linearly with event count.

4. **Time-to-recover via replay after failures.** The end-to-end latency of the crash recovery path: load all persisted events, then scan in reverse for actions that lack a matching observation (unmatched-action detection, as implemented in `ConversationState.get_unmatched_actions()`). An unmatched action indicates the agent crashed mid-execution and must re-dispatch.
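The per-event persist path in metric 1 can be sketched with the standard library alone. This is an illustrative stand-in for the SDK's `LocalFileStore` lock-and-write path, not its implementation: the file lock is elided, and the `NNNNN.json` filename scheme is assumed for the sketch.

```python
import json
import os
import tempfile
import time


def persist_event(events_dir: str, seq: int, event: dict) -> float:
    """Append one event as its own JSON file; return wall-clock latency in ms.

    Stand-in for LocalFileStore.lock() + LocalFileStore.write(): lock
    acquisition is elided here, so this times serialize + write only.
    """
    t0 = time.perf_counter()
    payload = json.dumps(event)  # serialize the event to JSON
    with open(os.path.join(events_dir, f"{seq:05d}.json"), "w") as f:
        f.write(payload)  # each event is an independent file on disk
    return (time.perf_counter() - t0) * 1000


events_dir = tempfile.mkdtemp(prefix="eventlog_")
# One action cycle = one ActionEvent write + one ObservationEvent write.
action_ms = persist_event(events_dir, 0, {"kind": "ActionEvent", "tool": "bash"})
obs_ms = persist_event(events_dir, 1, {"kind": "ObservationEvent", "output": "ok"})
cycle_ms = action_ms + obs_ms
```

The benchmark harness below times the same bracketed region, but through the SDK's real lock-and-write calls with the original payloads.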
## Setup

**Workload:** Event payloads extracted from a full SWE-Bench Verified evaluation run (433 instances, `litellm_proxy` backend, max 500 iterations). Events range from 190B to 260KB, with a median of 1.5KB.

**I/O path:** All persist measurements exercise the production code path — `LocalFileStore.lock()` followed by `LocalFileStore.write()` — with the original JSON payloads from the evaluation traces.

## Data

The evaluation traces used for these benchmarks are from a SWE-Bench Verified run (433 instances, SDK commit `cfe52af`, GitHub Actions run `21870831025`). To download:

```bash
curl -L -o results.tar.gz \
  https://results.eval.all-hands.dev/swtbench/litellm_proxy-jade-spark-2862/21870831025/results.tar.gz
tar xzf results.tar.gz
```

After extraction, pass the inner run directory as `--eval-dir`. It should contain `conversations/` (with `.tar.gz` traces) and `output.jsonl`.
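A quick sanity check of the extracted layout can be sketched as follows. The helper name `check_eval_dir` is hypothetical; the benchmark scripts perform the equivalent lookup of `output.jsonl` and `conversations/` themselves.

```python
import os


def check_eval_dir(eval_dir: str) -> list[str]:
    """Return the sorted conversation tarballs under an extracted run dir.

    Raises if the layout the benchmark scripts expect is missing:
    output.jsonl at the top level and .tar.gz traces under conversations/.
    """
    if not os.path.isfile(os.path.join(eval_dir, "output.jsonl")):
        raise FileNotFoundError(f"{eval_dir}: missing output.jsonl")
    conv_dir = os.path.join(eval_dir, "conversations")
    tarballs = sorted(
        name for name in os.listdir(conv_dir) if name.endswith(".tar.gz")
    )
    if not tarballs:
        raise FileNotFoundError(f"{conv_dir}: no .tar.gz conversation traces")
    return tarballs
```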
## Scripts

All scripts accept `--eval-dir <path>` pointing to the extracted evaluation run directory.

| Script | Metrics | Usage |
|---|---|---|
| `bench_persist_latency.py` | Persist latency per event / action cycle | `python bench_persist_latency.py --eval-dir <path>` |
| `bench_replay_and_recovery.py` | Replay time vs. log size, time-to-recover | `python bench_replay_and_recovery.py --eval-dir <path>` |
| `bench_storage_growth.py` | Storage growth and composition | `python bench_storage_growth.py --eval-dir <path>` |

---
## Results

### 1. Persist Latency per Event / Action Cycle

**Method:** Extract persisted event files from 29 sampled SWE-Bench conversations. Replay each through the `LocalFileStore.lock()` + `LocalFileStore.write()` path with the original JSON payloads.

#### Per-Event Persist Latency

| Event Type | N | Median | Mean | P95 | Median Size |
|---|---|---|---|---|---|
| SystemPromptEvent | 29 | 0.351ms | 0.374ms | 0.582ms | 24,500B |
| MessageEvent | 29 | 0.201ms | 0.206ms | 0.261ms | 3,239B |
| ActionEvent | 1,264 | 0.163ms | 0.175ms | 0.244ms | 1,071B |
| ObservationEvent | 1,264 | 0.167ms | 0.180ms | 0.255ms | 2,254B |
| ConversationStateUpdateEvent | 58 | 0.168ms | 0.172ms | 0.218ms | 191B |
| **All Events** | **2,644** | **0.166ms** | **0.180ms** | **0.267ms** | **1,395B** |

#### Per Action Cycle (Action + Observation)

| Metric | Value |
|---|---|
| Median | 0.36ms |
| Mean | 0.37ms |

---

### 2. Replay Time vs. Log Size

**Method:** Build event logs of increasing size from real payloads. Measure index rebuild (directory listing + filename regex parse) and full replay (read + JSON parse all events).

| Events | Storage | Index Rebuild | Full Replay |
|---|---|---|---|
| 10 | 36.4KB | 0.02ms | 0.30ms |
| 25 | 57.5KB | 0.03ms | 0.58ms |
| 50 | 122.1KB | 0.05ms | 1.21ms |
| 100 | 227.0KB | 0.08ms | 2.28ms |
| 200 | 576.2KB | 0.17ms | 4.89ms |
| 500 | 2.0MB | 0.37ms | 14.26ms |
| 1,000 | 4.3MB | 0.75ms | 29.49ms |
| 1,500 | 8.2MB | 1.09ms | 48.06ms |

Replay scales linearly with event count. At the maximum observed conversation size in the evaluation (358 events), full replay completes in under 10ms.
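The two replay phases can be sketched as follows. This is a minimal stand-in: the `NNNNN-Kind.json` filename pattern is assumed for illustration and may differ from the SDK's actual scheme, and plain `json.load` stands in for Pydantic deserialization.

```python
import json
import os
import re

# Hypothetical filename pattern for illustration, e.g. "00042-ActionEvent.json".
EVENT_FILE_RE = re.compile(r"^(\d+)-(\w+)\.json$")


def rebuild_index(events_dir: str) -> list[str]:
    """Phase 1: list the directory and order files by parsed sequence number."""
    entries = []
    for name in os.listdir(events_dir):
        m = EVENT_FILE_RE.match(name)
        if m:  # skip anything that is not an event file
            entries.append((int(m.group(1)), name))
    return [name for _, name in sorted(entries)]


def full_replay(events_dir: str) -> list[dict]:
    """Phase 2: read and deserialize every event file, in index order."""
    events = []
    for name in rebuild_index(events_dir):
        with open(os.path.join(events_dir, name)) as f:
            events.append(json.load(f))
    return events
```

Both phases are O(n) in event count, which matches the linear scaling in the table above.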
---

### 3. Storage Growth

**Method:** Analyze all 433 SWE-Bench conversations. Measure per-conversation storage and breakdown by event type.

#### Conversation Size Distribution

| Metric | Min | P25 | Median | P75 | Max |
|---|---|---|---|---|---|
| Events | 22 | 64 | 82 | 108 | 358 |
| Storage | 109.6KB | | 380.0KB | 634.3KB | 3,357.0KB |

Mean events per conversation: 92.1 (stdev 39.9). Average event size: ~5.8KB (227.1MB across 39,870 events). Storage grows linearly with event count.
#### Storage Composition by Event Type

| Event Type | Count | % Events | Total | % Storage | Avg Size |
|---|---|---|---|---|---|
| ObservationEvent | 19,065 | 47.8% | 177.1MB | 78.0% | 9.51KB |
| ActionEvent | 19,069 | 47.8% | 38.3MB | 16.9% | 2.05KB |
| SystemPromptEvent | 433 | 1.1% | 10.1MB | 4.5% | 23.93KB |
| MessageEvent | 433 | 1.1% | 1.4MB | 0.6% | 3.29KB |
| ConversationStateUpdateEvent | 866 | 2.2% | 0.2MB | 0.1% | 0.19KB |
| **Total** | **39,870** | | **227.1MB** | | |

ObservationEvents (tool outputs) account for 78% of storage despite being only 48% of events by count.
---

### 4. Time-to-Recover via Replay After Failures

**Method:** Build event logs from real payloads, then measure the full recovery path: read all events + reverse scan for actions without matching observations (unmatched-action detection, as implemented in `ConversationState.get_unmatched_actions()`).

| Events | Storage | Time-to-Recover |
|---|---|---|
| 10 | 36.4KB | 0.64ms |
| 25 | 57.5KB | 1.45ms |
| 50 | 122.1KB | 2.71ms |
| 100 | 227.0KB | 5.35ms |
| 200 | 576.2KB | 10.70ms |
| 500 | 2.0MB | 27.92ms |
| 1,000 | 4.3MB | 57.50ms |
| 1,500 | 8.2MB | 90.26ms |

Recovery includes full Pydantic deserialization of all events via `Event.model_validate_json()` and scanning in reverse for actions that lack a corresponding observation (indicating a crash mid-execution) via `ConversationState.get_unmatched_actions()`. At the median conversation size (82 events), recovery completes in ~5ms. At the largest observed conversation (358 events), recovery completes in under 20ms.
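The unmatched-action scan can be sketched in a few lines. This is an illustrative version of what `ConversationState.get_unmatched_actions()` does, not the SDK code; the `id`/`action_id` pairing fields are an assumption about the event schema.

```python
def get_unmatched_actions(events: list[dict]) -> list[dict]:
    """Reverse-scan an event list for actions with no matching observation.

    Sketch only: assumes each ObservationEvent names the action it answers
    via an `action_id` field, which may differ from the SDK schema.
    """
    observed: set[str] = set()
    unmatched: list[dict] = []
    for event in reversed(events):
        kind = event.get("kind")
        if kind == "ObservationEvent":
            observed.add(event["action_id"])
        elif kind == "ActionEvent" and event["id"] not in observed:
            # No observation ever referenced this action: the agent likely
            # crashed mid-execution and the action must be re-dispatched.
            unmatched.append(event)
    unmatched.reverse()  # restore log order
    return unmatched
```

Scanning in reverse means observations, which always follow their actions in the log, are seen before the actions they match.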
Lines changed: 227 additions & 0 deletions
#!/usr/bin/env python3
"""
Benchmark: Persist latency per event and per action cycle.

Extracts real event payloads from SWE-Bench evaluation conversation traces
and replays them through the SDK's LocalFileStore lock-and-write path to
measure per-event and per-cycle persist latency.

Usage:
    python bench_persist_latency.py --eval-dir <path-to-eval-run>
"""

import argparse
import gc
import json
import os
import shutil
import statistics
import tempfile
import time

from benchmark_utils import extract_conversation, read_event_files

from openhands.sdk.io import LocalFileStore


EVENTS_DIR_NAME = "events"
LOCK_FILE = "events/.eventlog.lock"


def measure_persist_latencies(event_files: list[dict]) -> list[dict]:
    """Replay the persist path EventLog.append() uses:
    lock -> write JSON file -> release lock

    Uses LocalFileStore directly with real event payloads.
    """
    tmpdir = tempfile.mkdtemp(prefix="bench_persist_")
    try:
        fs = LocalFileStore(tmpdir, cache_limit_size=len(event_files) + 100)

        results = []
        for i, ef in enumerate(event_files):
            target_path = f"{EVENTS_DIR_NAME}/{ef['filename']}"

            gc.disable()
            t0 = time.perf_counter()
            with fs.lock(LOCK_FILE, timeout=30.0):
                fs.write(target_path, ef["json_str"])
            t1 = time.perf_counter()
            gc.enable()

            results.append(
                {
                    "kind": ef["kind"],
                    "size_bytes": ef["size_bytes"],
                    "persist_ms": (t1 - t0) * 1000,
                    "event_idx": i,
                }
            )
        return results
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)


def main():
    import logging

    logging.getLogger("openhands").setLevel(logging.ERROR)

    parser = argparse.ArgumentParser(
        description="Benchmark persist latency per event/action cycle"
    )
    parser.add_argument(
        "--eval-dir",
        required=True,
        help="Path to evaluation run directory",
    )
    parser.add_argument(
        "--output",
        default="bench_persist_latency_results.json",
        help="Output JSON file path",
    )
    parser.add_argument(
        "--sample-step",
        type=int,
        default=15,
        help="Sample every Nth conversation (default: 15)",
    )
    args = parser.parse_args()

    # Load instance metadata
    instances = {}
    with open(os.path.join(args.eval_dir, "output.jsonl")) as f:
        for line in f:
            d = json.loads(line)
            instances[d["instance_id"]] = d

    conv_dir = os.path.join(args.eval_dir, "conversations")
    tarballs = sorted(os.listdir(conv_dir))
    sample_tarballs = tarballs[:: args.sample_step]
    print(f"Sampling {len(sample_tarballs)} of {len(tarballs)} conversations\n")

    all_persist: list[dict] = []
    conv_summaries: list[dict] = []

    for tarname in sample_tarballs:
        instance_id = tarname.replace(".tar.gz", "")
        instance_data = instances.get(instance_id)
        if not instance_data:
            continue

        tarpath = os.path.join(conv_dir, tarname)
        tmpdir = tempfile.mkdtemp(prefix="bench_persist_")
        try:
            events_dir = extract_conversation(tarpath, tmpdir)
            if not events_dir:
                continue
            event_files = read_event_files(events_dir)
            if not event_files:
                continue

            persist_results = measure_persist_latencies(event_files)
            all_persist.extend(persist_results)

            # Per-cycle persist time (action + observation pairs)
            action_p = [r for r in persist_results if r["kind"] == "ActionEvent"]
            obs_p = [r for r in persist_results if r["kind"] == "ObservationEvent"]
            n_cycles = min(len(action_p), len(obs_p))
            cycle_persist = [
                action_p[i]["persist_ms"] + obs_p[i]["persist_ms"]
                for i in range(n_cycles)
            ]

            total_persist_ms = sum(r["persist_ms"] for r in persist_results)

            conv_summaries.append(
                {
                    "instance_id": instance_id,
                    "n_events": len(event_files),
                    "n_cycles": n_cycles,
                    "total_persist_ms": total_persist_ms,
                    "mean_cycle_persist_ms": (
                        statistics.mean(cycle_persist) if cycle_persist else 0
                    ),
                }
            )
            n_ev = len(event_files)
            print(
                f" {instance_id[:50]:50s} events={n_ev:>4}"
                f" persist={total_persist_ms:>7.1f}ms"
            )

        finally:
            shutil.rmtree(tmpdir, ignore_errors=True)

    # --- Analysis ---
    print(f"\n{'=' * 70}")
    print("RESULTS: Persist Latency per Event / Action Cycle")
    print(f"{'=' * 70}")

    by_kind: dict[str, list[dict]] = {}
    for r in all_persist:
        by_kind.setdefault(r["kind"], []).append(r)

    print("\n--- Per-Event Persist Latency ---")
    header = (
        f" {'Event Type':<35} {'N':>5} {'Median':>10}"
        f" {'Mean':>10} {'P95':>10} {'MedSize':>10}"
    )
    print(header)
    print(f" {'-' * 80}")
    for kind in [
        "SystemPromptEvent",
        "MessageEvent",
        "ActionEvent",
        "ObservationEvent",
        "ConversationStateUpdateEvent",
        "AgentErrorEvent",
    ]:
        if kind not in by_kind:
            continue
        entries = by_kind[kind]
        lats = sorted([e["persist_ms"] for e in entries])
        sizes = sorted([e["size_bytes"] for e in entries])
        n = len(lats)
        print(
            f" {kind:<35} {n:>5}"
            f" {lats[n // 2]:>9.3f}ms"
            f" {statistics.mean(lats):>9.3f}ms"
            f" {lats[int(n * 0.95)]:>9.3f}ms"
            f" {sizes[n // 2]:>8,}B"
        )

    all_lats = sorted([r["persist_ms"] for r in all_persist])
    all_sizes = sorted([r["size_bytes"] for r in all_persist])
    n = len(all_lats)
    print(f" {'-' * 80}")
    print(
        f" {'ALL EVENTS':<35} {n:>5}"
        f" {all_lats[n // 2]:>9.3f}ms"
        f" {statistics.mean(all_lats):>9.3f}ms"
        f" {all_lats[int(n * 0.95)]:>9.3f}ms"
        f" {all_sizes[n // 2]:>8,}B"
    )

    # Per action cycle
    print("\n--- Per Action Cycle (Action + Observation) ---")
    cycle_persists = [
        s["mean_cycle_persist_ms"] for s in conv_summaries if s["n_cycles"] > 0
    ]
    med = statistics.median(cycle_persists)
    mean = statistics.mean(cycle_persists)
    print(f" Median per-cycle persist time: {med:.2f}ms")
    print(f" Mean per-cycle persist time: {mean:.2f}ms")

    # Save
    with open(args.output, "w") as f:
        json.dump(
            {"per_event": all_persist, "conversations": conv_summaries},
            f,
            indent=2,
        )
    print(f"\nRaw data saved to {args.output}")


if __name__ == "__main__":
    main()
