Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion apps/webdocs/src/content/docs/reference/client.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: Client
description: API reference for createClient and the Client interface
description: API reference for createClient, createRemoteClient, and the Client interface
---

The Client is the primary interface for executing HTTP requests in @t-req/core.
Expand Down Expand Up @@ -41,6 +41,45 @@ interface RequestDefaults {
}
```

## createRemoteClient

Creates a client that routes requests through `treq serve` (the `@t-req/app` server) instead of executing locally. This is primarily used for **Observer Mode** (TUI / event streaming / execution store) while keeping the same high-level API (`run`, `runString`, `setVariable(s)`).

```typescript
import { createRemoteClient } from '@t-req/core';

const client = createRemoteClient({
serverUrl: 'http://localhost:4096',
// Optional bearer token if the server requires it:
// token: process.env.TREQ_TOKEN,
variables: { baseUrl: 'https://api.example.com' }
});

await client.run('./auth/login.http');
client.setVariable('token', '...');
await client.close(); // finishes the flow (best-effort; server TTL still applies)
```

### Options

| Option | Type | Description |
|--------|------|-------------|
| `serverUrl` | `string` | Base URL for the server (e.g. `http://localhost:4096`). |
| `token` | `string` | Bearer token (if the server is started with `--token`). |
| `variables` | `Record<string, unknown>` | Initial variables (synced to the server session). |
| `flowLabel` | `string` | Optional flow label (shown in Observer Mode). |
| `flowMeta` | `Record<string, unknown>` | Optional flow metadata. |
| `profile` | `string` | Optional server config profile. |
| `timeout` | `number` | Default timeout in milliseconds. |

### Additional Methods

Remote clients also expose:

- `close(): Promise<void>`
- `getSessionId(): string | undefined`
- `getFlowId(): string | undefined`

## Client Methods

### run(path, options?)
Expand Down
12 changes: 10 additions & 2 deletions packages/app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,17 @@ treq serve --stdio
| `GET` | `/session/:id` | Get session state |
| `PUT` | `/session/:id/variables` | Update session variables |
| `DELETE` | `/session/:id` | Delete session |
| `GET` | `/event` | SSE event stream |
| `POST` | `/flows` | Create a flow (Observer Mode grouping) |
| `POST` | `/flows/:flowId/finish` | Finish a flow (best-effort; server TTL will also clean up) |
| `GET` | `/flows/:flowId/executions/:reqExecId` | Fetch stored execution detail (Observer Mode) |
| `GET` | `/workspace/files` | List `.http` files in workspace |
| `GET` | `/workspace/requests?path=...` | List requests within a `.http` file |
| `GET` | `/event?sessionId=...` | SSE event stream filtered by session |
| `GET` | `/event?flowId=...` | SSE event stream filtered by flow |
| `GET` | `/doc` | OpenAPI documentation |

> When `--token` auth is enabled, `/event` requires either `sessionId` or `flowId` to prevent cross-session leakage.

#### Example: Python Client

```python
Expand All @@ -159,7 +167,7 @@ resp, _ := http.Post("http://localhost:4096/execute", "application/json",
strings.NewReader(`{"content": "GET https://api.example.com/users"}`))
```

See `examples/clients/` for complete client examples in Python, Go, and TypeScript.
See `examples/app/` for complete client examples in Python, Go, and TypeScript.

### Help

Expand Down
58 changes: 52 additions & 6 deletions packages/app/src/server/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ import { createEventManager, type EventEnvelope } from './events';
import {
capabilitiesRoute,
configRoute,
createFlowRoute,
createSessionRoute,
deleteSessionRoute,
eventRoute,
executeRoute,
finishFlowRoute,
getExecutionRoute,
getSessionRoute,
healthRoute,
listWorkspaceFilesRoute,
listWorkspaceRequestsRoute,
parseRoute,
updateSessionVariablesRoute
} from './openapi';
Expand Down Expand Up @@ -184,17 +189,56 @@ export function createApp(config: ServerConfig) {
return c.body(null, 204);
});

// ============================================================================
// Flow Endpoints (Observer Mode)
// ============================================================================

app.openapi(createFlowRoute, (c) => {
const request = c.req.valid('json');
const result = service.createFlow(request);
return c.json(result, 201);
});

app.openapi(finishFlowRoute, (c) => {
const { flowId } = c.req.valid('param');
const result = service.finishFlow(flowId);
return c.json(result, 200);
});

