Skip to content

Commit 1eff323

Browse files
feat(observer): establish server-side observability primitives for TUI (#3)
* feat(observer): establish server-side observability primitives for TUI This introduces flow/execution correlation and server-backed storage so a TUI can observe runs in real-time (SSE) and fetch full execution details on demand. - flows: create/finish + TTL/limits + flow-scoped sequencing - executions: reqExecId, requestQueued, lifecycle tracking, error finalization - storage: GET execution detail by flowId/reqExecId - workspace: list .http files and requests for navigation - security: session variable redaction; SSE filter requirements under auth - core: createRemoteClient for “single import change” observability + ordered var sync - tests: update parse calls for includeDiagnostics typing; Biome formatting * fix(observer): deep-sanitize variables and serialize remote variable sync Sanitize nested variable structures including arrays to prevent sensitive-key leaks. Queue remote client variable updates to avoid races and ensure executions see latest values. Add regression tests for nested/array sanitization. * feat(app/server): avoid evicting active flows at capacity Only evict finished flows when MAX_FLOWS is reached; if none are finished, throw FLOW_LIMIT_REACHED instead of deleting an in-progress flow. This prevents active clients from hitting FlowNotFoundError mid-flow.
1 parent d2cf985 commit 1eff323

File tree

15 files changed

+1632
-80
lines changed

15 files changed

+1632
-80
lines changed

apps/webdocs/src/content/docs/reference/client.md

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
---
22
title: Client
3-
description: API reference for createClient and the Client interface
3+
description: API reference for createClient, createRemoteClient, and the Client interface
44
---
55

66
The Client is the primary interface for executing HTTP requests in @t-req/core.
@@ -41,6 +41,45 @@ interface RequestDefaults {
4141
}
4242
```
4343

44+
## createRemoteClient
45+
46+
Creates a client that routes requests through `treq serve` (the `@t-req/app` server) instead of executing locally. This is primarily used for **Observer Mode** (TUI / event streaming / execution store) while keeping the same high-level API (`run`, `runString`, `setVariable(s)`).
47+
48+
```typescript
49+
import { createRemoteClient } from '@t-req/core';
50+
51+
const client = createRemoteClient({
52+
serverUrl: 'http://localhost:4096',
53+
// Optional bearer token if the server requires it:
54+
// token: process.env.TREQ_TOKEN,
55+
variables: { baseUrl: 'https://api.example.com' }
56+
});
57+
58+
await client.run('./auth/login.http');
59+
client.setVariable('token', '...');
60+
await client.close(); // finishes the flow (best-effort; server TTL still applies)
61+
```
62+
63+
### Options
64+
65+
| Option | Type | Description |
66+
|--------|------|-------------|
67+
| `serverUrl` | `string` | Base URL for the server (e.g. `http://localhost:4096`). |
68+
| `token` | `string` | Bearer token (if the server is started with `--token`). |
69+
| `variables` | `Record<string, unknown>` | Initial variables (synced to the server session). |
70+
| `flowLabel` | `string` | Optional flow label (shown in Observer Mode). |
71+
| `flowMeta` | `Record<string, unknown>` | Optional flow metadata. |
72+
| `profile` | `string` | Optional server config profile. |
73+
| `timeout` | `number` | Default timeout in milliseconds. |
74+
75+
### Additional Methods
76+
77+
Remote clients also expose:
78+
79+
- `close(): Promise<void>`
80+
- `getSessionId(): string | undefined`
81+
- `getFlowId(): string | undefined`
82+
4483
## Client Methods
4584

4685
### run(path, options?)

packages/app/README.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,17 @@ treq serve --stdio
136136
| `GET` | `/session/:id` | Get session state |
137137
| `PUT` | `/session/:id/variables` | Update session variables |
138138
| `DELETE` | `/session/:id` | Delete session |
139-
| `GET` | `/event` | SSE event stream |
139+
| `POST` | `/flows` | Create a flow (Observer Mode grouping) |
140+
| `POST` | `/flows/:flowId/finish` | Finish a flow (best-effort; server TTL will also clean up) |
141+
| `GET` | `/flows/:flowId/executions/:reqExecId` | Fetch stored execution detail (Observer Mode) |
142+
| `GET` | `/workspace/files` | List `.http` files in workspace |
143+
| `GET` | `/workspace/requests?path=...` | List requests within a `.http` file |
144+
| `GET` | `/event?sessionId=...` | SSE event stream filtered by session |
145+
| `GET` | `/event?flowId=...` | SSE event stream filtered by flow |
140146
| `GET` | `/doc` | OpenAPI documentation |
141147

148+
> When `--token` auth is enabled, `/event` requires either `sessionId` or `flowId` to prevent cross-session leakage.
149+
142150
#### Example: Python Client
143151

144152
```python
@@ -159,7 +167,7 @@ resp, _ := http.Post("http://localhost:4096/execute", "application/json",
159167
strings.NewReader(`{"content": "GET https://api.example.com/users"}`))
160168
```
161169

162-
See `examples/clients/` for complete client examples in Python, Go, and TypeScript.
170+
See `examples/app/` for complete client examples in Python, Go, and TypeScript.
163171

164172
### Help
165173

packages/app/src/server/app.ts

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,17 @@ import { createEventManager, type EventEnvelope } from './events';
99
import {
1010
capabilitiesRoute,
1111
configRoute,
12+
createFlowRoute,
1213
createSessionRoute,
1314
deleteSessionRoute,
1415
eventRoute,
1516
executeRoute,
17+
finishFlowRoute,
18+
getExecutionRoute,
1619
getSessionRoute,
1720
healthRoute,
21+
listWorkspaceFilesRoute,
22+
listWorkspaceRequestsRoute,
1823
parseRoute,
1924
updateSessionVariablesRoute
2025
} from './openapi';
@@ -184,17 +189,56 @@ export function createApp(config: ServerConfig) {
184189
return c.body(null, 204);
185190
});
186191

