Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ the item when done — git log is the history.

## Architecture

- [ ] Broker raw-upstream recorder + no-connect replay harness (Layer 2
bug debug surface). When a community user reports a broker-specific
normalize bug — IBKR's `request-bridge.ts:470 .abs()`, the proto
decoder's empty `if (cp.secType !== undefined)` body, a hypothetical
CCXT `entryPrice` mis-parse — code-reading alone is slow and
imprecise. Add a dev-mode raw-upstream recorder per broker (IBKR:
EWrapper callback args; CCXT: `fetchBalance` / `fetchPositions`
return values; Alpaca: REST response bodies), append-only JSONL to
`data/trading/<id>/upstream/session-<timestamp>.jsonl`. Pair with a
replay tool that constructs the broker without network (currently
`init()` forces connect) and re-fires the recorded events through
the same normalize pipeline, returning `getPositions` /
`getAccount`. Prerequisite: factor `init()` to allow no-connect
construction for IBKR / CCXT / Alpaca. Lets future broker-bug
diagnosis happen offline against a recorded session instead of
requiring live broker re-attachment. Companion harness ideas in
`~/.claude/plans/simulator-moonlit-otter.md`.
- [ ] Extract `derivePositionMath(raw): { marketValue, unrealizedPnL }`
shared util. Today's IBroker contract requires every broker's
`getPositions` to multiply by `multiplier` when computing
Expand Down
13 changes: 13 additions & 0 deletions src/ai-providers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,23 @@ export type ProviderEvent =

// ==================== Types ====================

/** A tool the AI invoked during this generation. Captured by AgentCenter
* as `tool_use` events stream through the pipeline. Used by AgentWork's
* outputGate to detect intent-signal tools like `notify_user`. */
export interface ToolCallSummary {
id: string
name: string
input: unknown
}

export interface ProviderResult {
text: string
media: MediaAttachment[]
mediaUrls?: string[]
/** Tool calls observed during this generation, in invocation order.
* AgentCenter populates this when it synthesizes the final done event;
* individual providers don't need to fill it themselves. */
toolCalls?: ReadonlyArray<ToolCallSummary>
}

// ==================== GenerateOpts ====================
Expand Down
10 changes: 9 additions & 1 deletion src/core/agent-center.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/

