
feat(core): AgentWork primitive + canonical event collapse + Pump primitive#176

Merged
luokerenx4 merged 12 commits into master from dev
May 11, 2026

luokerenx4 commented May 10, 2026

Summary

Three compounding architectural changes:

Session 1 — Introduces AgentWork as the core primitive for "Alice
does an async task outside chat", adds the notify_user tool, deletes
the OpenClaw-legacy STATUS regex protocol from heartbeat.

Session 2 — Promotes the upstream side of AgentWork to first-class.
One canonical event (agent.work.requested) for all async trigger
sources, one dispatch listener with a source registry, collapse of 8
per-source event types into 4 canonical agent.work.* events with a
source field as the routing key.

Session 3 (this session) — Extracts the Pump primitive and
moves heartbeat + snapshot off the cron-engine entirely. Cron-engine is
now reserved strictly for user-defined cron jobs (Automation > Cron
UI); internal scheduled services own private Pumps. cron.fire has
exactly one subscriber now (cron-router); the topology graph reflects
actual semantic relationships, not implementation entanglement.

Per-session contributions

2026-05-11 (late afternoon) — Trading short-position netLiq fix + CCXT init strictness

Two trading-broker bugs caught in the same session window — both
shared-layer convention issues that affect every UTA, not broker-
specific.

Layer 1 short-position netLiquidation fix (commit 6b4096e):

Community reported that IBKR options getPositions/getAccount had the
"direction computed backwards + multiplier didn't reach 100". The root
cause splits into two layers. Layer 2 (IBKR-internal
normalize, e.g. request-bridge.ts:470 .abs(), decoder/account.ts:50
empty if (cp.secType !== undefined) body) is deferred to a separate
harness work item. Layer 1 (shared) landed here: every broker that
self-computes netLiq via cash + Σ(marketValue) (Mock / IBKR / CCXT)
was double-counting shorts — the unsigned-marketValue convention of
derivePositionMath meant a SELL-to-open short would add premium to
cash AND its (positive) marketValue would get summed on top, inflating
netLiq by 2× the short's marketValue. For OPT this is amplified by the
100x multiplier, which is why community surfaced it as an options-
specific direction bug.

  • New: aggregateAccountFromPositions(cash, positions) in
    position-math.ts — single source of truth for the cash+positions →
    netLiquidation reduction, applies side sign correctly during
    aggregation.
  • MockBroker / IbkrBroker / CcxtBroker getAccount rewired to the
    helper. Alpaca + Longbridge use upstream account.equity /
    netAssets and are intentionally unchanged (their upstream APIs
    already handled shorts correctly).
  • Snapshot type now persists OPT metadata (secType, multiplier,
    strike, right, expiry). Previously stripped on serialize, so
    UIs reading a saved snapshot saw an option-shaped position with no
    way to tell it was an option — likely source of the "multiplier
    didn't reach 100" half of the original report.
  • buildPosition adds a loud guard: any OPT/FOP arriving with
    multiplier=1 throws (catches broker callback paths that bypass
    buildContract.assertContract — notably IBKR's request-bridge).
  • 3 new MockBroker.spec scenarios (short OPT / short STK / mixed long
    + short); pre-fix they fail with the precise "+ 2 × short mv" delta
    the bug predicts. 5 new position-math.spec cases for the helper.
    2 new contract-builder.spec cases for the OPT-multiplier guard.
    3 new snapshot.spec cases for OPT metadata round-trip.
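
The double-counting is easiest to see with numbers. The sketch below is a minimal illustration, not the code in position-math.ts: the `PositionLike` and `AccountTotals` shapes are assumptions, and plain `number` arithmetic stands in for whatever decimal type the real helper uses. It reproduces the "short OPT" scenario quoted in this PR (10000 before the trade, 11160 under the old formula).

```typescript
// Hypothetical shapes: the real Position type is not shown in this PR.
type Side = 'long' | 'short'

interface PositionLike {
  side: Side
  marketValue: number // unsigned, per the derivePositionMath convention
}

interface AccountTotals {
  cash: number
  netLiquidation: number
}

// Sums positions with the side sign applied, so a short's marketValue
// counts as a liability instead of an asset.
function aggregateAccountFromPositions(cash: number, positions: PositionLike[]): AccountTotals {
  let signedMarketValue = 0
  for (const p of positions) {
    signedMarketValue += p.side === 'short' ? -p.marketValue : p.marketValue
  }
  return { cash, netLiquidation: cash + signedMarketValue }
}

// Worked example: 10000 cash, then sell-to-open one put for 580 premium.
const cashAfterSale = 10000 + 580
const shortPut: PositionLike = { side: 'short', marketValue: 580 }

// Old formula: cash + Σ(unsigned marketValue).
const naive = cashAfterSale + shortPut.marketValue
// Signed aggregation: the premium received cancels the liability.
const fixed = aggregateAccountFromPositions(cashAfterSale, [shortPut]).netLiquidation

console.log({ naive, fixed })
```

The gap between `naive` and `fixed` is exactly 2 × the short's marketValue, which is the delta the new MockBroker scenarios assert on.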

CCXT init strictness (commit d771188):

CcxtBroker's init() wrapper used to log a "— skipping" warning when
any market type (spot/linear/inverse) failed all 8 retries, then
continue with a partial market catalog. The broker self-completed but
was permanently degraded: every getAccount understated netLiq by every
holding whose market never loaded, and the per-tick
"spot holding BTC — no <COIN>/USDT|USDC|USD spot market, skipping"
warning was the only signal.
Symptomatic in real use against Bybit demo — $84k of spot BTC + ETH
ghosted out of snapshots. The wrapper now throws on terminal failure
so init fails loud, UTA marks the account unhealthy, and snapshots
are never written from a half-loaded broker. Restores fmOpts['types']
in try/finally to prevent state pollution on subsequent
refreshCatalog() calls.

  • Key commits: 6b4096e, d771188

2026-05-11 (afternoon) — Pump primitive + heartbeat/snapshot decoupling

The user identified: "heartbeat looks like cron but isn't, and mixing
them means every future heartbeat tweak risks touching cron-engine.
Same for snapshot — it's a state persistence service, not AI work that
fires on a schedule." The internal __heartbeat__ / __snapshot__
cron jobs were leaking into the user-facing Cron tab and forcing every
listener to do the cron.fire-with-jobName-filter dance.

  • New: src/core/duration.ts, with parseDuration extracted from
    cron-engine. Both cron-engine and Pump need it; layering forbids
    src/core → src/task imports.
  • New: src/core/pump.ts (~170 lines) — interval-scheduled callback
    primitive. setTimeout chain, error backoff (matches cron-engine's
    [30s, 60s, 5m, 15m, 1h]), serial guard, enable/disable, runNow.
  • New: src/core/pump.spec.ts — 23 tests covering construction,
    lifecycle, enable/disable cycle, runNow with serial-guard awaiting,
    error backoff escalation + reset, in-flight stop semantics.
  • Rewritten: src/domain/trading/snapshot/scheduler.ts, now
    Pump-driven. Drops cron-engine + ListenerRegistry deps. Calls
    snapshotService.takeAllSnapshots('scheduled') directly. Adds
    runNow() for manual triggering. Snapshot.spec scheduler block
    rewritten (6 tests).
  • Rewritten: src/task/heartbeat/heartbeat.ts — Pump-driven.
    No longer a Listener (no subscribes: 'cron.fire'). Owns a
    ProducerHandle for agent.work.{requested,skip}. Adds runNow().
    HEARTBEAT_JOB_NAME export removed. Heartbeat.spec rewritten
    (27 tests) including anti-regression: heartbeat ignores legacy
    cron.fire-with-heartbeat-jobName.
  • Modified: src/task/cron/listener.ts — removed isInternalJob
    filter (dead code now). Its test removed too.
  • Modified: src/main.ts — one-time idempotent migration that
    removes any pre-existing __*__ jobs from cron-engine on
    startup. Logs each removal once.
  • Key commits: f342ce4, 131243f, 2fa7e4c, 6311661

Visual outcome on the Automation Flow graph:

  • Before: cron.fire had 3 subscribers (cron-router, heartbeat,
    snapshot-scheduler), each filtering by jobName. __heartbeat__ and
    __snapshot__ showed up in Automation > Cron tab.
  • After: cron.fire has exactly 1 subscriber (cron-router). Heartbeat
    appears as a clean standalone producer of agent.work.{requested,skip}.
    Snapshot doesn't appear in the event-log topology at all (timer
    layer is now invisible — only snapshot.taken / snapshot.skipped
    side-effect events show, which is correct).

2026-05-11 (morning) — Canonical agent.work.* event collapse + single dispatch listener

[…unchanged from prior session entry, omitted for brevity in this PR
body refresh; see commit log…]

2026-05-10 — AgentWork + notify_user

[…unchanged from prior session entry…]

Full commit log

6b4096e fix(trading): netLiquidation aggregation correctly subtracts short marketValue
6311661 refactor(heartbeat): migrate to Pump; remove isInternalJob filter; orphan cron cleanup
2fa7e4c refactor(snapshot): scheduler driven by Pump instead of cron-engine
131243f feat(core): Pump primitive — interval-scheduled callback
f342ce4 feat(core): extract parseDuration to src/core/duration.ts
2946205 refactor(task): migrate heartbeat/cron to AgentWork emitters; collapse per-source events into canonical; delete task-router
d1169a1 feat(core): agent-work-listener with source registry + 17 tests
7abeedf feat(core): add canonical agent.work.* events for AgentWork dispatch
d771188 fix(broker/ccxt): refuse partial market load on init
b857d7a refactor(task): migrate heartbeat / cron / task-router to AgentWorkRunner
014b844 feat(tool): add notify_user — intent signal for autonomous notifications
c7d011d feat(core): AgentWork primitive + ProviderResult.toolCalls plumbing

Test plan

  • npx tsc --noEmit clean
  • pnpm test — 1675 / 1675 passing (+13 net this session: 3 short-position MockBroker scenarios, 5 aggregateAccountFromPositions cases, 3 OPT-multiplier guard tests, 2 snapshot OPT-metadata round-trip cases)
  • pnpm build clean
  • Manual (load-bearing — AI-portfolio-intuition memory note):
    • On first startup after upgrade: console logs cron: removing orphan internal job __heartbeat__ (...) and same for __snapshot__
    • data/cron/jobs.json no longer contains __*__ rows
    • Automation > Cron tab: no __heartbeat__ / __snapshot__ visible
    • Heartbeat fires on schedule; agent.work.requested { source: 'heartbeat' }
      appears in event log; downstream agent.work.{done,skip} flows as before
    • Snapshot runs on schedule; snapshot.taken events appear (unchanged)
    • Outside-active-hours config still emits agent.work.skip { source: 'heartbeat', reason: 'outside-active-hours' }, AI never invoked
    • Heartbeat setEnabled(false) stops fires
    • Heartbeat runNow() available for manual trigger (e.g. from a
      future "run now" button)
    • Automation Flow topology: cron.fire has only cron-router as
      subscriber; heartbeat is a standalone producer; snapshot doesn't
      appear at the timer layer
    • Short OPT netLiq: open a short call/put on a live broker
      (IBKR / CCXT); confirm getAccount netLiquidation no longer
      inflates by 2× short marketValue. For a $580 short put,
      netLiq should be flat from before the trade (premium received
      cancels mark-to-market liability); pre-fix it would have shown
      +$1,160.
    • Snapshot OPT metadata: open any OPT position, trigger a
      snapshot, inspect data/trading/<id>/snapshots/chunk-*.jsonl;
      each OPT entry now contains secType, multiplier, strike,
      right, expiry.

Out of scope (deferred)

  • Cron expression support in Pump (YAGNI; heartbeat / snapshot
    both use every)
  • Pump-level observability (fire counts, latency, error rates)
  • UI "run now" buttons for heartbeat / snapshot (Pump exposes
    runNow but no UI surface yet)
  • Layer 2 IBKR-internal normalize bugs (request-bridge.ts:470
    .abs() and decoder/account.ts:50 empty-if-body) fall through
    to a separate work item (raw-upstream recorder + no-connect replay
    harness; see new TODO.md entry under Architecture). The short-
    position netLiq fix in this session subsumed the user-visible
    symptom; the IBKR-specific normalize paths can be addressed
    offline once the harness lands.

🤖 Generated with Claude Code

Ame and others added 7 commits May 10, 2026 19:34
Introduces `src/core/agent-work.ts` — the missing core primitive for
"Alice does an async task outside chat". Trigger sources today
(heartbeat / cron / task-router) and trigger sources tomorrow (factor
mining, asset monitoring, ad-hoc scheduled DAGs) all share the same
shape: take a payload, run the AI, optionally gate the notification,
emit done/skip/error. AgentWork is that shape.

API:

  class AgentWorkRunner {
    constructor({ agentCenter, connectorCenter, ... })
    run(req: AgentWorkRequest, emit: EmitFn): Promise<AgentWorkRunResult>
  }

  AgentWorkRequest carries:
    prompt, session, preamble, metadata
    inputGate?  (active-hours-style pre-AI guard)
    outputGate? (notify_user-style post-AI gate)
    onDelivered?
    emitNames + buildDonePayload + buildSkipPayload? + buildErrorPayload

The runner is stateless — construct once at startup, call run() per
request with the listener's per-call emit fn. Class form (rather than
free function) keeps `src/core/` style consistent with AgentCenter /
ConnectorCenter / NotificationsStore.
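
A rough sketch of the pipeline described above may help: pre-AI gate, AI call, post-AI gate, notify, then emit. Everything here is an illustrative stand-in (the `*Sketch` names, the injected `askAI` / `notify` functions, and the literal event labels are assumptions); the real request type in `src/core/agent-work.ts` carries more fields than shown.

```typescript
// Illustrative stand-ins only; not the real src/core/agent-work.ts types.
type EmitFn = (type: string, payload: Record<string, unknown>) => void

interface AiResultSketch {
  text: string
  toolCalls: { name: string; input: Record<string, unknown> }[]
}

type OutputDecision =
  | { action: 'deliver'; text: string }
  | { action: 'skip'; reason: string }

interface WorkRequestSketch {
  source: string
  prompt: string
  inputGate?: () => { reason: string } | null // non-null means skip before the AI runs
  outputGate?: (result: AiResultSketch) => OutputDecision
}

class AgentWorkRunnerSketch {
  private readonly askAI: (prompt: string) => Promise<AiResultSketch>
  private readonly notify: (text: string) => Promise<void>

  constructor(
    askAI: (prompt: string) => Promise<AiResultSketch>,
    notify: (text: string) => Promise<void>,
  ) {
    this.askAI = askAI
    this.notify = notify
  }

  // Holds no per-request state, so one instance can serve concurrent calls.
  async run(req: WorkRequestSketch, emit: EmitFn): Promise<void> {
    const skip = req.inputGate ? req.inputGate() : null
    if (skip !== null) {
      emit('skip', { source: req.source, reason: skip.reason })
      return
    }
    try {
      const result = await this.askAI(req.prompt)
      const decision: OutputDecision = req.outputGate
        ? req.outputGate(result)
        : { action: 'deliver', text: result.text }
      if (decision.action === 'skip') {
        emit('skip', { source: req.source, reason: decision.reason })
        return
      }
      // A failed notify still counts as done, with delivered=false.
      let delivered = true
      try {
        await this.notify(decision.text)
      } catch (err) {
        delivered = false
      }
      emit('done', { source: req.source, reply: decision.text, delivered })
    } catch (err) {
      emit('error', { source: req.source, error: String(err) })
    }
  }
}
```

Because the gates travel on the request rather than on the runner, each trigger source can supply its own policy without the runner knowing about it.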

Also surfaces `toolCalls` on `ProviderResult` (additive change). The
existing pipeline already accumulates tool_use events as they stream
through; AgentCenter now packages them into the final done event so
AgentWork's outputGate can inspect "did the AI call notify_user?"
without re-streaming.

Test coverage: `src/core/agent-work.spec.ts` — 37 tests across:
  - default behaviour (no gates) — happy path
  - inputGate — null vs skip, AI-not-invoked, custom payload
  - outputGate — deliver/skip/probe inspection
  - notify_user-style tool inspection (load-bearing for heartbeat)
  - AI invocation errors — throw, non-Error, emit failure
  - notify failure — done with delivered=false, hook not called
  - onDelivered hook — called/not-called/throws
  - clock injection — durationMs honors injected now()
  - source label flow-through
  - concurrent runs (stateless runner)

Followup commits migrate cron / task-router / heartbeat to use this
primitive; this commit is just the primitive + plumbing, no consumers
yet.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the heartbeat STATUS regex protocol with a structured tool
call. AI-decides-to-notify becomes "AI calls notify_user(text)";
runner-side outputGate inspects the captured tool calls and routes
through dedup / connectorCenter.notify.

The tool's `execute` is intentionally a no-op (returns the args back
as acknowledgement). Why no side-effects: heartbeat applies dedup
before push; if the tool itself called connectorCenter.notify, we'd
have no way to gate on dedup without per-tool source state. The
runner-side gate is the right control point. The tool just records
intent + arguments.
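
As a sketch of that split between "record intent" and "decide delivery": the tool below only echoes its arguments, and a separate gate function inspects the captured calls. The `ToolCallSketch` shape, the `lastDeliveredText` parameter, and the `'duplicate'` reason are assumptions for illustration; `'ack'` and `'empty'` are the skip reasons named in the heartbeat spec.

```typescript
// Hypothetical shapes; the real ToolCenter / ProviderResult types may differ.
interface ToolCallSketch {
  name: string
  input: Record<string, unknown>
}

type GateDecision =
  | { action: 'deliver'; text: string }
  | { action: 'skip'; reason: string }

// The tool itself has no side effects: it acknowledges and returns its args.
const notifyUserToolSketch = {
  name: 'notify_user',
  execute: async (args: { text: string }) => ({ acknowledged: true, text: args.text }),
}

// Runner-side gate: decides delivery from the captured tool calls.
function notifyUserOutputGate(toolCalls: ToolCallSketch[], lastDeliveredText?: string): GateDecision {
  let call: ToolCallSketch | undefined
  for (const c of toolCalls) {
    if (c.name === notifyUserToolSketch.name) call = c // last call wins
  }
  if (call === undefined) return { action: 'skip', reason: 'ack' }

  const raw = call.input.text
  const text = typeof raw === 'string' ? raw.trim() : ''
  if (text === '') return { action: 'skip', reason: 'empty' }
  // Assumed dedup rule: identical to the last delivered text.
  if (text === lastDeliveredText) return { action: 'skip', reason: 'duplicate' }
  return { action: 'deliver', text }
}
```

Keeping dedup in the gate means the tool needs no per-source state, which is the point the paragraph above makes.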

Globally registered in ToolCenter — every session sees it. But only
sessions whose persona prompt teaches AI when to call it (today only:
heartbeat) actually exercise it. cron / task-router / chat keep
their existing "every reply pushes" behaviour because their prompts
don't reference notify_user.

Followup commit teaches heartbeat's persona about it and wires the
runner-side gate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nner

Three trigger sources collapse into thin configurations of the
AgentWork primitive. The shared body (subscribe → AI call → notify →
emit done/error) lives in AgentWorkRunner; each listener is now just
"how to translate a trigger event into an AgentWorkRequest".

heartbeat (src/task/heartbeat/heartbeat.ts):
  - delete parseHeartbeatResponse() and the entire STATUS regex
    protocol — Alice now signals notification intent via the
    notify_user tool, not by emitting magic string tokens
  - default persona prompt rewritten to teach notify_user instead of
    STATUS / REASON / CONTENT format
  - active-hours guard becomes the runner's inputGate
  - notify_user inspection + dedup checks become the runner's
    outputGate; dedup record happens via onDelivered
  - HeartbeatDedup, isWithinActiveHours, the `__heartbeat__` cron job
    lifecycle, hot enable/disable — all kept (heartbeat-specific)
  - HeartbeatDedup.lastText is now public (load-bearing for the done
    event's `reply` field)
  - 410 → ~290 lines

cron (src/task/cron/listener.ts):
  - 135 → ~110 lines
  - public API (createCronListener, CronListener, CronListenerOpts)
    preserved; just takes agentWorkRunner instead of agentCenter +
    connectorCenter
  - serial-execution lock + internal-job filter still here, since
    those are cron-specific (factory's pre-AI hook is the inputGate
    on a per-request basis; cron's `processing` lock is a listener-
    instance concern that pre-dates the request)

task-router (src/task/task-router/listener.ts):
  - 122 → ~100 lines
  - same migration as cron
  - public API preserved

main.ts:
  - constructs AgentWorkRunner once, threads to all three listeners
  - registers notify_user tool in toolCenter (globally available)

heartbeat.spec.ts: rewritten — STATUS-regex tests deleted, replaced
with notify_user-tool-call equivalents. New tests:
  - delivers when AI invokes notify_user (replaces "should call AI
    and write heartbeat.done")
  - skips with reason=ack when AI does not call notify_user
    (replaces "should skip HEARTBEAT_OK")
  - skips with reason=empty when notify_user.text is blank
  - explicit guard: STATUS-shaped raw text without notify_user is
    NOT delivered (anti-regression)
  - dedup: different texts not deduped
  - active-hours: outside window does not invoke AI
  - lifecycle / setEnabled / error handling preserved
Test count: 35 → 28 (the parseHeartbeatResponse standalone test
block, ~10 tests, deleted alongside the function it tested)

cron + task-router specs: minimal setup change to construct via
AgentWorkRunner; assertions unchanged.

Full suite: 1622/1622 passing (was 1592 before).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
If fetchMarkets(type) exhausted retries the wrapper used to log "— skipping"
and let init() complete with a partial market catalog. That left the broker
permanently degraded: every getAccount() understated netLiquidation by every
spot/derivative holding whose market never loaded, and the per-tick
"spot holding BTC — no <COIN>/USDT|USDC|USD spot market, skipping" warning
was the only signal. A CCXT account is a full-spectrum interface; whether
the user actively trades a type is their choice, not the broker's to
silently shed. Now the wrapper throws on terminal failure so init() fails
loud, UTA marks the account unhealthy, and snapshots are never written from
a half-loaded broker.

Also wraps the per-type fetch in try/finally to restore the original
fmOpts['types'] — the old code only restored on the success path, so a
mid-loop throw or refreshCatalog() retry inherited a polluted [singleType]
filter.
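
The two behaviours above (throw on terminal failure, always restore the type filter) can be sketched as follows. `ExchangeLike` is a hypothetical stand-in for the slice of the CCXT exchange object the wrapper touches, and `loadAllMarketTypes` is an illustrative name, not the function in CcxtBroker.

```typescript
// Hypothetical stand-in for the parts of a CCXT exchange used here.
interface ExchangeLike {
  options: { fetchMarkets: { types?: string[] } }
  fetchMarkets(): Promise<string[]>
}

async function loadAllMarketTypes(
  exchange: ExchangeLike,
  types: string[],
  maxAttempts = 8,
): Promise<string[]> {
  const fmOpts = exchange.options.fetchMarkets
  const originalTypes = fmOpts.types
  const loaded: string[] = []
  try {
    for (const type of types) {
      fmOpts.types = [type] // narrow the fetch to a single market type
      let markets: string[] | undefined
      let lastError: unknown
      for (let attempt = 1; attempt <= maxAttempts && markets === undefined; attempt++) {
        try {
          markets = await exchange.fetchMarkets()
        } catch (err) {
          lastError = err
        }
      }
      if (markets === undefined) {
        // Terminal failure: refuse to continue with a partial catalog.
        throw new Error(`fetchMarkets(${type}) failed after ${maxAttempts} attempts: ${String(lastError)}`)
      }
      loaded.push(...markets)
    }
    return loaded
  } finally {
    // Runs on success and on throw, so later calls never inherit [singleType].
    fmOpts.types = originalTypes
  }
}
```

A real wrapper would presumably wait between attempts; the delay is left out here to keep the control flow visible.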

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Introduces 4 new event types to AgentEventMap:

- agent.work.requested (external — webhook ingestable)
- agent.work.done
- agent.work.skip
- agent.work.error

All four carry a `source: NotificationSource` field as their routing
key. Consumers that care about a specific trigger source filter on
this field instead of subscribing to a separate event type per
source — eliminates "event explosion" as the number of trigger
sources grows.
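
To illustrate the routing-key idea, here is a small sketch of a consumer filtering by `source` rather than by event type. The `AgentWorkEventSketch` shape and the sample payloads are assumptions; only the four event names and the `source` field come from this commit.

```typescript
// The four canonical event names introduced by this commit.
type AgentWorkEventType =
  | 'agent.work.requested'
  | 'agent.work.done'
  | 'agent.work.skip'
  | 'agent.work.error'

// Hypothetical event envelope; the real payload interfaces carry more fields.
interface AgentWorkEventSketch {
  type: AgentWorkEventType
  payload: { source: string; reason?: string }
}

// One subscription covers every source; consumers narrow by payload.source.
function eventsForSource(events: AgentWorkEventSketch[], source: string): AgentWorkEventSketch[] {
  return events.filter((e) => e.payload.source === source)
}

const sampleLog: AgentWorkEventSketch[] = [
  { type: 'agent.work.requested', payload: { source: 'heartbeat' } },
  { type: 'agent.work.done', payload: { source: 'cron' } },
  { type: 'agent.work.skip', payload: { source: 'heartbeat', reason: 'outside-active-hours' } },
]

const heartbeatOnly = eventsForSource(sampleLog, 'heartbeat')
console.log(heartbeatOnly.map((e) => e.type))
```

Adding a new trigger source then means adding a new `source` value, with no new event types to register.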

Old per-source event types (cron.done/cron.error, heartbeat.done/.skip/.error,
task.requested/task.done/task.error) stay in this commit; the migration
to canonical events lands in followup commits as a single atomic cutover.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The single dispatch point for `agent.work.requested` events. Each
trigger source (heartbeat / cron / webhook / future) registers an
`AgentWorkSourceConfig` carrying its session, preamble builder, and
output-side gates; the listener routes incoming events to the
matching config based on `payload.source` and runs the AgentWorkRunner
pipeline.

Emit names are fixed canonical: agent.work.{done,skip,error}. Each
emitted payload bakes the source field in, so downstream consumers
(Diary, etc.) filter on source instead of subscribing to per-source
event types.
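
A minimal sketch of the source registry and its dispatch rule, assuming a much smaller config than the real `AgentWorkSourceConfig` (only a session and a preamble builder are modelled, and `dispatch` returns the preamble instead of running the AgentWorkRunner pipeline):

```typescript
// Reduced, hypothetical version of AgentWorkSourceConfig.
interface SourceConfigSketch {
  session: string
  buildPreamble: (metadata: Record<string, unknown>) => string
}

class SourceRegistrySketch {
  private readonly configs = new Map<string, SourceConfigSketch>()

  // Registering the same source twice overwrites the earlier config.
  register(source: string, config: SourceConfigSketch): void {
    this.configs.set(source, config)
  }

  list(): string[] {
    return Array.from(this.configs.keys())
  }

  // Routes on payload.source; unknown sources are dropped with a warning.
  dispatch(payload: { source: string; metadata: Record<string, unknown> }): string | null {
    const config = this.configs.get(payload.source)
    if (config === undefined) {
      console.warn(`agent-work-listener: no config for source "${payload.source}", dropping`)
      return null
    }
    return config.buildPreamble(payload.metadata)
  }
}
```

Usage would look like `registry.register('cron', { session: 'cron', buildPreamble: (m) => ... })` at startup, followed by one `dispatch` per incoming `agent.work.requested` event.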

Test coverage in agent-work-listener.spec.ts (17 tests):
  - Source registry: empty start, register, list, overwrite semantics
  - Dispatch by source field — correct config invoked
  - Canonical event emission with source baked in
  - Metadata threading: payload → preamble → done payload
  - buildDoneMetadata override of default passthrough
  - Unknown source: silent drop + warning, no events emitted
  - notify_user-style outputGate idiom end-to-end (delivers tool args)
  - Skip emission when outputGate returns skip
  - onDelivered hook fires only on successful delivery
  - Errors propagate as agent.work.error with source attribution
  - buildErrorMetadata override
  - Multi-source independence (concurrent dispatch)
  - Lifecycle (stop() unsubscribes cleanly)

No consumers wired yet — followup commit migrates heartbeat / cron /
webhook to emit the canonical event.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…e per-source events into canonical; delete task-router

The cutover that makes "all upstreams of AgentWork manageable" — three
trigger sources now emit a single canonical event type that one
dispatch listener consumes, instead of each owning its own listener +
event types.

Trigger source migration:
  - heartbeat (src/task/heartbeat/heartbeat.ts): subscribes to cron.fire
    for __heartbeat__, applies active-hours pre-filter, emits
    agent.work.requested. Output-side dedup + notify_user inspection
    move to the source config registered with agent-work-listener.
    No longer imports AgentWorkRunner directly.
  - cron (src/task/cron/listener.ts): subscribes to cron.fire for user
    jobs, emits agent.work.requested with jobId/jobName in metadata.
    Source config has no gates (default deliver-result behaviour).
  - webhook (src/webui/routes/events.ts): accepts both
    agent.work.requested (canonical) AND task.requested (legacy wire
    alias, translated to canonical before storage) — preserves
    documented external API per "Don't delete our own exports".
  - task source config registered inline in main.ts since there's no
    longer a dedicated task-router module.

Event-map cleanup:
  - Removed from AgentEventMap: cron.done, cron.error, heartbeat.done,
    heartbeat.skip, heartbeat.error, task.requested, task.done,
    task.error (8 types collapse to 4 canonical).
  - All associated payload interfaces, TypeBox schemas, and AgentEvents
    registry entries removed.
  - Net event-type budget for the agent-work pipeline: 9 → 5
    (cron.fire for cron-engine timer fan-out, plus
    agent.work.{requested,done,skip,error}).

Downstream consumer adaptation:
  - src/webui/routes/diary.ts: queries agent.work.{done,skip,error}
    filtered by payload.source === 'heartbeat'. outcomeFromEvent
    keys on canonical type + reason. parsedReason now read from
    skip event metadata.

Tests:
  - heartbeat.spec.ts: 28 tests; assertions updated for canonical events
    (agent.work.skip { source: 'heartbeat', reason: ... } instead of
    heartbeat.skip { reason }). Anti-regression test added for
    STATUS-shaped raw text not being parsed.
  - cron/listener.spec.ts: 8 tests; assertions updated for canonical
    events with source: 'cron'.
  - webui/__tests__/diary.spec.ts: fixtures updated to canonical event
    shapes.
  - listener-registry.spec.ts: minor — sample emit types in test
    cases switched from cron.done/heartbeat.done to agent.work.done /
    message.sent now that the old types are gone.
  - task-router/* deleted entirely.

Module wiring (src/main.ts):
  - Constructs agent-work-listener once with the runner + registry.
  - Registers task source inline.
  - Passes agent-work-listener to createHeartbeat and createCronListener
    for their source registrations.

Full suite: 1632/1632 passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
luokerenx4 changed the title from "feat(core): AgentWork primitive + heartbeat STATUS protocol replacement" to "feat(core): AgentWork primitive + canonical agent.work.* event collapse + STATUS protocol replacement" on May 11, 2026
Ame and others added 4 commits May 11, 2026 13:12
Cron-engine has been the sole owner of `parseDuration` (the "30m" /
"1h" / "5m30s" parser). The new Pump primitive (next commit) needs the
same parser, and `src/core` → `src/task` would be a wrong-direction
import. Move it to `src/core/duration.ts` with a small dedicated spec.

cron-engine re-exports it from the original path for back-compat with
any internal importers.
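
For readers unfamiliar with the format, a minimal sketch of such a parser is shown below. It is an assumption-laden illustration, not the code in `src/core/duration.ts`: it accepts only `h`, `m`, and `s` units (the ones in the examples above) and returns `null` on invalid input, whereas the real function may support more units or throw.

```typescript
// Milliseconds per supported unit (h, m, s only in this sketch).
const UNIT_MS: Record<string, number> = {
  h: 60 * 60 * 1000,
  m: 60 * 1000,
  s: 1000,
}

// Parses strings like "30m", "1h", or "5m30s" into milliseconds.
// Returns null when the input is not a sequence of <digits><unit> pairs.
function parseDurationSketch(input: string): number | null {
  const text = input.trim()
  if (!/^(\d+[hms])+$/.test(text)) return null

  let total = 0
  const pair = /(\d+)([hms])/g
  let match: RegExpExecArray | null
  while ((match = pair.exec(text)) !== null) {
    total += Number(match[1]) * UNIT_MS[match[2]]
  }
  return total
}

console.log(parseDurationSketch('30m'), parseDurationSketch('5m30s'))
```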

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A small timer primitive for "fire onTick every N minutes" services.
Used by heartbeat and snapshot to free them from registering internal
cron jobs and subscribing to cron.fire just to drive their own
private schedule. Cron-engine is reserved for user-defined cron jobs
(Automation > Cron UI).

API:
  const pump = createPump({
    name: 'heartbeat',
    every: '30m',
    enabled: true,
    onTick: async () => { ... },
  })
  pump.start()                   // arms the timer
  pump.stop()                    // terminal — clears timer, marks stopped
  pump.setEnabled(true | false)  // toggle; arms or cancels pending
  pump.isEnabled()               // current state
  await pump.runNow()            // manual fire (tests / UI run-now buttons)

Tick algorithm:
  1. If stopped → return
  2. If serial && processing → drop (log warning, don't queue)
  3. processing = true; try onTick()
  4. On success: consecutiveErrors = 0
  5. On throw: log, consecutiveErrors++
  6. Finally: processing = false; if !stopped && enabled → arm next
     timeout (delay = errorBackoffMs[errors-1] || parsed every duration)

Error backoff defaults match cron-engine's existing schedule:
[30s, 60s, 5m, 15m, 1h], clamped to the last entry.
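
The tick algorithm and backoff rule above can be sketched roughly as follows. This is a simplified illustration rather than `src/core/pump.ts`: it takes `everyMs` as a number instead of parsing an `every` string, it is always serial, and `runNow` drops rather than awaits when a tick is already in flight. The names `nextDelayMs` and `createPumpSketch` are hypothetical.

```typescript
// Backoff schedule from the commit message: 30s, 60s, 5m, 15m, 1h.
const DEFAULT_BACKOFF_MS: readonly number[] = [30000, 60000, 300000, 900000, 3600000]

// Delay before the next tick: the normal interval when healthy, otherwise
// the backoff entry for the current error streak, clamped to the last entry.
function nextDelayMs(
  consecutiveErrors: number,
  everyMs: number,
  backoffMs: readonly number[] = DEFAULT_BACKOFF_MS,
): number {
  if (consecutiveErrors <= 0 || backoffMs.length === 0) return everyMs
  return backoffMs[Math.min(consecutiveErrors - 1, backoffMs.length - 1)]
}

interface PumpSketchOpts {
  name: string
  everyMs: number
  enabled?: boolean
  onTick: () => Promise<void>
}

function createPumpSketch(opts: PumpSketchOpts) {
  let enabled = opts.enabled !== false
  let stopped = false
  let processing = false
  let consecutiveErrors = 0
  let timer: ReturnType<typeof setTimeout> | undefined

  const arm = (): void => {
    if (stopped || !enabled) return
    clearTimeout(timer)
    timer = setTimeout(tick, nextDelayMs(consecutiveErrors, opts.everyMs))
  }

  async function tick(): Promise<void> {
    if (stopped) return
    if (processing) {
      console.warn(`pump ${opts.name}: tick dropped, previous tick still running`)
      return
    }
    processing = true
    try {
      await opts.onTick()
      consecutiveErrors = 0
    } catch (err) {
      consecutiveErrors++
      console.error(`pump ${opts.name}: onTick failed`, err)
    } finally {
      processing = false
      arm() // no-op when stopped or disabled
    }
  }

  return {
    start: arm,
    stop(): void {
      stopped = true
      clearTimeout(timer)
    },
    setEnabled(value: boolean): void {
      if (stopped || value === enabled) return
      enabled = value
      if (enabled) arm()
      else clearTimeout(timer)
    },
    isEnabled: (): boolean => enabled,
    runNow: tick, // manual fire; does not check the enabled flag
  }
}
```

Re-arming only in `finally` is what makes this a setTimeout chain rather than a setInterval: a slow tick pushes the next one out instead of piling up behind it.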

Test coverage (`pump.spec.ts`, 23 tests):
  - Construction: invalid duration throws, zero throws, common formats parse
  - Lifecycle: start arms; recurring fires; stop clears timer;
    stop during in-flight tick lets it complete without re-arm; start
    is idempotent
  - setEnabled: disabled state doesn't fire; re-enable arms;
    disable cancels pending; no-op after stop; same-value is no-op
  - runNow: immediate invocation; works without start; no-op after
    stop; respects serial guard (awaits in-flight)
  - Error backoff: consecutive errors trigger increasing backoff;
    success resets; clamps to last entry on extreme repeats; pump
    survives onTick errors
  - Serial guard: in-flight ticks drop concurrent fires

No consumers wired yet — followup commits migrate snapshot then
heartbeat.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Snapshot is a state-persistence service — capturing trading state to
disk every N minutes — not an AI-work event consumer. It was
registering itself as a `__snapshot__` cron job and subscribing to
`cron.fire` filtered by jobName just to drive its own schedule.
That entangled it with the user-cron event flow for no reason. Now
it owns a private Pump and calls `snapshotService.takeAllSnapshots`
directly. Zero event-log involvement at the timer layer.

API shrinks too — `createSnapshotScheduler({ snapshotService,
config })` (no more cronEngine + registry deps). The scheduler
exports `runNow()` for manual triggering (tests + future UI).

UTA post-push / post-reject hooks are unaffected — those call
`snapshotService.takeSnapshot(id, trigger)` directly, never went
through the scheduler.

Tests rewritten (`snapshot.spec.ts`, scheduler block): 6 tests
covering runNow invocation, idempotent start, disabled-config
behaviour, serial guard, stop semantics, takeAllSnapshots error
resilience. Drops the cron-engine fixture entirely.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…phan cron cleanup

Heartbeat is the second and final internal scheduled-task service to
move off cron-engine. Now Pump-driven, with the same shape as
snapshot.

Heartbeat changes:
  - Drops CronEngine dependency from HeartbeatOpts
  - Stops being a Listener (no more `subscribes: 'cron.fire'`,
    no more `__heartbeat__` jobName filter)
  - Owns a private Pump driving the schedule
  - Owns a ProducerHandle that emits agent.work.{requested,skip}
    on each tick (visible to the topology graph as a clean producer
    on agent-work events)
  - Active-hours pre-filter stays — but inline in onTick, with the
    skip event emitted via the producer
  - Adds `runNow()` exposed on the Heartbeat API for tests and
    future "run heartbeat now" UI
  - HEARTBEAT_JOB_NAME export removed (no longer a cron-engine job)
  - HeartbeatDedup + isWithinActiveHours helpers unchanged
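
A sketch of what the inline active-hours branch in onTick might look like. The window representation (whole hours, optionally wrapping past midnight) and the names `isWithinActiveHoursSketch` / `heartbeatTickSketch` are assumptions; the real `isWithinActiveHours` helper is not shown in this PR. The event names and the `outside-active-hours` reason are the ones used elsewhere in this description.

```typescript
type HeartbeatEmit = (
  type: 'agent.work.requested' | 'agent.work.skip',
  payload: Record<string, unknown>,
) => void

// Hypothetical helper: hours are 0-23 and the window may wrap past
// midnight (for example 22 to 6). Equal start and end means always active.
function isWithinActiveHoursSketch(hour: number, startHour: number, endHour: number): boolean {
  if (startHour === endHour) return true
  return startHour < endHour
    ? hour >= startHour && hour < endHour
    : hour >= startHour || hour < endHour
}

// One tick: either request AI work or record why it was skipped.
function heartbeatTickSketch(
  hour: number,
  window: { startHour: number; endHour: number },
  emit: HeartbeatEmit,
): void {
  if (!isWithinActiveHoursSketch(hour, window.startHour, window.endHour)) {
    emit('agent.work.skip', { source: 'heartbeat', reason: 'outside-active-hours' })
    return
  }
  emit('agent.work.requested', { source: 'heartbeat' })
}
```

Emitting the skip from the producer keeps the "AI never invoked" case visible in the event log even though no request was dispatched.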

cron-router cleanup (src/task/cron/listener.ts):
  - Removes the `isInternalJob` filter and its test. Pre-Pump this
    guarded against double-handling __heartbeat__ / __snapshot__
    fires; post-Pump those jobs don't exist in cron-engine, so the
    filter is dead code.

One-time disk migration (src/main.ts):
  - After cron-engine starts, scan its job list and remove any
    `__*__`-named entries (orphan disk state from data/cron/jobs.json
    that previous versions left behind). Logs each removal once.
    Idempotent — no-op on subsequent startups.
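
A minimal sketch of that cleanup, assuming a cron-engine surface with `list()` and `remove(id)` (both hypothetical names; the real engine API is not shown here). The parenthetical in the log line is printed as the job id, which is also an assumption.

```typescript
// Hypothetical slice of the cron-engine API used by the migration.
interface CronJobLike {
  id: string
  name: string
}

interface CronEngineLike {
  list(): CronJobLike[]
  remove(id: string): void
}

// Matches internal names such as __heartbeat__ and __snapshot__.
const INTERNAL_JOB_NAME = /^__.+__$/

// Removes orphaned internal jobs and returns the names it removed.
// Safe to run on every startup: a clean job list yields an empty result.
function removeOrphanInternalJobs(
  engine: CronEngineLike,
  log: (line: string) => void = console.log,
): string[] {
  const removed: string[] = []
  for (const job of engine.list()) {
    if (!INTERNAL_JOB_NAME.test(job.name)) continue
    log(`cron: removing orphan internal job ${job.name} (${job.id})`)
    engine.remove(job.id)
    removed.push(job.name)
  }
  return removed
}
```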

Tests rewritten (`heartbeat.spec.ts`, 27 tests):
  - All triggering now via `heartbeat.runNow()` instead of
    `cronEngine.runNow(jobId)`
  - Anti-regression test: heartbeat does NOT subscribe to legacy
    `cron.fire { jobName: '__heartbeat__' }` events
  - Active-hours, dedup, error handling, setEnabled all preserved
  - runNow ignores the enabled flag (manual fires always work, even
    when scheduled fires are disabled)

Full suite: 1662 / 1662 passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
luokerenx4 changed the title from "feat(core): AgentWork primitive + canonical agent.work.* event collapse + STATUS protocol replacement" to "feat(core): AgentWork primitive + canonical event collapse + Pump primitive" on May 11, 2026
…rketValue

Community-reported IBKR options bug ("direction wrong + multiplier didn't reach 100") root-caused to a
shared-layer convention violation: across every broker that self-computes
netLiq via `cash + Σ(marketValue)` (Mock / IBKR / CCXT), the unsigned-
marketValue convention of `derivePositionMath` made SELL-to-open shorts
add their premium to cash AND add their notional marketValue on top —
inflating netLiq by 2× the short's marketValue. For OPT this is amplified
by the 100x multiplier, which is why community users surfaced it as an
options-specific direction bug. The new MockBroker spec confirms three
scenarios that all fail pre-fix with exactly "+ 2 × short mv":
short OPT (10000 → 11160), short STK (10000 → 20000), mixed (10200 → 12000).

Fix collapses the cash+ΣmarketValue formula into `aggregateAccountFromPositions`
in position-math.ts, applying side sign during aggregation. Brokers that
read upstream-reported equity (Alpaca's `account.equity`, Longbridge's
`netAssets`) are unchanged — their upstream APIs already handled shorts
correctly.

Snapshot type now carries OPT contract metadata (secType, multiplier,
strike, right, expiry) — previously persisted positions stripped these,
so a UI reading a saved snapshot saw an option-shaped position with no
way to tell it was an option. This is the likely source of the
"multiplier didn't reach 100" half of the original report.

buildPosition adds a loud guard: any OPT/FOP position arriving with
multiplier=1 throws — that pattern almost always indicates an upstream
broker decode-loss (raw Contract bypassing buildContract's validation
because it came from a callback path like IBKR's request-bridge). Catching
at the Position boundary prevents silent 100x undercounting in snapshots
and netLiq math.
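
The guard amounts to a boundary assertion along these lines. `ContractLike` and `assertDerivativeMultiplier` are hypothetical names used for illustration; the actual check lives inside buildPosition and its error message is not quoted in this commit.

```typescript
// Hypothetical minimal contract shape.
interface ContractLike {
  symbol: string
  secType: string // e.g. 'STK', 'OPT', 'FOP'
  multiplier: number
}

// Throws when an option-like contract arrives with multiplier=1, which
// usually means the multiplier was lost somewhere in an upstream decode.
function assertDerivativeMultiplier(contract: ContractLike): void {
  const isOptionLike = contract.secType === 'OPT' || contract.secType === 'FOP'
  if (isOptionLike && contract.multiplier === 1) {
    throw new Error(
      `${contract.secType} ${contract.symbol} has multiplier=1; refusing to build position`,
    )
  }
}
```

Failing loudly here trades a crash for correctness: a silently accepted multiplier of 1 would undercount the position by 100x in every downstream total.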

TODO.md picks up a Layer 2 entry for the raw-upstream recorder + no-connect
replay harness — broker-internal normalize bugs (IBKR's `.abs()`, proto
decoder's empty if-body) still need that infrastructure to repro offline,
but it's a separate work item.

Repro and design notes in ~/.claude/plans/simulator-moonlit-otter.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
luokerenx4 merged commit ebe226a into master May 11, 2026
2 checks passed