Skip to content

Commit 28a2569

Browse files
committed
events observability on all buses
1 parent 6ef4334 commit 28a2569

File tree

2 files changed

+49
-8
lines changed

2 files changed

+49
-8
lines changed

crates/example-eventage-claw/src/main.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,29 @@ async fn main() -> anyhow::Result<()> {
197197
let default_group = groups_list.first().cloned().unwrap_or_default();
198198

199199
// ── Observability ─────────────────────────────────────────────────────────
200+
// Attach exporter + replay to EVERY bus (shared + all per-group) so that
201+
// conversation events appear alongside heartbeats and IPC events.
202+
let group_bus_list: Vec<eventage::EventBus> =
203+
group_buses.values().cloned().collect();
204+
200205
if let Some(log_path) = &args.log {
201206
match JsonlExporter::new(log_path).await {
202207
Ok(exporter) => {
203-
let observer = BusObserver::new(shared_bus.clone()).add_exporter(exporter);
204-
tokio::spawn(observer.run());
208+
let exporter = std::sync::Arc::new(exporter);
209+
// Shared bus (heartbeats, IPC, schedule events)
210+
tokio::spawn(
211+
BusObserver::new(shared_bus.clone())
212+
.add_exporter_arc(exporter.clone())
213+
.run(),
214+
);
215+
// Per-group buses (user messages, assistant replies, tool results)
216+
for bus in &group_bus_list {
217+
tokio::spawn(
218+
BusObserver::new(bus.clone())
219+
.add_exporter_arc(exporter.clone())
220+
.run(),
221+
);
222+
}
205223
if !tui_mode {
206224
eprintln!("Logging events to {}", log_path.display());
207225
}
@@ -212,6 +230,7 @@ async fn main() -> anyhow::Result<()> {
212230

213231
if args.replay {
214232
LiveReplayServer::new(shared_bus.clone())
233+
.with_buses(group_bus_list.iter().cloned())
215234
.port(args.replay_port)
216235
.serve_background();
217236
if !tui_mode {

src/replay.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::bus::EventBus;
3333
use crate::event::Event;
3434
use futures_util::stream;
3535
use std::{convert::Infallible, net::SocketAddr, sync::Arc};
36+
use tokio::sync::mpsc;
3637
use tower_http::cors::CorsLayer;
3738

3839
/// Embedded UI HTML shared between the live server and CLI binary.
@@ -52,19 +53,25 @@ pub const UI_HTML: &str = include_str!("ui.html");
5253
///
5354
/// Clients loading `/` automatically subscribe to `/events/stream` for real-time updates.
5455
pub struct LiveReplayServer {
55-
bus: EventBus,
56+
buses: Vec<EventBus>,
5657
port: u16,
5758
}
5859

5960
#[derive(Clone)]
6061
struct LiveState {
61-
bus: Arc<EventBus>,
62+
buses: Vec<Arc<EventBus>>,
6263
}
6364

6465
impl LiveReplayServer {
6566
/// Create a new server attached to `bus`.
6667
pub fn new(bus: EventBus) -> Self {
67-
Self { bus, port: 4567 }
68+
Self { buses: vec![bus], port: 4567 }
69+
}
70+
71+
/// Attach additional buses so all their events appear in the replay UI.
72+
pub fn with_buses(mut self, buses: impl IntoIterator<Item = EventBus>) -> Self {
73+
self.buses.extend(buses);
74+
self
6875
}
6976

7077
/// Override the listening port (default: `4567`).
@@ -86,7 +93,7 @@ impl LiveReplayServer {
8693
/// Start the server, blocking the current task until it exits.
8794
pub async fn serve(self) {
8895
let state = LiveState {
89-
bus: Arc::new(self.bus),
96+
buses: self.buses.into_iter().map(Arc::new).collect(),
9097
};
9198

9299
let app = Router::new()
@@ -120,13 +127,28 @@ async fn serve_ui() -> Html<&'static str> {
120127
}
121128

122129
async fn serve_snapshot(State(state): State<LiveState>) -> Json<Vec<Event>> {
123-
Json(state.bus.log().await)
130+
let mut all: Vec<Event> = Vec::new();
131+
for bus in &state.buses {
132+
all.extend(bus.log().await);
133+
}
134+
all.sort_by_key(|e| e.timestamp);
135+
Json(all)
124136
}
125137

126138
async fn serve_live_stream(
127139
State(state): State<LiveState>,
128140
) -> Sse<impl futures_util::stream::Stream<Item = Result<SseEvent, Infallible>>> {
129-
let rx = state.bus.subscribe();
141+
// Merge live events from all buses into one channel.
142+
let (tx, rx) = mpsc::unbounded_channel::<Event>();
143+
for bus in &state.buses {
144+
let mut bus_rx = bus.subscribe();
145+
let tx = tx.clone();
146+
tokio::spawn(async move {
147+
while let Some(event) = bus_rx.recv().await {
148+
let _ = tx.send(event);
149+
}
150+
});
151+
}
130152
let event_stream = stream::unfold(rx, |mut rx| async move {
131153
rx.recv().await.map(|event| {
132154
let data = serde_json::to_string(&event).unwrap_or_default();

0 commit comments

Comments
 (0)