import type { AskOptions, ProviderResult, ProviderEvent, GenerateOpts } from './ai-provider-manager.js'
import type { ToolCallSummary } from '../ai-providers/types.js'
import type { ResolvedProfile } from './config.js'
import { GenerateRouter, StreamableResult } from './ai-provider-manager.js'
import { resolveProfile, resolveCredential } from './config.js'
Expand Down Expand Up @@ -131,6 +132,11 @@ export class AgentCenter {
let currentAssistantBlocks: ContentBlock[] = []
let currentUserBlocks: ContentBlock[] = []
let finalResult: ProviderResult | null = null
// Tool calls observed during this generation, captured for the final
// done event so AgentWork (and any other consumer awaiting the
// ProviderResult) can inspect what the AI invoked without having to
// re-stream the events themselves.
const toolCalls: ToolCallSummary[] = []

for await (const event of source) {
switch (event.type) {
Expand All @@ -143,6 +149,7 @@ export class AgentCenter {
// Unified logging — all providers get this now
logToolCall(event.name, event.input)
this.toolCallLog?.start(event.id, event.name, event.input, session.id)
toolCalls.push({ id: event.id, name: event.name, input: event.input })
currentAssistantBlocks.push({
type: 'tool_use',
id: event.id,
Expand Down Expand Up @@ -227,14 +234,15 @@ export class AgentCenter {
]
await session.appendAssistant(finalBlocks, provider.providerTag)

// 9. Yield done with merged media
// 9. Yield done with merged media + observed tool calls
const mediaUrls = mediaBlocks.map(b => (b as { type: 'image'; url: string }).url)
yield {
type: 'done',
result: {
text: finalResult.text,
media: allMedia,
mediaUrls,
toolCalls,
},
}
}
Expand Down
132 changes: 71 additions & 61 deletions src/core/agent-event.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import type { AgentEventMap } from './agent-event.js'

describe('AgentEventSchemas', () => {
const expectedTypes: (keyof AgentEventMap)[] = [
'cron.fire', 'cron.done', 'cron.error',
'heartbeat.done', 'heartbeat.skip', 'heartbeat.error',
'cron.fire',
'message.received', 'message.sent',
'task.requested', 'task.done', 'task.error',
'agent.work.requested', 'agent.work.done', 'agent.work.skip', 'agent.work.error',
]

it('should have a schema for every key in AgentEventMap', () => {
Expand Down Expand Up @@ -46,93 +45,95 @@ describe('validateEventPayload', () => {
})).toThrow(/Invalid payload.*cron\.fire/)
})

// -- cron.done --
it('should accept valid cron.done payload', () => {
expect(() => validateEventPayload('cron.done', {
jobId: 'abc', jobName: 'test', reply: 'ok', durationMs: 100,
// -- message.received --
it('should accept valid message.received payload', () => {
expect(() => validateEventPayload('message.received', {
channel: 'web', to: 'default', prompt: 'hello',
})).not.toThrow()
})

// -- cron.error --
it('should accept valid cron.error payload', () => {
expect(() => validateEventPayload('cron.error', {
jobId: 'abc', jobName: 'test', error: 'boom', durationMs: 50,
// -- message.sent --
it('should accept valid message.sent payload', () => {
expect(() => validateEventPayload('message.sent', {
channel: 'web', to: 'default', prompt: 'hello', reply: 'hi', durationMs: 300,
})).not.toThrow()
})

// -- heartbeat.done --
it('should accept valid heartbeat.done payload', () => {
expect(() => validateEventPayload('heartbeat.done', {
reply: 'all good', reason: 'CHAT_YES', durationMs: 200, delivered: true,
})).not.toThrow()
it('should reject message.sent with missing reply', () => {
expect(() => validateEventPayload('message.sent', {
channel: 'web', to: 'default', prompt: 'hello', durationMs: 300,
})).toThrow(/Invalid payload.*message\.sent/)
})

// -- heartbeat.skip --
it('should accept heartbeat.skip with optional parsedReason', () => {
expect(() => validateEventPayload('heartbeat.skip', {
reason: 'ack', parsedReason: 'All systems normal.',
// -- agent.work.requested --
it('should accept valid agent.work.requested payload', () => {
expect(() => validateEventPayload('agent.work.requested', {
source: 'task',
prompt: 'investigate',
})).not.toThrow()
})

it('should accept heartbeat.skip without parsedReason', () => {
expect(() => validateEventPayload('heartbeat.skip', {
reason: 'outside-active-hours',
it('should accept agent.work.requested with metadata', () => {
expect(() => validateEventPayload('agent.work.requested', {
source: 'cron',
prompt: 'check market',
metadata: { jobId: 'abc', jobName: 'daily' },
})).not.toThrow()
})

it('should reject heartbeat.skip with missing reason', () => {
expect(() => validateEventPayload('heartbeat.skip', {
parsedReason: 'something',
})).toThrow(/Invalid payload.*heartbeat\.skip/)
it('should reject agent.work.requested with unknown source', () => {
expect(() => validateEventPayload('agent.work.requested', {
source: 'bogus',
prompt: 'x',
})).toThrow(/Invalid payload.*agent\.work\.requested/)
})

// -- heartbeat.error --
it('should accept valid heartbeat.error payload', () => {
expect(() => validateEventPayload('heartbeat.error', {
error: 'timeout', durationMs: 5000,
})).not.toThrow()
it('should reject agent.work.requested without prompt', () => {
expect(() => validateEventPayload('agent.work.requested', {
source: 'task',
})).toThrow(/Invalid payload.*agent\.work\.requested/)
})

// -- message.received --
it('should accept valid message.received payload', () => {
expect(() => validateEventPayload('message.received', {
channel: 'web', to: 'default', prompt: 'hello',
// -- agent.work.done --
it('should accept valid agent.work.done payload', () => {
expect(() => validateEventPayload('agent.work.done', {
source: 'heartbeat',
reply: 'BTC alert',
durationMs: 200,
delivered: true,
})).not.toThrow()
})

// -- message.sent --
it('should accept valid message.sent payload', () => {
expect(() => validateEventPayload('message.sent', {
channel: 'web', to: 'default', prompt: 'hello', reply: 'hi', durationMs: 300,
})).not.toThrow()
it('should reject agent.work.done with missing delivered field', () => {
expect(() => validateEventPayload('agent.work.done', {
source: 'heartbeat',
reply: 'x',
durationMs: 100,
})).toThrow(/Invalid payload.*agent\.work\.done/)
})

it('should reject message.sent with missing reply', () => {
expect(() => validateEventPayload('message.sent', {
channel: 'web', to: 'default', prompt: 'hello', durationMs: 300,
})).toThrow(/Invalid payload.*message\.sent/)
})

// -- task.* --
it('should accept valid task.requested payload', () => {
expect(() => validateEventPayload('task.requested', {
prompt: 'check overnight moves',
// -- agent.work.skip --
it('should accept valid agent.work.skip payload', () => {
expect(() => validateEventPayload('agent.work.skip', {
source: 'heartbeat',
reason: 'outside-active-hours',
})).not.toThrow()
})

it('should reject task.requested without prompt', () => {
expect(() => validateEventPayload('task.requested', {})).toThrow(/Invalid payload.*task\.requested/)
})

it('should accept valid task.done payload', () => {
expect(() => validateEventPayload('task.done', {
prompt: 'hi', reply: 'ok', durationMs: 120,
it('should accept agent.work.skip with arbitrary metadata', () => {
expect(() => validateEventPayload('agent.work.skip', {
source: 'heartbeat',
reason: 'duplicate',
metadata: { parsedReason: 'BTC alert (first 80 chars)' },
})).not.toThrow()
})

it('should accept valid task.error payload', () => {
expect(() => validateEventPayload('task.error', {
prompt: 'hi', error: 'boom', durationMs: 50,
// -- agent.work.error --
it('should accept valid agent.work.error payload', () => {
expect(() => validateEventPayload('agent.work.error', {
source: 'cron',
error: 'AI down',
durationMs: 5,
})).not.toThrow()
})

Expand All @@ -146,4 +147,13 @@ describe('validateEventPayload', () => {
it('should pass for unregistered type with null payload', () => {
expect(() => validateEventPayload('unknown.type', null)).not.toThrow()
})

// -- legacy types (now removed from internal map but accepted on webhook wire) --
it('legacy task.requested type is no longer in AgentEventMap', () => {
// The webhook layer handles wire-level legacy alias translation.
// Validation against the canonical type happens after translation.
expect(AgentEventSchemas).not.toHaveProperty('task.requested')
expect(AgentEventSchemas).not.toHaveProperty('heartbeat.done')
expect(AgentEventSchemas).not.toHaveProperty('cron.done')
})
})
Loading
Loading