Skip to content

Commit be1e042

Browse files
committed
feat(evals): wire StudyManager into server startup
- Add configurable stream/topic names to IggyEventLog::with_stream() - Initialize eval system with separate "evals" Iggy stream - Create TursoEvalStorage with local SQLite at ~/.local/share/vibes/eval.db - Start EvalProjectionConsumer as background task - Wire StudyManager into AppState for eval CLI commands Fixes "Error: Eval studies not enabled" when running eval commands.
1 parent b13f3f3 commit be1e042

File tree

4 files changed

+217
-23
lines changed

4 files changed

+217
-23
lines changed

docs/board/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@
256256
- [m39-feat-02-longitudinal-metrics](stages/done/stories/m39-feat-02-longitudinal-metrics.md)
257257
- [m39-feat-03-eval-storage](stages/done/stories/m39-feat-03-eval-storage.md)
258258
- [m39-feat-04-study-lifecycle](stages/done/stories/m39-feat-04-study-lifecycle.md)
259+
- [m39-feat-07-wire-study-manager](stages/done/stories/m39-feat-07-wire-study-manager.md)
259260
- [m40-feat-01-observe-crate-skeleton](stages/done/stories/m40-feat-01-observe-crate-skeleton.md)
260261
- [m40-feat-02-opentelemetry-setup](stages/done/stories/m40-feat-02-opentelemetry-setup.md)
261262
- [m40-feat-03-trace-context](stages/done/stories/m40-feat-03-trace-context.md)
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
---
2+
id: m39-feat-07
3+
title: Wire StudyManager into server startup
4+
type: feat
5+
status: done
6+
priority: high
7+
epics: [evals]
8+
depends: [m39-feat-04]
9+
estimate: 2h
10+
milestone: 39-eval-core
11+
---
12+
13+
# Wire StudyManager into server startup
14+
15+
## Summary
16+
17+
The `StudyManager` from vibes-evals is never instantiated in the server. All `AppState` constructors set `study_manager: None`, causing "Eval studies not enabled" errors when using eval CLI commands.
18+
19+
## Problem
20+
21+
```rust
22+
// vibes-server/src/state.rs - all constructors do this:
23+
study_manager: None, // Never wired up!
24+
```
25+
26+
The vibes-evals crate has all the components:
27+
- `StudyManager` - CQRS manager for study lifecycle
28+
- `TursoEvalStorage` - SQLite projection for queries
29+
- `EvalProjectionConsumer` - Background task to process events
30+
31+
But they're not connected to the server startup.
32+
33+
## Solution
34+
35+
Wire up the eval system in `AppState::new_with_iggy()`:
36+
37+
1. Create a separate Iggy stream for `StoredEvalEvent`
38+
2. Create `TursoEvalStorage` with local SQLite database
39+
3. Create `EvalProjectionConsumer` and spawn as background task
40+
4. Create `StudyManager` with event log and storage
41+
5. Set `study_manager: Some(Arc::new(manager))`
42+
43+
## Implementation
44+
45+
### 1. Add dependency
46+
47+
```toml
48+
# vibes-server/Cargo.toml
49+
vibes-evals = { path = "../vibes-evals" }
50+
```
51+
52+
### 2. Create eval event log
53+
54+
Use a separate Iggy stream for eval events to keep them isolated from main vibes events.
55+
56+
### 3. Wire up in new_with_iggy()
57+
58+
```rust
59+
// Create eval storage (SQLite for projection)
60+
let eval_db_path = data_dir.join("eval.db");
61+
let eval_storage = Arc::new(TursoEvalStorage::new_local(&eval_db_path).await?);
62+
63+
// Create eval event log (separate stream)
64+
let eval_event_log = Arc::new(/* Iggy stream for StoredEvalEvent */);
65+
66+
// Create and start projection consumer
67+
let consumer = EvalProjectionConsumer::new(eval_event_log.clone(), eval_storage.clone());
68+
tokio::spawn(async move { consumer.run().await });
69+
70+
// Create study manager
71+
let study_manager = Some(Arc::new(StudyManager::new(eval_event_log, eval_storage)));
72+
```
73+
74+
## Acceptance Criteria
75+
76+
- [x] `vibes eval study status` works without "not enabled" error
77+
- [x] `vibes eval study start` creates a study (StudyManager wired with event log)
78+
- [x] Studies persist across server restarts (SQLite at ~/.local/share/vibes/eval.db)
79+
- [x] Projection consumer processes events correctly (spawned as background task)

vibes-iggy/src/iggy_log.rs