app.openapi(getExecutionRoute, (c) => {
const { flowId, reqExecId } = c.req.valid('param');
const result = service.getExecution(flowId, reqExecId);
return c.json(result, 200);
});

// ============================================================================
// Workspace Endpoints
// ============================================================================

app.openapi(listWorkspaceFilesRoute, async (c) => {
const { ignore } = c.req.valid('query');
const additionalIgnore = ignore ? ignore.split(',').map((p) => p.trim()) : undefined;
const result = await service.listWorkspaceFiles(additionalIgnore);
return c.json(result, 200);
});

app.openapi(listWorkspaceRequestsRoute, async (c) => {
const { path } = c.req.valid('query');
const result = await service.listWorkspaceRequests(path);
return c.json(result, 200);
});

// ============================================================================
// Event Streaming (SSE)
// ============================================================================

app.openapi(eventRoute, async (c) => {
const { sessionId } = c.req.valid('query');
const { sessionId, flowId } = c.req.valid('query');

// Require sessionId when auth is enabled (prevents cross-session leakage)
if (config.token && !sessionId) {
// Require sessionId or flowId when auth is enabled (prevents cross-session leakage)
if (config.token && !sessionId && !flowId) {
throw new ValidationError(
'sessionId query parameter is required when authentication is enabled'
'sessionId or flowId query parameter is required when authentication is enabled'
);
}
Comment on lines +238 to 243
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Consider stronger enforcement when auth is enabled

When config.token is set, the code requires sessionId OR flowId, but doesn't validate that provided values exist. An attacker could subscribe to arbitrary sessionId/flowId values without verification.

Suggested change
// Require sessionId or flowId when auth is enabled (prevents cross-session leakage)
if (config.token && !sessionId && !flowId) {
throw new ValidationError(
'sessionId query parameter is required when authentication is enabled'
'sessionId or flowId query parameter is required when authentication is enabled'
);
}
// Require sessionId or flowId when auth is enabled (prevents cross-session leakage)
if (config.token && !sessionId && !flowId) {
throw new ValidationError(
'sessionId or flowId query parameter is required when authentication is enabled'
);
}
// Verify sessionId/flowId exist if provided (prevent enumeration)
if (sessionId && !service.getSessions().has(sessionId)) {
throw new SessionNotFoundError(sessionId);
}
if (flowId && !service.getFlows().has(flowId)) {
throw new FlowNotFoundError(flowId);
}
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/app/src/server/app.ts
Line: 238:243

Comment:
**logic:** Consider stronger enforcement when auth is enabled

When `config.token` is set, the code requires `sessionId` OR `flowId`, but doesn't validate that provided values exist. An attacker could subscribe to arbitrary sessionId/flowId values without verification.

```suggestion
    // Require sessionId or flowId when auth is enabled (prevents cross-session leakage)
    if (config.token && !sessionId && !flowId) {
      throw new ValidationError(
        'sessionId or flowId query parameter is required when authentication is enabled'
      );
    }
    
    // Verify sessionId/flowId exist if provided (prevent enumeration)
    if (sessionId && !service.getSessions().has(sessionId)) {
      throw new SessionNotFoundError(sessionId);
    }
    if (flowId && !service.getFlows().has(flowId)) {
      throw new FlowNotFoundError(flowId);
    }
```

How can I resolve this? If you propose a fix, please make it concise.


Expand All @@ -213,11 +257,11 @@ export function createApp(config: ServerConfig) {
stream.close();
};

subscriberId = eventManager.subscribe(sessionId, send, close);
subscriberId = eventManager.subscribe(sessionId, send, close, flowId);

// Send initial connection event
stream.writeSSE({
data: JSON.stringify({ connected: true, sessionId }),
data: JSON.stringify({ connected: true, sessionId, flowId }),
event: 'connected'
});

Expand Down Expand Up @@ -287,6 +331,8 @@ export function createApp(config: ServerConfig) {
{ name: 'System', description: 'System endpoints for health checks and capabilities' },
{ name: 'Requests', description: 'Parse and execute HTTP requests from .http files' },
{ name: 'Sessions', description: 'Manage stateful sessions with variables and cookies' },
{ name: 'Flows', description: 'Observer Mode - track and correlate request executions' },
{ name: 'Workspace', description: 'Workspace discovery - list .http files and requests' },
{ name: 'Events', description: 'Real-time event streaming via Server-Sent Events' }
],
externalDocs: {
Expand Down
32 changes: 32 additions & 0 deletions packages/app/src/server/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ export class SessionLimitReachedError extends TreqError {
}
}

export class FlowLimitReachedError extends TreqError {
constructor(limit: number) {
super('FLOW_LIMIT_REACHED', `Maximum flow limit (${limit}) reached`);
this.name = 'FlowLimitReachedError';
}
}

export class ValidationError extends TreqError {
constructor(message: string) {
super('VALIDATION_ERROR', message);
Expand Down Expand Up @@ -94,6 +101,27 @@ export class ContentOrPathRequiredError extends TreqError {
}
}

export class FlowNotFoundError extends TreqError {
constructor(id: string) {
super('FLOW_NOT_FOUND', `Flow '${id}' not found`);
this.name = 'FlowNotFoundError';
}
}

export class ExecutionNotFoundError extends TreqError {
constructor(flowId: string, reqExecId: string) {
super('EXECUTION_NOT_FOUND', `Execution '${reqExecId}' not found in flow '${flowId}'`);
this.name = 'ExecutionNotFoundError';
}
}

export class FileNotFoundError extends TreqError {
constructor(path: string) {
super('FILE_NOT_FOUND', `File '${path}' not found`);
this.name = 'FileNotFoundError';
}
}

// ============================================================================
// Status Code Mapping - OpenCode pattern
// ============================================================================
Expand All @@ -108,8 +136,12 @@ type HttpStatusCode = 400 | 403 | 404 | 429 | 500;
export function getStatusForError(err: Error): HttpStatusCode {
if (err instanceof SessionNotFoundError) return 404;
if (err instanceof RequestNotFoundError) return 404;
if (err instanceof FlowNotFoundError) return 404;
if (err instanceof ExecutionNotFoundError) return 404;
if (err instanceof FileNotFoundError) return 404;
if (err instanceof PathOutsideWorkspaceError) return 403;
if (err instanceof SessionLimitReachedError) return 429;
if (err instanceof FlowLimitReachedError) return 429;
if (err instanceof ValidationError) return 400;
if (err instanceof ContentOrPathRequiredError) return 400;
if (err instanceof RequestIndexOutOfRangeError) return 400;
Expand Down
30 changes: 25 additions & 5 deletions packages/app/src/server/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ export type EventEnvelope = {
ts: number;
runId: string;
sessionId?: string;
flowId?: string;
reqExecId?: string;
seq: number;
payload: { type: string } & Record<string, unknown>;
};
Expand All @@ -18,6 +20,7 @@ export type EventEnvelope = {
export type EventSubscriber = {
id: string;
sessionId?: string;
flowId?: string;
send: (event: EventEnvelope) => void;
close: () => void;
};
Expand Down Expand Up @@ -62,10 +65,11 @@ export function createEventManager() {
function subscribe(
sessionId: string | undefined,
send: (event: EventEnvelope) => void,
close: () => void
close: () => void,
flowId?: string
): string {
const id = generateId();
subscribers.set(id, { id, sessionId, send, close });
subscribers.set(id, { id, sessionId, flowId, send, close });
return id;
}

Expand All @@ -78,18 +82,34 @@ export function createEventManager() {
runId: string,
event: { type: string } & Record<string, unknown>
): void {
// Extract flowId and reqExecId from the event if present
const eventFlowId = event.flowId as string | undefined;
const eventReqExecId = event.reqExecId as string | undefined;
const eventSeq = typeof event.seq === 'number' ? event.seq : undefined;

const envelope: EventEnvelope = {
type: event.type,
ts: Date.now(),
runId,
sessionId,
seq: getNextSeq(runId),
flowId: eventFlowId,
reqExecId: eventReqExecId,
// Prefer flow-scoped seq when provided by producer (service).
// Fallback to run-scoped sequencing for legacy non-flow events.
seq: eventSeq ?? getNextSeq(runId),
payload: event
};

// Send to all subscribers that match the session (or all if no session filter)
// Send to subscribers that match the filters
for (const subscriber of subscribers.values()) {
if (subscriber.sessionId === undefined || subscriber.sessionId === sessionId) {
// Check session filter
const sessionMatches =
subscriber.sessionId === undefined || subscriber.sessionId === sessionId;

// Check flow filter
const flowMatches = subscriber.flowId === undefined || subscriber.flowId === eventFlowId;

if (sessionMatches && flowMatches) {
try {
subscriber.send(envelope);
} catch {
Expand Down
Loading