192+
// ============================================================================
193+
// Flow Endpoints (Observer Mode)
194+
// ============================================================================
195+
196+
app.openapi(createFlowRoute, (c) => {
197+
const request = c.req.valid('json');
198+
const result = service.createFlow(request);
199+
return c.json(result, 201);
200+
});
201+
202+
app.openapi(finishFlowRoute, (c) => {
203+
const { flowId } = c.req.valid('param');
204+
const result = service.finishFlow(flowId);
205+
return c.json(result, 200);
206+
});
207+
208+
app.openapi(getExecutionRoute, (c) => {
209+
const { flowId, reqExecId } = c.req.valid('param');
210+
const result = service.getExecution(flowId, reqExecId);
211+
return c.json(result, 200);
212+
});
213+
214+
// ============================================================================
215+
// Workspace Endpoints
216+
// ============================================================================
217+
218+
app.openapi(listWorkspaceFilesRoute, async (c) => {
219+
const { ignore } = c.req.valid('query');
220+
const additionalIgnore = ignore ? ignore.split(',').map((p) => p.trim()) : undefined;
221+
const result = await service.listWorkspaceFiles(additionalIgnore);
222+
return c.json(result, 200);
223+
});
224+
225+
app.openapi(listWorkspaceRequestsRoute, async (c) => {
226+
const { path } = c.req.valid('query');
227+
const result = await service.listWorkspaceRequests(path);
228+
return c.json(result, 200);
229+
});
230+
187231
// ============================================================================
188232
// Event Streaming (SSE)
189233
// ============================================================================
190234