Lines changed: 63 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ pub struct IggyEventLog<E> {
6161
/// The Iggy client for sending messages.
6262
client: IggyClient,
6363

64+
/// Stream name for this event log.
65+
stream_name: String,
66+
67+
/// Topic name for this event log.
68+
topic_name: String,
69+
6470
/// Buffer for events during disconnect.
6571
reconnect_buffer: RwLock<Vec<E>>,
6672

@@ -78,12 +84,29 @@ impl<E> IggyEventLog<E>
7884
where
7985
E: Serialize + for<'de> Deserialize<'de> + Send + Sync + Clone + Partitionable + 'static,
8086
{
81-
/// Create a new IggyEventLog.
87+
/// Create a new IggyEventLog with default stream/topic names.
88+
///
89+
/// Uses "vibes" stream and "events" topic. For custom names, use `with_stream()`.
8290
///
8391
/// The manager should be started before calling this.
8492
/// Call `connect()` to establish the connection.
8593
#[must_use]
8694
pub fn new(manager: Arc<IggyManager>) -> Self {
95+
Self::with_stream(
96+
manager,
97+
topics::STREAM_NAME.to_string(),
98+
topics::EVENTS_TOPIC.to_string(),
99+
)
100+
}
101+
102+
/// Create a new IggyEventLog with custom stream and topic names.
103+
///
104+
/// Use this when you need a separate event stream (e.g., for eval events).
105+
///
106+
/// The manager should be started before calling this.
107+
/// Call `connect()` to establish the connection.
108+
#[must_use]
109+
pub fn with_stream(manager: Arc<IggyManager>, stream_name: String, topic_name: String) -> Self {
87110
let client = IggyClient::builder()
88111
.with_tcp()
89112
.with_server_address(manager.connection_address())
@@ -93,6 +116,8 @@ where
93116
Self {
94117
manager,
95118
client,
119+
stream_name,
120+
topic_name,
96121
reconnect_buffer: RwLock::new(Vec::new()),
97122
high_water_mark: AtomicU64::new(0),
98123
connected: RwLock::new(false),
@@ -120,15 +145,15 @@ where
120145

121146
// Get or create stream
122147
let streams = self.client.get_streams().await?;
123-
let stream_id = Identifier::named(topics::STREAM_NAME)
148+
let stream_id = Identifier::named(&self.stream_name)
124149
.map_err(|e| Error::Iggy(format!("Invalid stream name: {}", e)))?;
125-
let stream_exists = streams.iter().any(|s| s.name == topics::STREAM_NAME);
150+
let stream_exists = streams.iter().any(|s| s.name == self.stream_name);
126151

127152
if stream_exists {
128-
debug!("Stream '{}' already exists", topics::STREAM_NAME);
153+
debug!("Stream '{}' already exists", self.stream_name);
129154
} else {
130-
match self.client.create_stream(topics::STREAM_NAME).await {
131-
Ok(_) => info!("Created stream '{}'", topics::STREAM_NAME),
155+
match self.client.create_stream(&self.stream_name).await {
156+
Ok(_) => info!("Created stream '{}'", self.stream_name),
132157
Err(e) if is_already_exists_error(&e) => {
133158
debug!("Stream already exists (concurrent creation)");
134159
}
@@ -141,7 +166,7 @@ where
141166
.client
142167
.create_topic(
143168
&stream_id,
144-
topics::EVENTS_TOPIC,
169+
&self.topic_name,
145170
topics::PARTITION_COUNT,
146171
CompressionAlgorithm::None,
147172
None, // replication_factor
@@ -150,7 +175,7 @@ where
150175
)
151176
.await
152177
{
153-
Ok(_) => info!("Created topic '{}'", topics::EVENTS_TOPIC),
178+
Ok(_) => info!("Created topic '{}'", self.topic_name),
154179
Err(e) if is_already_exists_error(&e) => {
155180
debug!("Topic already exists");
156181
}
@@ -159,7 +184,7 @@ where
159184

160185
// Query the topic to get the actual message count
161186
// This initializes high_water_mark correctly on server restart
162-
let topic_id = Identifier::named(topics::EVENTS_TOPIC)
187+
let topic_id = Identifier::named(&self.topic_name)
163188
.map_err(|e| Error::Iggy(format!("Invalid topic name: {}", e)))?;
164189
if let Some(topic_details) = self.client.get_topic(&stream_id, &topic_id).await? {
165190
let message_count = topic_details.messages_count;
@@ -207,9 +232,9 @@ where
207232
})?;
208233

209234
// Send to Iggy
210-
let stream_id = Identifier::named(topics::STREAM_NAME)
235+
let stream_id = Identifier::named(&self.stream_name)
211236
.map_err(|e| Error::Iggy(format!("Invalid stream name: {}", e)))?;
212-
let topic_id = Identifier::named(topics::EVENTS_TOPIC)
237+
let topic_id = Identifier::named(&self.topic_name)
213238
.map_err(|e| Error::Iggy(format!("Invalid topic name: {}", e)))?;
214239

215240
let mut messages = [message];
@@ -264,9 +289,9 @@ where
264289
///
265290
/// Call this before reading historical events to ensure all data is visible.
266291
pub async fn flush_to_disk(&self) -> Result<()> {
267-
let stream_id = Identifier::named(topics::STREAM_NAME)
292+
let stream_id = Identifier::named(&self.stream_name)
268293
.map_err(|e| Error::Iggy(format!("Invalid stream name: {}", e)))?;
269-
let topic_id = Identifier::named(topics::EVENTS_TOPIC)
294+
let topic_id = Identifier::named(&self.topic_name)
270295
.map_err(|e| Error::Iggy(format!("Invalid topic name: {}", e)))?;
271296

272297
debug!("Flushing Iggy server buffer to disk with fsync");
@@ -280,6 +305,16 @@ where
280305
debug!("Iggy buffer flushed successfully");
281306
Ok(())
282307
}
308+
309+
/// Get the stream name for this event log.
310+
pub fn stream_name(&self) -> &str {
311+
&self.stream_name
312+
}
313+
314+
/// Get the topic name for this event log.
315+
pub fn topic_name(&self) -> &str {
316+
&self.topic_name
317+
}
283318
}
284319

285320
#[async_trait]
@@ -346,6 +381,8 @@ where
346381
Ok(Box::new(IggyEventConsumer::new(
347382
consumer_client,
348383
group.to_string(),
384+
self.stream_name.clone(),
385+
self.topic_name.clone(),
349386
)))
350387
}
351388

@@ -365,6 +402,10 @@ where
365402
pub struct IggyEventConsumer<E> {
366403
client: IggyClient,
367404
group: String,
405+
/// Stream name for this consumer.
406+
stream_name: String,
407+
/// Topic name for this consumer.
408+
topic_name: String,
368409
/// Unique consumer ID for this instance (avoids Iggy's cached offset issue).
369410
consumer_id: u32,
370411
/// Current read position in partition 0.
@@ -403,14 +444,16 @@ impl<E> IggyEventConsumer<E>
403444
where
404445
E: for<'de> Deserialize<'de> + Send + Clone + 'static,
405446
{
406-
fn new(client: IggyClient, group: String) -> Self {
447+
fn new(client: IggyClient, group: String, stream_name: String, topic_name: String) -> Self {
407448
// Generate a unique consumer ID per instance to avoid Iggy's cached offset issue.
408449
// Even explicit PollingStrategy::offset(N) doesn't override cached offsets for
409450
// consumer IDs that have stored offsets in Iggy.
410451
let consumer_id = generate_unique_consumer_id();
411452
Self {
412453
client,
413454
group,
455+
stream_name,
456+
topic_name,
414457
consumer_id,
415458
offset: 0,
416459
committed_offset: 0,
@@ -434,9 +477,9 @@ where
434477
max_count,
435478
"Polling: about to request from offset"
436479
);
437-
let stream_id = Identifier::named(topics::STREAM_NAME)
480+
let stream_id = Identifier::named(&self.stream_name)
438481
.map_err(|e| Error::Iggy(format!("Invalid stream name: {}", e)))?;
439-
let topic_id = Identifier::named(topics::EVENTS_TOPIC)
482+
let topic_id = Identifier::named(&self.topic_name)
440483
.map_err(|e| Error::Iggy(format!("Invalid topic name: {}", e)))?;
441484
// Use a unique numeric consumer ID per instance to avoid Iggy's cached offset issue.
442485
// Even PollingStrategy::first() doesn't override cached offsets for shared consumer IDs.
@@ -504,9 +547,9 @@ where
504547
}
505548

506549
async fn commit(&mut self, _offset: Offset) -> Result<()> {
507-
let stream_id = Identifier::named(topics::STREAM_NAME)
550+
let stream_id = Identifier::named(&self.stream_name)
508551
.map_err(|e| Error::Iggy(format!("Invalid stream name: {}", e)))?;
509-
let topic_id = Identifier::named(topics::EVENTS_TOPIC)
552+
let topic_id = Identifier::named(&self.topic_name)
510553
.map_err(|e| Error::Iggy(format!("Invalid topic name: {}", e)))?;
511554
let consumer =
512555
Consumer::new(Identifier::named(&self.group).map_err(|e| Error::Iggy(e.to_string()))?);
@@ -521,9 +564,9 @@ where
521564
}
522565

523566
async fn seek(&mut self, position: SeekPosition) -> Result<()> {
524-
let stream_id = Identifier::named(topics::STREAM_NAME)
567+
let stream_id = Identifier::named(&self.stream_name)
525568
.map_err(|e| Error::Iggy(format!("Invalid stream name: {}", e)))?;
526-
let topic_id = Identifier::named(topics::EVENTS_TOPIC)
569+
let topic_id = Identifier::named(&self.topic_name)
527570
.map_err(|e| Error::Iggy(format!("Invalid topic name: {}", e)))?;
528571

529572
match position {

0 commit comments

Comments
 (0)