191235
app.openapi(eventRoute, async (c) => {
192-
const { sessionId } = c.req.valid('query');
236+
const { sessionId, flowId } = c.req.valid('query');
193237

194-
// Require sessionId when auth is enabled (prevents cross-session leakage)
195-
if (config.token && !sessionId) {
238+
// Require sessionId or flowId when auth is enabled (prevents cross-session leakage)
239+
if (config.token && !sessionId && !flowId) {
196240
throw new ValidationError(
197-
'sessionId query parameter is required when authentication is enabled'
241+
'sessionId or flowId query parameter is required when authentication is enabled'
198242
);
199243
}
200244

@@ -213,11 +257,11 @@ export function createApp(config: ServerConfig) {
213257
stream.close();
214258
};
215259

216-
subscriberId = eventManager.subscribe(sessionId, send, close);
260+
subscriberId = eventManager.subscribe(sessionId, send, close, flowId);
217261

218262
// Send initial connection event
219263
stream.writeSSE({
220-
data: JSON.stringify({ connected: true, sessionId }),
264+
data: JSON.stringify({ connected: true, sessionId, flowId }),
221265
event: 'connected'
222266
});
223267

@@ -287,6 +331,8 @@ export function createApp(config: ServerConfig) {
287331
{ name: 'System', description: 'System endpoints for health checks and capabilities' },
288332
{ name: 'Requests', description: 'Parse and execute HTTP requests from .http files' },
289333
{ name: 'Sessions', description: 'Manage stateful sessions with variables and cookies' },
334+
{ name: 'Flows', description: 'Observer Mode - track and correlate request executions' },
335+
{ name: 'Workspace', description: 'Workspace discovery - list .http files and requests' },
290336
{ name: 'Events', description: 'Real-time event streaming via Server-Sent Events' }
291337
],
292338
externalDocs: {

packages/app/src/server/errors.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ export class SessionLimitReachedError extends TreqError {
4545
}
4646
}
4747

48+
export class FlowLimitReachedError extends TreqError {
49+
constructor(limit: number) {
50+
super('FLOW_LIMIT_REACHED', `Maximum flow limit (${limit}) reached`);
51+
this.name = 'FlowLimitReachedError';
52+
}
53+
}
54+
4855
export class ValidationError extends TreqError {
4956
constructor(message: string) {
5057
super('VALIDATION_ERROR', message);
@@ -94,6 +101,27 @@ export class ContentOrPathRequiredError extends TreqError {
94101
}
95102
}
96103

104+
export class FlowNotFoundError extends TreqError {
105+
constructor(id: string) {
106+
super('FLOW_NOT_FOUND', `Flow '${id}' not found`);
107+
this.name = 'FlowNotFoundError';
108+
}
109+
}
110+
111+
export class ExecutionNotFoundError extends TreqError {
112+
constructor(flowId: string, reqExecId: string) {
113+
super('EXECUTION_NOT_FOUND', `Execution '${reqExecId}' not found in flow '${flowId}'`);
114+
this.name = 'ExecutionNotFoundError';
115+
}
116+
}
117+
118+
export class FileNotFoundError extends TreqError {
119+
constructor(path: string) {
120+
super('FILE_NOT_FOUND', `File '${path}' not found`);
121+
this.name = 'FileNotFoundError';
122+
}
123+
}
124+
97125
// ============================================================================
98126
// Status Code Mapping - OpenCode pattern
99127
// ============================================================================
@@ -108,8 +136,12 @@ type HttpStatusCode = 400 | 403 | 404 | 429 | 500;
108136
export function getStatusForError(err: Error): HttpStatusCode {
109137
if (err instanceof SessionNotFoundError) return 404;
110138
if (err instanceof RequestNotFoundError) return 404;
139+
if (err instanceof FlowNotFoundError) return 404;
140+
if (err instanceof ExecutionNotFoundError) return 404;
141+
if (err instanceof FileNotFoundError) return 404;
111142
if (err instanceof PathOutsideWorkspaceError) return 403;
112143
if (err instanceof SessionLimitReachedError) return 429;
144+
if (err instanceof FlowLimitReachedError) return 429;
113145
if (err instanceof ValidationError) return 400;
114146
if (err instanceof ContentOrPathRequiredError) return 400;
115147
if (err instanceof RequestIndexOutOfRangeError) return 400;

packages/app/src/server/events.ts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ export type EventEnvelope = {
77
ts: number;
88
runId: string;
99
sessionId?: string;
10+
flowId?: string;
11+
reqExecId?: string;
1012
seq: number;
1113
payload: { type: string } & Record<string, unknown>;
1214
};
@@ -18,6 +20,7 @@ export type EventEnvelope = {
1820
export type EventSubscriber = {
1921
id: string;
2022
sessionId?: string;
23+
flowId?: string;
2124
send: (event: EventEnvelope) => void;
2225
close: () => void;
2326
};
@@ -62,10 +65,11 @@ export function createEventManager() {
6265
function subscribe(
6366
sessionId: string | undefined,
6467
send: (event: EventEnvelope) => void,
65-
close: () => void
68+
close: () => void,
69+
flowId?: string
6670
): string {
6771
const id = generateId();
68-
subscribers.set(id, { id, sessionId, send, close });
72+
subscribers.set(id, { id, sessionId, flowId, send, close });
6973
return id;
7074
}
7175

@@ -78,18 +82,34 @@ export function createEventManager() {
7882
runId: string,
7983
event: { type: string } & Record<string, unknown>
8084
): void {
85+
// Extract flowId and reqExecId from the event if present
86+
const eventFlowId = event.flowId as string | undefined;
87+
const eventReqExecId = event.reqExecId as string | undefined;
88+
const eventSeq = typeof event.seq === 'number' ? event.seq : undefined;
89+
8190
const envelope: EventEnvelope = {
8291
type: event.type,
8392
ts: Date.now(),
8493
runId,
8594
sessionId,
86-
seq: getNextSeq(runId),
95+
flowId: eventFlowId,
96+
reqExecId: eventReqExecId,
97+
// Prefer flow-scoped seq when provided by producer (service).
98+
// Fallback to run-scoped sequencing for legacy non-flow events.
99+
seq: eventSeq ?? getNextSeq(runId),
87100
payload: event
88101
};
89102

90-
// Send to all subscribers that match the session (or all if no session filter)
103+
// Send to subscribers that match the filters
91104
for (const subscriber of subscribers.values()) {
92-
if (subscriber.sessionId === undefined || subscriber.sessionId === sessionId) {
105+
// Check session filter
106+
const sessionMatches =
107+
subscriber.sessionId === undefined || subscriber.sessionId === sessionId;
108+
109+
// Check flow filter
110+
const flowMatches = subscriber.flowId === undefined || subscriber.flowId === eventFlowId;
111+
112+
if (sessionMatches && flowMatches) {
93113
try {
94114
subscriber.send(envelope);
95115
} catch {

0 commit comments

Comments
 (0)