diff --git a/rivetkit-typescript/packages/workflow-engine/QUICKSTART.md b/rivetkit-typescript/packages/workflow-engine/QUICKSTART.md new file mode 100644 index 0000000000..0d1826ae6b --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/QUICKSTART.md @@ -0,0 +1,428 @@ +# Workflow Engine Quickstart + +A durable execution engine for TypeScript. Write long-running, fault-tolerant workflows as regular async functions that can survive process restarts, crashes, and deployments. + +## Overview + +The workflow engine enables reliable execution through: + +1. **History Tracking** - Every operation is recorded in persistent storage +2. **Replay** - On restart, operations replay from history instead of re-executing +3. **Deterministic Execution** - Same inputs produce the same execution path + +## Installation + +```bash +npm install @rivetkit/workflow-engine +``` + +## Quick Example + +```typescript +import { runWorkflow, Loop, type WorkflowContextInterface } from "@rivetkit/workflow-engine"; + +async function orderWorkflow(ctx: WorkflowContextInterface, orderId: string) { + // Steps are durable - if this crashes after payment, it won't charge twice on restart + const payment = await ctx.step("process-payment", async () => { + return await chargeCard(orderId); + }); + + // Wait for shipping confirmation (external signal) + const tracking = await ctx.listen("wait-shipping", "shipment-confirmed"); + + // Send notification + await ctx.step("notify-customer", async () => { + await sendEmail(orderId, tracking); + }); + + return { orderId, payment, tracking }; +} + +// Run the workflow +const handle = runWorkflow("order-123", orderWorkflow, "order-123", driver); + +// Send a signal from external system +await handle.signal("shipment-confirmed", "TRACK-456"); + +// Wait for completion +const result = await handle.result; +``` + +## Core Concepts + +### The Driver + +The workflow engine requires an `EngineDriver` implementation for persistence and scheduling. 
Each workflow instance operates on an isolated KV namespace. + +```typescript +interface EngineDriver { + // KV Operations + get(key: Uint8Array): Promise<Uint8Array | null>; + set(key: Uint8Array, value: Uint8Array): Promise<void>; + delete(key: Uint8Array): Promise<void>; + deletePrefix(prefix: Uint8Array): Promise<void>; + list(prefix: Uint8Array): Promise<[Uint8Array, Uint8Array][]>; // MUST be sorted + batch(writes: KVWrite[]): Promise<void>; // Should be atomic + + // Scheduling + setAlarm(workflowId: string, wakeAt: number): Promise<void>; + clearAlarm(workflowId: string): Promise<void>; + readonly workerPollInterval: number; +} +``` + +### Running Workflows + +```typescript +import { runWorkflow } from "@rivetkit/workflow-engine"; + +const handle = runWorkflow( + "workflow-id", // Unique ID for this workflow instance + myWorkflow, // Your workflow function + { input: "data" }, // Input passed to workflow + driver // Your EngineDriver implementation +); + +// The handle provides methods to interact with the running workflow +await handle.result; // Wait for completion/yield +await handle.signal("name", data); // Send a signal +await handle.wake(); // Wake immediately +handle.evict(); // Request graceful shutdown +await handle.cancel(); // Cancel permanently +await handle.getState(); // Get current state +await handle.getOutput(); // Get output if completed +``` + +## Features + +### Steps + +Steps execute arbitrary async code. Results are persisted and replayed on restart.
+ +```typescript +// Simple form +const result = await ctx.step("fetch-user", async () => { + return await fetchUser(userId); +}); + +// With configuration +const result = await ctx.step({ + name: "external-api", + maxRetries: 5, // Default: 3 + retryBackoffBase: 200, // Default: 100ms + retryBackoffMax: 60000, // Default: 30000ms + timeout: 10000, // Default: 30000ms (0 to disable) + ephemeral: false, // Default: false - batch writes + run: async () => { + return await callExternalApi(); + }, +}); +``` + +**Retry Behavior:** +- Regular errors trigger automatic retry with exponential backoff +- `CriticalError` bypasses retry logic for unrecoverable errors +- After exhausting retries, `StepExhaustedError` is thrown + +```typescript +import { CriticalError } from "@rivetkit/workflow-engine"; + +await ctx.step("validate", async () => { + if (!isValid(input)) { + throw new CriticalError("Invalid input - no point retrying"); + } + return processInput(input); +}); +``` + +### Loops + +Loops maintain durable state across iterations with periodic checkpointing. + +```typescript +import { Loop } from "@rivetkit/workflow-engine"; + +const total = await ctx.loop({ + name: "process-batches", + state: { cursor: null, count: 0 }, // Initial state + commitInterval: 10, // Checkpoint every 10 iterations + run: async (ctx, state) => { + const batch = await ctx.step("fetch", () => fetchBatch(state.cursor)); + + if (!batch.items.length) { + return Loop.break(state.count); // Exit with final value + } + + await ctx.step("process", () => processBatch(batch.items)); + + return Loop.continue({ // Continue with new state + cursor: batch.nextCursor, + count: state.count + batch.items.length, + }); + }, +}); +``` + +**Simple loops** (no state needed): + +```typescript +const result = await ctx.loop("my-loop", async (ctx) => { + // ... 
do work + if (done) return Loop.break(finalValue); + return Loop.continue(undefined); +}); +``` + +### Sleep + +Pause workflow execution for a duration or until a specific time. + +```typescript +// Sleep for duration +await ctx.sleep("wait-5-min", 5 * 60 * 1000); + +// Sleep until timestamp +await ctx.sleepUntil("wait-midnight", midnightTimestamp); +``` + +Short sleeps (< `driver.workerPollInterval`) wait in memory. Longer sleeps yield to the scheduler and set an alarm for wake-up. + +### Signals (Listen) + +Wait for external events delivered via `handle.signal()`. + +```typescript +// Wait for a single signal +const data = await ctx.listen("payment", "payment-completed"); + +// Wait for N signals +const items = await ctx.listenN("batch", "item-added", 10); + +// Wait with timeout (returns null on timeout) +const result = await ctx.listenWithTimeout( + "api-response", + "response-received", + 30000 +); + +// Wait until timestamp (returns null on timeout) +const result = await ctx.listenUntil( + "api-response", + "response-received", + deadline +); + +// Wait for up to N signals with timeout +const items = await ctx.listenNWithTimeout( + "batch", + "item-added", + 10, // max items + 60000 // timeout ms +); + +// Wait for up to N signals until timestamp +const items = await ctx.listenNUntil( + "batch", + "item-added", + 10, // max items + deadline // timestamp +); +``` + +**Signal delivery:** Signals are loaded once at workflow start. If a signal is sent during execution, the workflow yields and picks it up on the next run. + +### Join (Parallel - Wait All) + +Execute multiple branches in parallel and wait for all to complete. 
+ +```typescript +const results = await ctx.join("fetch-all", { + user: { + run: async (ctx) => { + return await ctx.step("get-user", () => fetchUser(userId)); + } + }, + posts: { + run: async (ctx) => { + return await ctx.step("get-posts", () => fetchPosts(userId)); + } + }, + notifications: { + run: async (ctx) => { + return await ctx.step("get-notifs", () => fetchNotifications(userId)); + } + }, +}); + +// results.user, results.posts, results.notifications are all available +// Type-safe: each branch output type is preserved +``` + +If any branch fails, all errors are collected into a `JoinError`: + +```typescript +import { JoinError } from "@rivetkit/workflow-engine"; + +try { + await ctx.join("risky", { /* branches */ }); +} catch (error) { + if (error instanceof JoinError) { + console.log("Failed branches:", Object.keys(error.errors)); + } +} +``` + +### Race (Parallel - First Wins) + +Execute multiple branches and return when the first completes. + +```typescript +const { winner, value } = await ctx.race("timeout-race", [ + { + name: "work", + run: async (ctx) => { + return await ctx.step("do-work", () => doExpensiveWork()); + } + }, + { + name: "timeout", + run: async (ctx) => { + await ctx.sleep("wait", 30000); + return null; // Timeout value + } + }, +]); + +if (winner === "work") { + console.log("Work completed:", value); +} else { + console.log("Timed out"); +} +``` + +- Other branches are cancelled via `AbortSignal` when a winner is determined +- If all branches fail, throws `RaceError` with all errors + +### Eviction and Cancellation + +**Eviction** - Graceful shutdown (workflow can be resumed elsewhere): + +```typescript +handle.evict(); // Request shutdown + +// In workflow, check eviction status: +if (ctx.isEvicted()) { + // Clean up and return +} + +// Or use the abort signal directly: +await fetch(url, { signal: ctx.signal }); +``` + +**Cancellation** - Permanent stop: + +```typescript +await handle.cancel(); // Sets state to "cancelled", clears 
alarms +``` + +### Workflow Migrations (Removed) + +When removing steps from workflow code, use `ctx.removed()` to maintain history compatibility: + +```typescript +async function myWorkflow(ctx: WorkflowContextInterface) { + // This step was removed in v2 + await ctx.removed("old-validation", "step"); + + // New code continues here + await ctx.step("new-logic", async () => { /* ... */ }); +} +``` + +This creates a placeholder entry that satisfies history validation without executing anything. + +## Configuration Constants + +Default values are exported and can be referenced when overriding: + +```typescript +import { + DEFAULT_MAX_RETRIES, // 3 + DEFAULT_RETRY_BACKOFF_BASE, // 100ms + DEFAULT_RETRY_BACKOFF_MAX, // 30000ms + DEFAULT_LOOP_COMMIT_INTERVAL, // 20 iterations + DEFAULT_STEP_TIMEOUT, // 30000ms +} from "@rivetkit/workflow-engine"; +``` + +## Error Types + +```typescript +import { + // User-facing errors + CriticalError, // Throw to skip retries + StepExhaustedError, // Step failed after all retries + JoinError, // One or more join branches failed + RaceError, // All race branches failed + HistoryDivergedError, // Workflow code changed incompatibly + + // Internal yield errors (caught by runtime) + SleepError, // Workflow sleeping + SignalWaitError, // Waiting for signals + EvictedError, // Workflow evicted + + // User errors + EntryInProgressError, // Forgot to await a step + CancelledError, // Branch cancelled (race) +} from "@rivetkit/workflow-engine"; +``` + +## Workflow States + +```typescript +type WorkflowState = + | "pending" // Not yet started + | "running" // Currently executing + | "sleeping" // Waiting for deadline or signal + | "completed" // Finished successfully + | "failed" // Unrecoverable error + | "cancelled"; // Permanently stopped +``` + +## Best Practices + +1. **Unique step names** - Each step/loop/sleep/listen within a scope must have a unique name + +2. **Deterministic code** - Workflow code outside of steps must be deterministic. 
Don't use `Math.random()`, `Date.now()`, or read external state outside steps. + +3. **Use steps for side effects** - All I/O and non-deterministic operations should be inside steps + +4. **Use CriticalError for permanent failures** - When an error is unrecoverable, throw `CriticalError` to avoid wasting retries + +5. **Check eviction in long operations** - Use `ctx.isEvicted()` or `ctx.signal` to handle graceful shutdown + +6. **Pass AbortSignal to cancellable operations**: + ```typescript + await ctx.step("fetch", async () => { + return fetch(url, { signal: ctx.signal }); + }); + ``` + +7. **Ephemeral steps for batching** - Use `ephemeral: true` for steps where you want to batch writes: + ```typescript + // These batch their writes + await ctx.step({ name: "a", ephemeral: true, run: async () => { ... }}); + await ctx.step({ name: "b", ephemeral: true, run: async () => { ... }}); + // This flushes all pending writes + await ctx.step({ name: "c", run: async () => { ... }}); + ``` + +## Further Reading + +See [architecture.md](./architecture.md) for detailed implementation information including: + +- Storage schema and key encoding +- Location system and NameIndex optimization +- Driver requirements +- Error handling internals +- Loop state management and history forgetting diff --git a/rivetkit-typescript/packages/workflow-engine/TODO.md b/rivetkit-typescript/packages/workflow-engine/TODO.md new file mode 100644 index 0000000000..ecf629caf0 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/TODO.md @@ -0,0 +1,18 @@ +## ephemeral-by-default steps + +- make steps ephemeral by default +- add helper fns for things like fetch, clients, etc that auto-flag a step as required to be durable +- can also opt-in to flag a step as durable + +## rollback + +- support rollback steps + +## misc + +- remove workflow state in favor of actor state + +## types + +- generic signals + diff --git a/rivetkit-typescript/packages/workflow-engine/architecture.md 
b/rivetkit-typescript/packages/workflow-engine/architecture.md new file mode 100644 index 0000000000..06d0be6722 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/architecture.md @@ -0,0 +1,459 @@ +# Workflow Engine Architecture + +This document describes the architecture of the workflow engine, a durable execution system for TypeScript. + +## Overview + +The workflow engine enables writing long-running, fault-tolerant workflows as regular async functions. Workflows can be interrupted at any point (process crash, eviction, sleep) and resume from where they left off. This is achieved through: + +1. **History tracking** - Every operation is recorded in persistent storage +2. **Replay** - On restart, operations replay from history instead of re-executing +3. **Deterministic execution** - Same inputs produce same execution path + +## Isolation Model + +**Critical architectural assumption**: Each workflow instance operates on an isolated KV namespace. + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Host System │ +│ ┌─────────────────┐ ┌─────────────────┐ │ +│ │ Workflow A │ │ Workflow B │ │ +│ │ ┌───────────┐ │ │ ┌───────────┐ │ │ +│ │ │ Engine │ │ │ │ Engine │ │ │ +│ │ └─────┬─────┘ │ │ └─────┬─────┘ │ │ +│ │ │ │ │ │ │ │ +│ │ ┌─────▼─────┐ │ │ ┌─────▼─────┐ │ │ +│ │ │ Driver │ │ │ │ Driver │ │ │ +│ │ │(isolated) │ │ │ │(isolated) │ │ │ +│ │ └─────┬─────┘ │ │ └─────┬─────┘ │ │ +│ │ │ │ │ │ │ │ +│ │ ┌─────▼─────┐ │ │ ┌─────▼─────┐ │ │ +│ │ │ KV A │ │ │ │ KV B │ │ │ +│ │ └───────────┘ │ │ └───────────┘ │ │ +│ └─────────────────┘ └─────────────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +Key guarantees: + +1. **KV Isolation** - Each workflow's `EngineDriver` operates on a completely separate KV namespace. The workflow engine does not include workflow IDs in KV keys because isolation is provided by the driver implementation. + +2. 
**Single Writer** - A workflow instance is the **only** reader and writer of its KV namespace during execution. There is no concurrent access from other workflow instances. + +3. **Signal Delivery** - Signals are written to the workflow's isolated KV by external systems (via `WorkflowHandle.signal()`), then read by the workflow on its next execution. Since each workflow has its own KV, signals are inherently workflow-scoped. + +4. **External Mutation** - The only external mutations to a workflow's KV are: + - Signal delivery (appending to signal queue) + - Eviction markers (not yet implemented) + + These are coordinated through the host system's scheduling to avoid conflicts. + +This isolation model means: +- The `EngineDriver` interface has no workflow ID parameters for KV operations +- Keys like `signals/0`, `history/...` are relative to each workflow's namespace +- The host system (e.g., Cloudflare Durable Objects, dedicated actor processes) provides the isolation boundary +- Alarms use workflow IDs because they may be managed by a shared scheduler + +## Driver Requirements + +The `EngineDriver` implementation must satisfy these requirements: + +1. **Sorted list results** - `list()` MUST return entries sorted by key in lexicographic byte order. The workflow engine relies on this for: + - Signal FIFO ordering (signals consumed in order received) + - Name registry reconstruction (names at correct indices) + - Deterministic replay behavior + +2. **Atomic batch writes** - `batch()` SHOULD be atomic (all-or-nothing). If atomicity is not possible, partial writes may cause inconsistent state on crash. + +3. **Prefix isolation** - `list(prefix)` and `deletePrefix(prefix)` must only affect keys that start with the exact prefix bytes. + +4. **No concurrent modification** - The driver may assume no other writer modifies the KV during workflow execution (see Isolation Model). 
+ +## Core Concepts + +### Workflow + +A workflow is an async function that receives a `WorkflowContext` and optional input: + +```typescript +async function myWorkflow(ctx: WorkflowContext, input: MyInput): Promise { + const result = await ctx.step("fetch-data", async () => { + return await fetchData(input.id); + }); + return result; +} +``` + +### Entries + +Every operation in a workflow creates an **entry** in the history. Entry types: + +| Type | Purpose | +|------|---------| +| `step` | Execute arbitrary async code | +| `loop` | Iterate with durable state | +| `sleep` | Wait for a duration or timestamp | +| `signal` | Wait for external events | +| `join` | Execute branches in parallel, wait for all | +| `race` | Execute branches in parallel, first wins | +| `removed` | Placeholder for migrated-away entries | + +### Location System + +Each entry is identified by a **location** - a path through the workflow's execution tree: + +``` +step("a") -> [0] -> "a" +step("b") -> [1] -> "b" +loop("outer") { + step("inner") -> [2, ~0, 3] -> "outer/~0/inner" +} +join("parallel") { + branch "x" { + step("work") -> [4, 5, 6] -> "parallel/x/work" + } +} +``` + +#### NameIndex Optimization + +Locations use numeric indices into a **name registry** rather than storing strings directly: + +```typescript +type NameIndex = number; +type PathSegment = NameIndex | LoopIterationMarker; +type Location = PathSegment[]; + +// Name registry: ["a", "b", "outer", "inner", "parallel", "x", "work"] +// Location [4, 5, 6] resolves to "parallel/x/work" +``` + +This optimization reduces storage size when the same names appear many times. 
+ +## Component Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ runWorkflow() │ +│ Entry point that orchestrates workflow execution │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ WorkflowContextImpl │ +│ Implements WorkflowContext interface │ +│ - step(), loop(), sleep(), listen(), join(), race() │ +│ - Manages current location │ +│ - Creates branch contexts for parallel execution │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Storage │ +│ In-memory representation of workflow state │ +│ - nameRegistry: string[] │ +│ - history: Map │ +│ - entryMetadata: Map │ +│ - signals: Signal[] │ +│ - state: WorkflowState │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ EngineDriver │ +│ Interface for persistent storage (per-workflow isolated) │ +│ - get/set/delete/list for KV operations │ +│ - batch for atomic writes │ +│ - setAlarm/clearAlarm for scheduled wake-ups │ +│ See "Isolation Model" above for KV scoping │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Data Flow + +### First Execution + +``` +1. runWorkflow() called +2. loadStorage() loads empty state from driver +3. Workflow function executes +4. Each ctx.step() call: + a. Check history for existing entry (none found) + b. Create new entry + c. Execute the step callback + d. Save output to entry + e. flush() writes to driver +5. Workflow completes, final flush() +``` + +### Replay Execution + +``` +1. runWorkflow() called +2. loadStorage() loads previous state from driver +3. Workflow function executes +4. Each ctx.step() call: + a. Check history for existing entry (found!) + b. Entry has output -> return immediately (no callback execution) +5. 
Workflow continues from where it left off +``` + +### Sleep/Signal Yielding + +``` +1. ctx.sleep() or ctx.listen() called +2. Check if deadline passed or signal available (in memory) +3. If not ready: + a. Throw SleepError or SignalWaitError + b. runWorkflow() catches error + c. flush() saves current state + d. setAlarm() schedules wake-up (for sleep) + e. Return { state: "sleeping", ... } +4. External scheduler calls runWorkflow() again when ready +5. loadStorage() loads any new signals added while yielded +6. Replay proceeds, sleep/signal now succeeds +``` + +### Signal Delivery Model + +**Important**: The workflow's KV is only mutated by the workflow engine itself during execution. Signals follow a specific delivery pattern: + +``` +1. External system sends signal to workflow +2. Signal written directly to KV: signals/{index} +3. External system triggers workflow wake-up (scheduler/notify) +4. runWorkflow() called +5. loadStorage() loads ALL signals from KV into memory +6. Workflow checks storage.signals (in-memory, no polling) +7. If signal found: consume and continue +8. If not found: yield with SignalWaitError +``` + +There is no polling for signals during execution. Signals must be present in KV before `runWorkflow()` is called for them to be available. 
+ +## Storage Schema + +Data is stored using binary key encoding with fdb-tuple for proper byte ordering: + +``` +Key Format (binary tuples): Value Format: +[1, index] -> BARE (name string) +[2, ...locationSegments] -> BARE+versioning (Entry) +[3, signalId] -> BARE+versioning (Signal) +[4, 1] -> text (WorkflowState) +[4, 2] -> CBOR (workflow output) +[4, 3] -> CBOR (WorkflowError) +[4, 4] -> text (version) +[4, 5] -> CBOR (workflow input) +[5, entryId] -> BARE+versioning (EntryMetadata) + +Key prefixes: +1 = NAMES - Name registry +2 = HISTORY - History entries +3 = SIGNALS - Signal queue +4 = WORKFLOW - Workflow metadata +5 = ENTRY_METADATA - Entry metadata + +Location segments in keys: +- NameIndex (number) -> encoded directly +- LoopIterationMarker -> [loopIdx, iteration] nested tuple +``` + +The fdb-tuple encoding ensures: +- Proper lexicographic byte ordering for `list()` operations +- Compact representation for numeric indices +- Nested tuples for complex segments (loop iterations) + +### Entry Structure + +```typescript +interface Entry { + id: string; // UUID + location: Location; // Path in execution tree + kind: EntryKind; // Type-specific data + dirty: boolean; // Needs flushing? +} + +interface StepEntry { + type: "step"; + data: { + output?: unknown; // Successful result + error?: string; // Error message if failed + }; +} +``` + +### Metadata Structure + +```typescript +interface EntryMetadata { + status: "pending" | "running" | "completed" | "failed" | "exhausted"; + attempts: number; + lastAttemptAt: number; + createdAt: number; + completedAt?: number; + dirty: boolean; +} +``` + +## Error Handling + +### Retryable Errors + +Regular errors thrown from step callbacks trigger retry logic: + +1. Error caught, saved to entry +2. `StepFailedError` thrown +3. On next run, metadata.attempts checked +4. If attempts < maxRetries, apply backoff and retry +5. 
If attempts >= maxRetries, throw `StepExhaustedError` + +### Critical Errors + +`CriticalError` bypasses retry logic: + +```typescript +await ctx.step("validate", async () => { + if (!isValid(input)) { + throw new CriticalError("Invalid input"); + } +}); +``` + +### Yield Errors + +These signal the workflow should pause: + +| Error | Meaning | +|-------|---------| +| `SleepError` | Waiting for a deadline | +| `SignalWaitError` | Waiting for signals | +| `EvictedError` | Workflow being moved to another worker | + +## Parallel Execution + +### Join (All) + +```typescript +const results = await ctx.join("fetch-all", { + user: { run: async (ctx) => await ctx.step("user", fetchUser) }, + posts: { run: async (ctx) => await ctx.step("posts", fetchPosts) }, +}); +// results.user and results.posts available +``` + +- All branches execute concurrently +- Waits for ALL branches to complete +- If any branch fails, collects all errors into `JoinError` +- Branch state tracked: pending -> running -> completed/failed + +### Race (First) + +```typescript +const { winner, value } = await ctx.race("timeout", [ + { name: "work", run: async (ctx) => await doWork(ctx) }, + { name: "timeout", run: async (ctx) => { await ctx.sleep("wait", 5000); return null; } }, +]); +``` + +- All branches execute concurrently +- Returns when FIRST branch completes +- Other branches are cancelled via AbortSignal +- Winner tracked in history for replay + +## Loop State Management + +Loops maintain durable state across iterations: + +```typescript +await ctx.loop({ + name: "process-items", + state: { cursor: null, processed: 0 }, + commitInterval: 10, + run: async (ctx, state) => { + const batch = await ctx.step("fetch", () => fetchBatch(state.cursor)); + if (!batch.items.length) { + return Loop.break(state.processed); + } + await ctx.step("process", () => processBatch(batch.items)); + return Loop.continue({ + cursor: batch.nextCursor, + processed: state.processed + batch.items.length, + }); + }, +}); +``` 
+ +### Commit Interval + +- State is persisted every `commitInterval` iterations +- On crash, replay resumes from last committed state +- Old iteration entries are deleted to save space + +### History Forgetting + +After commit, old iterations are deleted: + +``` +Before commit at iteration 20 (commitInterval=10): + process-items/~0/fetch, ~0/process + process-items/~1/fetch, ~1/process + ... + process-items/~19/fetch, ~19/process + +After commit: + process-items/~10/fetch, ~10/process (kept) + ... + process-items/~19/fetch, ~19/process (kept) + // Iterations 0-9 deleted +``` + +## Dirty Tracking + +To minimize writes, entries track a `dirty` flag: + +1. New entries created with `dirty: true` +2. Modified entries set `dirty = true` +3. `flush()` only writes entries where `dirty === true` +4. After write, `dirty = false` + +This means replay operations that don't modify state don't trigger writes. + +## Design Decisions + +### Why Path-Based Locations? + +Alternative: Coordinate-based (index into flat array) + +Path-based advantages: +- Human-readable keys for debugging +- Natural hierarchy for nested structures +- Prefix-based queries for loop cleanup +- Stable across code changes (names vs positions) + +### Why NameIndex? + +Locations could store strings directly, but: +- Same names repeat frequently (e.g., "step-1" in every loop iteration) +- Numeric indices compress better +- Registry loaded once, indices resolved in memory + +### Why Dirty Tracking? + +Could flush everything on every operation, but: +- Replay would write identical data +- Batch operations would have redundant writes +- Dirty tracking makes replay essentially read-only + +### Why Sequential Test Execution? + +Tests share a module-level `driver` variable via `beforeEach`. While each test gets a fresh driver, Vitest's parallel execution caused race conditions. Sequential execution ensures isolation. + +## Future Considerations + +1. **Version checking** - Detect workflow code changes +2. 
**Compaction** - Merge history entries to reduce size +3. **Sharding** - Distribute workflow state across multiple keys +4. **Observability** - Structured logging, metrics, tracing +5. **Workflow composition** - Child workflows, signals between workflows diff --git a/rivetkit-typescript/packages/workflow-engine/package.json b/rivetkit-typescript/packages/workflow-engine/package.json new file mode 100644 index 0000000000..f96b4bf6dd --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/package.json @@ -0,0 +1,69 @@ +{ + "name": "@rivetkit/workflow-engine", + "version": "0.0.1", + "description": "Durable workflow engine with reentrant execution", + "license": "Apache-2.0", + "keywords": [ + "workflow", + "durable", + "reentrant", + "stateful" + ], + "files": [ + "dist", + "src", + "schemas", + "package.json" + ], + "type": "module", + "exports": { + ".": { + "import": { + "types": "./dist/tsup/index.d.ts", + "default": "./dist/tsup/index.js" + }, + "require": { + "types": "./dist/tsup/index.d.cts", + "default": "./dist/tsup/index.cjs" + } + }, + "./testing": { + "import": { + "types": "./dist/tsup/testing.d.ts", + "default": "./dist/tsup/testing.js" + }, + "require": { + "types": "./dist/tsup/testing.d.cts", + "default": "./dist/tsup/testing.cjs" + } + } + }, + "engines": { + "node": ">=18.0.0" + }, + "scripts": { + "build": "pnpm run compile:bare && tsup src/index.ts src/testing.ts", + "compile:bare": "tsx scripts/compile-bare.ts compile schemas/v1.bare -o dist/schemas/v1.ts", + "check-types": "tsc --noEmit", + "lint": "biome check .", + "lint:fix": "biome check --write .", + "test": "vitest run", + "test:watch": "vitest" + }, + "dependencies": { + "@rivetkit/bare-ts": "^0.6.2", + "cbor-x": "^1.6.0", + "fdb-tuple": "^1.0.0", + "vbare": "^0.0.4" + }, + "devDependencies": { + "@bare-ts/tools": "^0.13.0", + "commander": "^12.0.0", + "tsx": "^4.7.0", + "@biomejs/biome": "^2.2.3", + "@types/node": "^22.13.1", + "tsup": "^8.4.0", + "typescript": "^5.7.3", + 
"vitest": "^3.1.1" + } +} diff --git a/rivetkit-typescript/packages/workflow-engine/schemas/serde.ts b/rivetkit-typescript/packages/workflow-engine/schemas/serde.ts new file mode 100644 index 0000000000..52e0ff03cc --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/schemas/serde.ts @@ -0,0 +1,550 @@ +/** + * Serialization/deserialization utilities for converting between + * internal TypeScript types and BARE schema types. + */ + +import * as cbor from "cbor-x"; +import type * as v1 from "../dist/schemas/v1.js"; +import { + EntryStatus as BareEntryStatus, + SleepState as BareSleepState, + BranchStatusType as BareBranchStatusType, +} from "../dist/schemas/v1.js"; +import type { + Entry as InternalEntry, + EntryMetadata as InternalEntryMetadata, + Signal as InternalSignal, + EntryStatus as InternalEntryStatus, + SleepState as InternalSleepState, + BranchStatusType as InternalBranchStatusType, + WorkflowState as InternalWorkflowState, + Location as InternalLocation, + PathSegment as InternalPathSegment, + LoopIterationMarker as InternalLoopIterationMarker, + EntryKind as InternalEntryKind, + BranchStatus as InternalBranchStatus, +} from "../src/types.js"; +import { + CURRENT_VERSION, + ENTRY_VERSIONED, + ENTRY_METADATA_VERSIONED, + SIGNAL_VERSIONED, + WORKFLOW_METADATA_VERSIONED, +} from "./versioned.js"; + +// === Helper: ArrayBuffer to/from utilities === + +function bufferToArrayBuffer(buf: Uint8Array): ArrayBuffer { + // Create a new ArrayBuffer and copy the data to ensure it's not a SharedArrayBuffer + const arrayBuffer = new ArrayBuffer(buf.byteLength); + new Uint8Array(arrayBuffer).set(buf); + return arrayBuffer; +} + +function encodeCbor(value: unknown): ArrayBuffer { + return bufferToArrayBuffer(cbor.encode(value)); +} + +function decodeCbor(data: ArrayBuffer): T { + return cbor.decode(new Uint8Array(data)) as T; +} + +/** + * Validate that a value is a non-null object. 
+ */ +function assertObject(value: unknown, context: string): asserts value is Record<string, unknown> { + if (typeof value !== "object" || value === null) { + throw new Error(`${context}: expected object, got ${typeof value}`); + } +} + +/** + * Validate that a value is a string. + */ +function assertString(value: unknown, context: string): asserts value is string { + if (typeof value !== "string") { + throw new Error(`${context}: expected string, got ${typeof value}`); + } +} + +/** + * Validate that a value is a number. + */ +function assertNumber(value: unknown, context: string): asserts value is number { + if (typeof value !== "number") { + throw new Error(`${context}: expected number, got ${typeof value}`); + } +} + +// === Entry Status Conversion === + +function entryStatusToBare(status: InternalEntryStatus): BareEntryStatus { + switch (status) { + case "pending": + return BareEntryStatus.PENDING; + case "running": + return BareEntryStatus.RUNNING; + case "completed": + return BareEntryStatus.COMPLETED; + case "failed": + return BareEntryStatus.FAILED; + case "exhausted": + return BareEntryStatus.EXHAUSTED; + } +} + +function entryStatusFromBare(status: BareEntryStatus): InternalEntryStatus { + switch (status) { + case BareEntryStatus.PENDING: + return "pending"; + case BareEntryStatus.RUNNING: + return "running"; + case BareEntryStatus.COMPLETED: + return "completed"; + case BareEntryStatus.FAILED: + return "failed"; + case BareEntryStatus.EXHAUSTED: + return "exhausted"; + } +} + +// === Sleep State Conversion === + +function sleepStateToBare(state: InternalSleepState): BareSleepState { + switch (state) { + case "pending": + return BareSleepState.PENDING; + case "completed": + return BareSleepState.COMPLETED; + case "interrupted": + return BareSleepState.INTERRUPTED; + } +} + +function sleepStateFromBare(state: BareSleepState): InternalSleepState { + switch (state) { + case BareSleepState.PENDING: + return "pending"; + case BareSleepState.COMPLETED: + return "completed"; +
case BareSleepState.INTERRUPTED: + return "interrupted"; + } +} + +// === Branch Status Type Conversion === + +function branchStatusTypeToBare(status: InternalBranchStatusType): BareBranchStatusType { + switch (status) { + case "pending": + return BareBranchStatusType.PENDING; + case "running": + return BareBranchStatusType.RUNNING; + case "completed": + return BareBranchStatusType.COMPLETED; + case "failed": + return BareBranchStatusType.FAILED; + case "cancelled": + return BareBranchStatusType.CANCELLED; + } +} + +function branchStatusTypeFromBare(status: BareBranchStatusType): InternalBranchStatusType { + switch (status) { + case BareBranchStatusType.PENDING: + return "pending"; + case BareBranchStatusType.RUNNING: + return "running"; + case BareBranchStatusType.COMPLETED: + return "completed"; + case BareBranchStatusType.FAILED: + return "failed"; + case BareBranchStatusType.CANCELLED: + return "cancelled"; + } +} + +// === Location Conversion === + +function locationToBare(location: InternalLocation): v1.Location { + return location.map((segment): v1.PathSegment => { + if (typeof segment === "number") { + return { tag: "NameIndex", val: segment }; + } + return { + tag: "LoopIterationMarker", + val: { + loop: segment.loop, + iteration: segment.iteration, + }, + }; + }); +} + +function locationFromBare(location: v1.Location): InternalLocation { + return location.map((segment): InternalPathSegment => { + if (segment.tag === "NameIndex") { + return segment.val; + } + return { + loop: segment.val.loop, + iteration: segment.val.iteration, + }; + }); +} + +// === Branch Status Conversion === + +function branchStatusToBare(status: InternalBranchStatus): v1.BranchStatus { + return { + status: branchStatusTypeToBare(status.status), + output: status.output !== undefined ? encodeCbor(status.output) : null, + error: status.error ?? 
null,
	};
}

/** Convert a BARE branch status back to the internal representation. */
function branchStatusFromBare(status: v1.BranchStatus): InternalBranchStatus {
	return {
		status: branchStatusTypeFromBare(status.status),
		output: status.output !== null ? decodeCbor(status.output) : undefined,
		error: status.error ?? undefined,
	};
}

// === Entry Kind Conversion ===

/**
 * Convert an internal entry kind to its BARE tagged-union representation.
 * User-supplied payloads (step outputs, loop state, signal data) are
 * CBOR-encoded into opaque byte blobs; absent values map to `null`.
 */
function entryKindToBare(kind: InternalEntryKind): v1.EntryKind {
	switch (kind.type) {
		case "step":
			return {
				tag: "StepEntry",
				val: {
					output: kind.data.output !== undefined ? encodeCbor(kind.data.output) : null,
					error: kind.data.error ?? null,
				},
			};
		case "loop":
			return {
				tag: "LoopEntry",
				val: {
					state: encodeCbor(kind.data.state),
					iteration: kind.data.iteration,
					output: kind.data.output !== undefined ? encodeCbor(kind.data.output) : null,
				},
			};
		case "sleep":
			return {
				tag: "SleepEntry",
				val: {
					// Wire format stores the millisecond deadline as u64.
					deadline: BigInt(kind.data.deadline),
					state: sleepStateToBare(kind.data.state),
				},
			};
		case "signal":
			return {
				tag: "SignalEntry",
				val: {
					name: kind.data.name,
					signalData: encodeCbor(kind.data.data),
				},
			};
		case "join":
			return {
				tag: "JoinEntry",
				val: {
					// Internal form is a plain object keyed by branch name;
					// the wire form is a Map.
					branches: new Map(
						Object.entries(kind.data.branches).map(([name, status]) => [
							name,
							branchStatusToBare(status),
						]),
					),
				},
			};
		case "race":
			return {
				tag: "RaceEntry",
				val: {
					winner: kind.data.winner,
					branches: new Map(
						Object.entries(kind.data.branches).map(([name, status]) => [
							name,
							branchStatusToBare(status),
						]),
					),
				},
			};
		case "removed":
			return {
				tag: "RemovedEntry",
				val: {
					originalType: kind.data.originalType,
					originalName: kind.data.originalName ?? null,
				},
			};
	}
}

/**
 * Convert a BARE entry kind back to the internal representation.
 * Inverse of {@link entryKindToBare}: CBOR blobs are decoded, `null`s become
 * `undefined`, and wire Maps become plain objects.
 */
function entryKindFromBare(kind: v1.EntryKind): InternalEntryKind {
	switch (kind.tag) {
		case "StepEntry":
			return {
				type: "step",
				data: {
					output: kind.val.output !== null ? decodeCbor(kind.val.output) : undefined,
					error: kind.val.error ?? undefined,
				},
			};
		case "LoopEntry":
			return {
				type: "loop",
				data: {
					state: decodeCbor(kind.val.state),
					iteration: kind.val.iteration,
					output: kind.val.output !== null ? decodeCbor(kind.val.output) : undefined,
				},
			};
		case "SleepEntry":
			return {
				type: "sleep",
				data: {
					// NOTE(review): u64 -> Number; exact only below 2^53.
					// Millisecond timestamps stay well under that bound.
					deadline: Number(kind.val.deadline),
					state: sleepStateFromBare(kind.val.state),
				},
			};
		case "SignalEntry":
			return {
				type: "signal",
				data: {
					name: kind.val.name,
					data: decodeCbor(kind.val.signalData),
				},
			};
		case "JoinEntry":
			return {
				type: "join",
				data: {
					branches: Object.fromEntries(
						Array.from(kind.val.branches.entries()).map(([name, status]) => [
							name,
							branchStatusFromBare(status),
						]),
					),
				},
			};
		case "RaceEntry":
			return {
				type: "race",
				data: {
					winner: kind.val.winner,
					branches: Object.fromEntries(
						Array.from(kind.val.branches.entries()).map(([name, status]) => [
							name,
							branchStatusFromBare(status),
						]),
					),
				},
			};
		case "RemovedEntry":
			return {
				type: "removed",
				data: {
					originalType: kind.val.originalType as InternalEntryKind["type"],
					originalName: kind.val.originalName ?? undefined,
				},
			};
		default:
			// Defensive: future schema versions may add tags this build
			// does not know about.
			throw new Error(`Unknown entry kind: ${(kind as { tag: string }).tag}`);
	}
}

// === Entry Conversion & Serialization ===

/** Build the BARE form of a history entry (id + location + kind). */
function entryToBare(entry: InternalEntry): v1.Entry {
	return {
		id: entry.id,
		location: locationToBare(entry.location),
		kind: entryKindToBare(entry.kind),
	};
}

/** Rehydrate an internal entry; freshly loaded entries are never dirty. */
function entryFromBare(bareEntry: v1.Entry): InternalEntry {
	return {
		id: bareEntry.id,
		location: locationFromBare(bareEntry.location),
		kind: entryKindFromBare(bareEntry.kind),
		dirty: false,
	};
}

/** Serialize an entry with an embedded schema version for future migration. */
export function serializeEntry(entry: InternalEntry): Uint8Array {
	const bareEntry = entryToBare(entry);
	return ENTRY_VERSIONED.serializeWithEmbeddedVersion(bareEntry, CURRENT_VERSION);
}

/** Deserialize an entry, dispatching on the embedded schema version. */
export function deserializeEntry(bytes: Uint8Array): InternalEntry {
	const bareEntry = ENTRY_VERSIONED.deserializeWithEmbeddedVersion(bytes);
	return entryFromBare(bareEntry);
}

// === Entry Metadata Conversion & Serialization ===

/** Convert internal metadata to wire form (timestamps widened to u64). */
function entryMetadataToBare(metadata: InternalEntryMetadata): v1.EntryMetadata {
	return {
		status: entryStatusToBare(metadata.status),
		error: metadata.error ?? null,
		attempts: metadata.attempts,
		lastAttemptAt: BigInt(metadata.lastAttemptAt),
		createdAt: BigInt(metadata.createdAt),
		completedAt: metadata.completedAt !== undefined ? BigInt(metadata.completedAt) : null,
	};
}

/** Convert wire metadata back to internal form (timestamps to Number). */
function entryMetadataFromBare(bareMetadata: v1.EntryMetadata): InternalEntryMetadata {
	return {
		status: entryStatusFromBare(bareMetadata.status),
		error: bareMetadata.error ?? undefined,
		attempts: bareMetadata.attempts,
		lastAttemptAt: Number(bareMetadata.lastAttemptAt),
		createdAt: Number(bareMetadata.createdAt),
		completedAt: bareMetadata.completedAt !== null ? 
Number(bareMetadata.completedAt) : undefined,
		// Freshly loaded metadata is clean by definition.
		dirty: false,
	};
}

/** Serialize entry metadata with an embedded schema version. */
export function serializeEntryMetadata(metadata: InternalEntryMetadata): Uint8Array {
	const bareMetadata = entryMetadataToBare(metadata);
	return ENTRY_METADATA_VERSIONED.serializeWithEmbeddedVersion(bareMetadata, CURRENT_VERSION);
}

/** Deserialize entry metadata, dispatching on the embedded schema version. */
export function deserializeEntryMetadata(bytes: Uint8Array): InternalEntryMetadata {
	const bareMetadata = ENTRY_METADATA_VERSIONED.deserializeWithEmbeddedVersion(bytes);
	return entryMetadataFromBare(bareMetadata);
}

// === Signal Conversion & Serialization ===

/** Convert an internal signal to wire form (payload CBOR-encoded). */
function signalToBare(signal: InternalSignal): v1.Signal {
	return {
		id: signal.id,
		name: signal.name,
		signalData: encodeCbor(signal.data),
		sentAt: BigInt(signal.sentAt),
	};
}

/** Convert a wire signal back to internal form. */
function signalFromBare(bareSignal: v1.Signal): InternalSignal {
	return {
		id: bareSignal.id,
		name: bareSignal.name,
		data: decodeCbor(bareSignal.signalData),
		sentAt: Number(bareSignal.sentAt),
	};
}

/** Serialize a signal with an embedded schema version. */
export function serializeSignal(signal: InternalSignal): Uint8Array {
	const bareSignal = signalToBare(signal);
	return SIGNAL_VERSIONED.serializeWithEmbeddedVersion(bareSignal, CURRENT_VERSION);
}

/** Deserialize a signal, dispatching on the embedded schema version. */
export function deserializeSignal(bytes: Uint8Array): InternalSignal {
	const bareSignal = SIGNAL_VERSIONED.deserializeWithEmbeddedVersion(bytes);
	return signalFromBare(bareSignal);
}

// === Workflow Metadata Serialization ===
// Note: These are used for reading/writing individual workflow fields

export function serializeWorkflowState(state: InternalWorkflowState): Uint8Array {
	// The state is stored as a plain UTF-8 string rather than going through
	// the BARE WorkflowMetadata struct, so individual field reads/writes stay
	// cheap and human-inspectable.
	const encoder = new TextEncoder();
	return encoder.encode(state);
}

export function deserializeWorkflowState(bytes: Uint8Array): InternalWorkflowState {
	const decoder = new TextDecoder();
	const state = decoder.decode(bytes) as InternalWorkflowState;
	const validStates: 
InternalWorkflowState[] = ["pending", "running", "sleeping", "failed", "completed", "cancelled"]; + if (!validStates.includes(state)) { + throw new Error(`Invalid workflow state: ${state}`); + } + return state; +} + +export function serializeWorkflowOutput(output: unknown): Uint8Array { + return cbor.encode(output); +} + +export function deserializeWorkflowOutput(bytes: Uint8Array): T { + try { + return cbor.decode(bytes) as T; + } catch (error) { + throw new Error(`Failed to deserialize workflow output: ${error instanceof Error ? error.message : String(error)}`); + } +} + +/** + * Structured error type for serialization. + */ +interface SerializedWorkflowError { + name: string; + message: string; + stack?: string; + metadata?: Record; +} + +export function serializeWorkflowError(error: SerializedWorkflowError): Uint8Array { + return cbor.encode(error); +} + +export function deserializeWorkflowError(bytes: Uint8Array): SerializedWorkflowError { + try { + const decoded = cbor.decode(bytes); + // Handle legacy string format + if (typeof decoded === "string") { + return { name: "Error", message: decoded }; + } + assertObject(decoded, "WorkflowError"); + // Validate required fields + const obj = decoded as Record; + assertString(obj.name, "WorkflowError.name"); + assertString(obj.message, "WorkflowError.message"); + return { + name: obj.name, + message: obj.message, + stack: typeof obj.stack === "string" ? obj.stack : undefined, + metadata: typeof obj.metadata === "object" && obj.metadata !== null + ? 
obj.metadata as Record + : undefined, + }; + } catch { + // If decoding fails, try legacy text format + const decoder = new TextDecoder(); + const message = decoder.decode(bytes); + return { name: "Error", message }; + } +} + +export function serializeWorkflowInput(input: unknown): Uint8Array { + return cbor.encode(input); +} + +export function deserializeWorkflowInput(bytes: Uint8Array): T { + try { + return cbor.decode(bytes) as T; + } catch (error) { + throw new Error(`Failed to deserialize workflow input: ${error instanceof Error ? error.message : String(error)}`); + } +} + +// === Name Registry Serialization === + +export function serializeName(name: string): Uint8Array { + const encoder = new TextEncoder(); + return encoder.encode(name); +} + +export function deserializeName(bytes: Uint8Array): string { + const decoder = new TextDecoder(); + return decoder.decode(bytes); +} diff --git a/rivetkit-typescript/packages/workflow-engine/schemas/v1.bare b/rivetkit-typescript/packages/workflow-engine/schemas/v1.bare new file mode 100644 index 0000000000..2d375eb8ee --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/schemas/v1.bare @@ -0,0 +1,190 @@ +# Workflow Engine BARE Schema v1 +# +# This schema defines the binary encoding for workflow engine persistence. +# Types marked with `data` are arbitrary binary blobs (for user-provided data). 

# Opaque user data (CBOR-encoded)
type Cbor data

# MARK: Location
# Index into the entry name registry
type NameIndex u32

# Marker for a loop iteration in a location path
type LoopIterationMarker struct {
    loop: NameIndex
    iteration: u32
}

# A segment in a location path - either a name index or a loop iteration marker
type PathSegment union {
    NameIndex |
    LoopIterationMarker
}

# Location identifies where an entry exists in the workflow execution tree
type Location list<PathSegment>

# MARK: Entry Status
type EntryStatus enum {
    PENDING
    RUNNING
    COMPLETED
    FAILED
    EXHAUSTED
}

# MARK: Sleep State
type SleepState enum {
    PENDING
    COMPLETED
    INTERRUPTED
}

# MARK: Branch Status
type BranchStatusType enum {
    PENDING
    RUNNING
    COMPLETED
    FAILED
    CANCELLED
}

# MARK: Step Entry
type StepEntry struct {
    # Output value (CBOR-encoded arbitrary data)
    output: optional<Cbor>
    # Error message if step failed
    error: optional<str>
}

# MARK: Loop Entry
type LoopEntry struct {
    # Loop state (CBOR-encoded arbitrary data)
    state: Cbor
    # Current iteration number
    iteration: u32
    # Output value if loop completed (CBOR-encoded arbitrary data)
    output: optional<Cbor>
}

# MARK: Sleep Entry
type SleepEntry struct {
    # Deadline timestamp in milliseconds
    deadline: u64
    # Current sleep state
    state: SleepState
}

# MARK: Signal Entry
type SignalEntry struct {
    # Signal name
    name: str
    # Signal data (CBOR-encoded arbitrary data)
    signalData: Cbor
}

# MARK: Branch Status
type BranchStatus struct {
    status: BranchStatusType
    # Output value if completed (CBOR-encoded arbitrary data)
    output: optional<Cbor>
    # Error message if failed
    error: optional<str>
}

# MARK: Join Entry
type JoinEntry struct {
    # Map of branch name to status
    branches: map[str]BranchStatus
}

# MARK: Race Entry
type RaceEntry struct {
    # Name of the winning branch, or null if no winner yet
    winner: optional<str>
    # Map of branch name to status
    branches: map[str]BranchStatus
}

# MARK: Removed Entry
type RemovedEntry struct {
    # Original entry type before removal
    originalType: str
    # Original entry name
    originalName: optional<str>
}

# MARK: Entry Kind
# Type-specific entry data
type EntryKind union {
    StepEntry |
    LoopEntry |
    SleepEntry |
    SignalEntry |
    JoinEntry |
    RaceEntry |
    RemovedEntry
}

# MARK: Entry
# An entry in the workflow history
type Entry struct {
    # Unique entry ID
    id: str
    # Location in the workflow tree
    location: Location
    # Entry kind and data
    kind: EntryKind
}

# MARK: Entry Metadata
# Metadata for an entry (stored separately, lazily loaded)
type EntryMetadata struct {
    status: EntryStatus
    # Error message if failed
    error: optional<str>
    # Number of execution attempts
    attempts: u32
    # Last attempt timestamp in milliseconds
    lastAttemptAt: u64
    # Creation timestamp in milliseconds
    createdAt: u64
    # Completion timestamp in milliseconds
    completedAt: optional<u64>
}

# MARK: Signal
# A signal in the queue
type Signal struct {
    # Unique signal ID (used as KV key)
    id: str
    # Signal name
    name: str
    # Signal data (CBOR-encoded arbitrary data)
    signalData: Cbor
    # Timestamp when signal was sent in milliseconds
    sentAt: u64
}

# MARK: Workflow State
type WorkflowState enum {
    PENDING
    RUNNING
    SLEEPING
    FAILED
    COMPLETED
    # Added for parity with the internal "cancelled" state accepted by
    # deserializeWorkflowState and produced by handle.cancel(). Appended
    # last so existing encoded values keep their numeric tags.
    CANCELLED
}

# MARK: Workflow Metadata
# Workflow-level metadata stored separately from entries
type WorkflowMetadata struct {
    # Current workflow state
    state: WorkflowState
    # Workflow output if completed (CBOR-encoded arbitrary data)
    output: optional<Cbor>
    # Error message if failed
    error: optional<str>
    # Workflow version hash for migration detection
    version: optional<str>
}
diff --git a/rivetkit-typescript/packages/workflow-engine/schemas/versioned.ts b/rivetkit-typescript/packages/workflow-engine/schemas/versioned.ts
new file mode 100644
index 0000000000..266ed88e9c
--- /dev/null
+++ b/rivetkit-typescript/packages/workflow-engine/schemas/versioned.ts
@@ 
-0,0 +1,123 @@ +import { createVersionedDataHandler } from "vbare"; +import * as v1 from "../dist/schemas/v1.js"; + +export const CURRENT_VERSION = 1; + +// Re-export generated types for convenience +export type { + Entry, + EntryMetadata, + Signal, + WorkflowMetadata, + EntryKind, + StepEntry, + LoopEntry, + SleepEntry, + SignalEntry, + JoinEntry, + RaceEntry, + RemovedEntry, + BranchStatus, + Location, + PathSegment, + LoopIterationMarker, +} from "../dist/schemas/v1.js"; + +export { + EntryStatus, + SleepState, + BranchStatusType, + WorkflowState, +} from "../dist/schemas/v1.js"; + +// === Entry Handler === + +export const ENTRY_VERSIONED = createVersionedDataHandler({ + deserializeVersion: (bytes, version) => { + switch (version) { + case 1: + return v1.decodeEntry(bytes); + default: + throw new Error(`Unknown Entry version ${version}`); + } + }, + serializeVersion: (data, version) => { + switch (version) { + case 1: + return v1.encodeEntry(data as v1.Entry); + default: + throw new Error(`Unknown Entry version ${version}`); + } + }, + deserializeConverters: () => [], + serializeConverters: () => [], +}); + +// === Entry Metadata Handler === + +export const ENTRY_METADATA_VERSIONED = createVersionedDataHandler({ + deserializeVersion: (bytes, version) => { + switch (version) { + case 1: + return v1.decodeEntryMetadata(bytes); + default: + throw new Error(`Unknown EntryMetadata version ${version}`); + } + }, + serializeVersion: (data, version) => { + switch (version) { + case 1: + return v1.encodeEntryMetadata(data as v1.EntryMetadata); + default: + throw new Error(`Unknown EntryMetadata version ${version}`); + } + }, + deserializeConverters: () => [], + serializeConverters: () => [], +}); + +// === Signal Handler === + +export const SIGNAL_VERSIONED = createVersionedDataHandler({ + deserializeVersion: (bytes, version) => { + switch (version) { + case 1: + return v1.decodeSignal(bytes); + default: + throw new Error(`Unknown Signal version ${version}`); + } + }, + 
serializeVersion: (data, version) => { + switch (version) { + case 1: + return v1.encodeSignal(data as v1.Signal); + default: + throw new Error(`Unknown Signal version ${version}`); + } + }, + deserializeConverters: () => [], + serializeConverters: () => [], +}); + +// === Workflow Metadata Handler === + +export const WORKFLOW_METADATA_VERSIONED = createVersionedDataHandler({ + deserializeVersion: (bytes, version) => { + switch (version) { + case 1: + return v1.decodeWorkflowMetadata(bytes); + default: + throw new Error(`Unknown WorkflowMetadata version ${version}`); + } + }, + serializeVersion: (data, version) => { + switch (version) { + case 1: + return v1.encodeWorkflowMetadata(data as v1.WorkflowMetadata); + default: + throw new Error(`Unknown WorkflowMetadata version ${version}`); + } + }, + deserializeConverters: () => [], + serializeConverters: () => [], +}); diff --git a/rivetkit-typescript/packages/workflow-engine/scripts/compile-bare.ts b/rivetkit-typescript/packages/workflow-engine/scripts/compile-bare.ts new file mode 100644 index 0000000000..fecf622c01 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/scripts/compile-bare.ts @@ -0,0 +1,127 @@ +#!/usr/bin/env -S tsx + +/** + * BARE schema compiler for TypeScript + * + * This script compiles .bare schema files to TypeScript using @bare-ts/tools, + * then post-processes the output to: + * 1. Replace @bare-ts/lib import with @rivetkit/bare-ts + * 2. 
Replace Node.js assert import with a custom assert function + * + * IMPORTANT: Keep the post-processing logic in sync with: + * engine/sdks/rust/runner-protocol/build.rs + */ + +import * as fs from "node:fs/promises"; +import * as path from "node:path"; +import { type Config, transform } from "@bare-ts/tools"; +import { Command } from "commander"; + +const program = new Command(); + +program + .name("bare-compiler") + .description("Compile BARE schemas to TypeScript") + .version("0.0.1"); + +program + .command("compile") + .description("Compile a BARE schema file") + .argument("", "Input BARE schema file") + .option("-o, --output ", "Output file path") + .option("--pedantic", "Enable pedantic mode", false) + .option("--generator ", "Generator type (ts, js, dts, bare)", "ts") + .action(async (input: string, options) => { + try { + const schemaPath = path.resolve(input); + const outputPath = options.output + ? path.resolve(options.output) + : schemaPath.replace(/\.bare$/, ".ts"); + + await compileSchema({ + schemaPath, + outputPath, + config: { + pedantic: options.pedantic, + generator: options.generator, + }, + }); + + console.log(`Successfully compiled ${input} to ${outputPath}`); + } catch (error) { + console.error("Failed to compile schema:", error); + process.exit(1); + } + }); + +program.parse(); + +export interface CompileOptions { + schemaPath: string; + outputPath: string; + config?: Partial; +} + +export async function compileSchema(options: CompileOptions): Promise { + const { schemaPath, outputPath, config = {} } = options; + + const schema = await fs.readFile(schemaPath, "utf-8"); + const outputDir = path.dirname(outputPath); + + await fs.mkdir(outputDir, { recursive: true }); + + const defaultConfig: Partial = { + pedantic: true, + generator: "ts", + ...config, + }; + + let result = transform(schema, defaultConfig); + + result = postProcess(result); + + await fs.writeFile(outputPath, result); +} + +const POST_PROCESS_MARKER = "// @generated - 
post-processed by compile-bare.ts\n"; + +const ASSERT_FUNCTION = ` +function assert(condition: boolean, message?: string): asserts condition { + if (!condition) throw new Error(message ?? "Assertion failed") +} +`; + +/** + * Post-process the generated TypeScript file to: + * 1. Replace @bare-ts/lib import with @rivetkit/bare-ts + * 2. Replace Node.js assert import with a custom assert function + * + * IMPORTANT: Keep this in sync with engine/sdks/rust/runner-protocol/build.rs + */ +function postProcess(code: string): string { + // Skip if already post-processed + if (code.startsWith(POST_PROCESS_MARKER)) { + return code; + } + + // Replace @bare-ts/lib with @rivetkit/bare-ts + code = code.replace(/@bare-ts\/lib/g, "@rivetkit/bare-ts"); + + // Remove Node.js assert import + code = code.replace(/^import assert from "assert"/m, ""); + + // Add marker and assert function + code = POST_PROCESS_MARKER + code + `\n${ASSERT_FUNCTION}`; + + // Validate post-processing succeeded + if (code.includes("@bare-ts/lib")) { + throw new Error("Failed to replace @bare-ts/lib import"); + } + if (code.includes("import assert from")) { + throw new Error("Failed to remove Node.js assert import"); + } + + return code; +} + +export type { Config } from "@bare-ts/tools"; diff --git a/rivetkit-typescript/packages/workflow-engine/src/context.ts b/rivetkit-typescript/packages/workflow-engine/src/context.ts new file mode 100644 index 0000000000..3cdfd78180 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/src/context.ts @@ -0,0 +1,1451 @@ +import type { EngineDriver } from "./driver.js"; +import { + CancelledError, + CriticalError, + EntryInProgressError, + EvictedError, + HistoryDivergedError, + JoinError, + RaceError, + SignalWaitError, + SleepError, + StepExhaustedError, + StepFailedError, +} from "./errors.js"; +import { + appendLoopIteration, + appendName, + emptyLocation, + locationToKey, + registerName, +} from "./location.js"; +import { + consumeSignal, + consumeSignals, 
+ createEntry, + deleteEntriesWithPrefix, + flush, + getEntry, + getOrCreateMetadata, + loadMetadata, + setEntry, +} from "./storage.js"; +import type { + BranchConfig, + BranchOutput, + BranchStatus, + Entry, + EntryKindType, + Location, + LoopConfig, + LoopResult, + Signal, + StepConfig, + Storage, + WorkflowContextInterface, +} from "./types.js"; +import { sleep } from "./utils.js"; + +/** + * Default values for step configuration. + * These are exported so users can reference them when overriding. + */ +export const DEFAULT_MAX_RETRIES = 3; +export const DEFAULT_RETRY_BACKOFF_BASE = 100; +export const DEFAULT_RETRY_BACKOFF_MAX = 30000; +export const DEFAULT_LOOP_COMMIT_INTERVAL = 20; +export const DEFAULT_STEP_TIMEOUT = 30000; // 30 seconds + +/** + * Calculate backoff delay with exponential backoff. + * Uses deterministic calculation (no jitter) for replay consistency. + */ +function calculateBackoff( + attempts: number, + base: number, + max: number, +): number { + // Exponential backoff without jitter for determinism + return Math.min(max, base * 2 ** attempts); +} + +/** + * Error thrown when a step times out. + */ +export class StepTimeoutError extends Error { + constructor( + public readonly stepName: string, + public readonly timeoutMs: number, + ) { + super(`Step "${stepName}" timed out after ${timeoutMs}ms`); + this.name = "StepTimeoutError"; + } +} + +/** + * Internal implementation of WorkflowContext. 
+ */ +export class WorkflowContextImpl implements WorkflowContextInterface { + private entryInProgress = false; + private abortController: AbortController; + private currentLocation: Location; + private visitedKeys = new Set(); + /** Track names used in current execution to detect duplicates */ + private usedNamesInExecution = new Set(); + + constructor( + public readonly workflowId: string, + private storage: Storage, + private driver: EngineDriver, + location: Location = emptyLocation(), + abortController?: AbortController, + ) { + this.currentLocation = location; + this.abortController = abortController ?? new AbortController(); + } + + get signal(): AbortSignal { + return this.abortController.signal; + } + + isEvicted(): boolean { + return this.signal.aborted; + } + + private assertNotInProgress(): void { + if (this.entryInProgress) { + throw new EntryInProgressError(); + } + } + + private checkEvicted(): void { + if (this.signal.aborted) { + throw new EvictedError(); + } + } + + /** + * Create a new branch context for parallel/nested execution. + */ + createBranch(location: Location, abortController?: AbortController): WorkflowContextImpl { + return new WorkflowContextImpl( + this.workflowId, + this.storage, + this.driver, + location, + abortController ?? this.abortController, + ); + } + + /** + * Mark a key as visited. + */ + private markVisited(key: string): void { + this.visitedKeys.add(key); + } + + /** + * Check if a name has already been used at the current location in this execution. + * Throws HistoryDivergedError if duplicate detected. + */ + private checkDuplicateName(name: string): void { + const fullKey = locationToKey(this.storage, this.currentLocation) + "/" + name; + if (this.usedNamesInExecution.has(fullKey)) { + throw new HistoryDivergedError( + `Duplicate entry name "${name}" at location "${locationToKey(this.storage, this.currentLocation)}". 
` + + `Each step/loop/sleep/listen/join/race must have a unique name within its scope.`, + ); + } + this.usedNamesInExecution.add(fullKey); + } + + /** + * Validate that all expected entries in the branch were visited. + * Throws HistoryDivergedError if there are unvisited entries. + */ + validateComplete(): void { + const prefix = locationToKey(this.storage, this.currentLocation); + + for (const key of this.storage.history.entries.keys()) { + // Check if this key is under our current location prefix + // Handle root prefix (empty string) specially - all keys are under root + const isUnderPrefix = + prefix === "" + ? true // Root: all keys are children + : key.startsWith(prefix + "/") || key === prefix; + + if (isUnderPrefix) { + if (!this.visitedKeys.has(key)) { + // Entry exists in history but wasn't visited + // This means workflow code may have changed + throw new HistoryDivergedError( + `Entry "${key}" exists in history but was not visited. ` + + `Workflow code may have changed. Use ctx.removed() to handle migrations.`, + ); + } + } + } + } + + /** + * Evict the workflow. + */ + evict(): void { + this.abortController.abort(new EvictedError()); + } + + /** + * Wait for eviction signal. + * + * The event listener uses { once: true } to auto-remove after firing, + * preventing memory leaks if this method is called multiple times. + */ + waitForEviction(): Promise { + return new Promise((_, reject) => { + if (this.signal.aborted) { + reject(new EvictedError()); + return; + } + this.signal.addEventListener( + "abort", + () => { + reject(new EvictedError()); + }, + { once: true }, + ); + }); + } + + // === Step === + + async step(nameOrConfig: string | StepConfig, run?: () => Promise): Promise { + this.assertNotInProgress(); + this.checkEvicted(); + + const config: StepConfig = + typeof nameOrConfig === "string" + ? { name: nameOrConfig, run: run! 
} + : nameOrConfig; + + this.entryInProgress = true; + try { + return await this.executeStep(config); + } finally { + this.entryInProgress = false; + } + } + + private async executeStep(config: StepConfig): Promise { + // Check for duplicate name in current execution + this.checkDuplicateName(config.name); + + const location = appendName(this.storage, this.currentLocation, config.name); + const key = locationToKey(this.storage, location); + const existing = this.storage.history.entries.get(key); + + // Mark this entry as visited for validateComplete + this.markVisited(key); + + if (existing) { + if (existing.kind.type !== "step") { + throw new HistoryDivergedError( + `Expected step "${config.name}" at ${key}, found ${existing.kind.type}`, + ); + } + + const stepData = existing.kind.data; + + // Replay successful result + if (stepData.output !== undefined) { + return stepData.output as T; + } + + // Check if we should retry + const metadata = await loadMetadata(this.storage, this.driver, existing.id); + const maxRetries = config.maxRetries ?? DEFAULT_MAX_RETRIES; + + if (metadata.attempts >= maxRetries) { + throw new StepExhaustedError(config.name, stepData.error); + } + + // Calculate backoff and yield to scheduler + // This allows the workflow to be evicted during backoff + const backoffDelay = calculateBackoff( + metadata.attempts, + config.retryBackoffBase ?? DEFAULT_RETRY_BACKOFF_BASE, + config.retryBackoffMax ?? DEFAULT_RETRY_BACKOFF_MAX, + ); + const retryAt = metadata.lastAttemptAt + backoffDelay; + const now = Date.now(); + + if (now < retryAt) { + // Yield to scheduler - will be woken up at retryAt + throw new SleepError(retryAt); + } + } + + // Execute the step + const entry = + existing ?? 
+ createEntry(location, { type: "step", data: {} }); + if (!existing) { + setEntry(this.storage, location, entry); + } + + const metadata = getOrCreateMetadata(this.storage, entry.id); + metadata.status = "running"; + metadata.attempts++; + metadata.lastAttemptAt = Date.now(); + metadata.dirty = true; + + // Get timeout configuration + const timeout = config.timeout ?? DEFAULT_STEP_TIMEOUT; + + try { + // Execute with timeout + const output = await this.executeWithTimeout( + config.run(), + timeout, + config.name, + ); + + if (entry.kind.type === "step") { + entry.kind.data.output = output; + } + entry.dirty = true; + metadata.status = "completed"; + metadata.completedAt = Date.now(); + + // Ephemeral steps don't trigger an immediate flush. This avoids the + // synchronous write overhead for transient operations. Note that the + // step's entry is still marked dirty and WILL be persisted on the + // next flush from a non-ephemeral operation. The purpose of ephemeral + // is to batch writes, not to avoid persistence entirely. 
+ if (!config.ephemeral) { + await flush(this.storage, this.driver); + } + + return output; + } catch (error) { + // Timeout errors are treated as critical (no retry) + if (error instanceof StepTimeoutError) { + if (entry.kind.type === "step") { + entry.kind.data.error = String(error); + } + entry.dirty = true; + metadata.status = "exhausted"; + await flush(this.storage, this.driver); + throw new CriticalError(error.message); + } + + if (error instanceof CriticalError) { + if (entry.kind.type === "step") { + entry.kind.data.error = String(error); + } + entry.dirty = true; + metadata.status = "exhausted"; + await flush(this.storage, this.driver); + throw error; + } + + if (entry.kind.type === "step") { + entry.kind.data.error = String(error); + } + entry.dirty = true; + metadata.status = "failed"; + + await flush(this.storage, this.driver); + + throw new StepFailedError(config.name, error, metadata.attempts); + } + } + + /** + * Execute a promise with timeout. + * + * Note: This does NOT cancel the underlying operation. JavaScript Promises + * cannot be cancelled once started. When a timeout occurs: + * - The step is marked as failed with StepTimeoutError + * - The underlying async operation continues running in the background + * - Any side effects from the operation may still occur + * + * For cancellable operations, pass ctx.signal to APIs that support AbortSignal: + * + * await ctx.step("fetch", async () => { + * return fetch(url, { signal: ctx.signal }); + * }); + * + * Or check ctx.isEvicted() periodically in long-running loops. 
+ */ + private async executeWithTimeout( + promise: Promise, + timeoutMs: number, + stepName: string, + ): Promise { + if (timeoutMs <= 0) { + return promise; + } + + let timeoutId: ReturnType | undefined; + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { + reject(new StepTimeoutError(stepName, timeoutMs)); + }, timeoutMs); + }); + + try { + return await Promise.race([promise, timeoutPromise]); + } finally { + if (timeoutId !== undefined) { + clearTimeout(timeoutId); + } + } + } + + // === Loop === + + async loop( + nameOrConfig: string | LoopConfig, + run?: (ctx: WorkflowContextInterface) => Promise>, + ): Promise { + this.assertNotInProgress(); + this.checkEvicted(); + + const config: LoopConfig = + typeof nameOrConfig === "string" + ? { name: nameOrConfig, run: run as LoopConfig["run"] } + : nameOrConfig; + + this.entryInProgress = true; + try { + return await this.executeLoop(config); + } finally { + this.entryInProgress = false; + } + } + + private async executeLoop(config: LoopConfig): Promise { + // Check for duplicate name in current execution + this.checkDuplicateName(config.name); + + const location = appendName(this.storage, this.currentLocation, config.name); + const key = locationToKey(this.storage, location); + const existing = this.storage.history.entries.get(key); + + // Mark this entry as visited for validateComplete + this.markVisited(key); + + let entry: Entry; + let state: S; + let iteration: number; + + if (existing) { + if (existing.kind.type !== "loop") { + throw new HistoryDivergedError( + `Expected loop "${config.name}" at ${key}, found ${existing.kind.type}`, + ); + } + + const loopData = existing.kind.data; + + // Loop already completed + if (loopData.output !== undefined) { + return loopData.output as T; + } + + // Resume from saved state + entry = existing; + state = loopData.state as S; + iteration = loopData.iteration; + } else { + // New loop + state = config.state as S; + iteration = 0; + entry = 
createEntry(location, { + type: "loop", + data: { state, iteration }, + }); + setEntry(this.storage, location, entry); + } + + // TODO: Add validation for commitInterval (must be > 0) + const commitInterval = config.commitInterval ?? DEFAULT_LOOP_COMMIT_INTERVAL; + + // Execute loop iterations + while (true) { + this.checkEvicted(); + + // Create branch for this iteration + const iterationLocation = appendLoopIteration( + this.storage, + location, + config.name, + iteration, + ); + const branchCtx = this.createBranch(iterationLocation); + + // Execute iteration + const result = await config.run(branchCtx, state); + + // Validate branch completed cleanly + branchCtx.validateComplete(); + + if ("break" in result && result.break) { + // Loop complete + if (entry.kind.type === "loop") { + entry.kind.data.output = result.value; + entry.kind.data.state = state; + entry.kind.data.iteration = iteration; + } + entry.dirty = true; + + await flush(this.storage, this.driver); + await this.forgetOldIterations(location, iteration, commitInterval); + + return result.value; + } + + // Continue with new state + if ("continue" in result && result.continue) { + state = result.state; + } + iteration++; + + // Periodic commit + if (iteration % commitInterval === 0) { + if (entry.kind.type === "loop") { + entry.kind.data.state = state; + entry.kind.data.iteration = iteration; + } + entry.dirty = true; + + await flush(this.storage, this.driver); + await this.forgetOldIterations(location, iteration, commitInterval); + } + } + } + + /** + * Delete old loop iteration entries to save storage space. + * + * Loop locations always end with a NameIndex (number) because loops are + * created via appendName(). 
   * Even for nested loops, the innermost loop's
   * location ends with its name index:
   *
   *   ctx.loop("outer") → location: [outerIndex]
   *     iteration 0 → location: [{ loop: outerIndex, iteration: 0 }]
   *       ctx.loop("inner") → location: [{ loop: outerIndex, iteration: 0 }, innerIndex]
   *
   * This function removes iterations older than (currentIteration - commitInterval).
   */
  private async forgetOldIterations(
    loopLocation: Location,
    currentIteration: number,
    commitInterval: number,
  ): Promise<void> {
    const keepFrom = Math.max(0, currentIteration - commitInterval);
    // Get the loop name index from the last segment of loopLocation.
    // This is always a NameIndex (number) because loop entries are created
    // via appendName(), not appendLoopIteration().
    const loopSegment = loopLocation[loopLocation.length - 1];
    if (typeof loopSegment !== "number") {
      throw new Error("Expected loop location to end with a name index");
    }

    // NOTE(review): this re-issues deletes for every iteration in
    // [0, keepFrom) on each commit. Deleting an already-empty prefix is a
    // no-op, so this is idempotent (and robust to a crash between flush and
    // cleanup), but it is O(currentIteration) driver calls per commit —
    // worth revisiting if loop cleanup shows up in profiles.
    for (let i = 0; i < keepFrom; i++) {
      // Build location prefix for this iteration.
      // We replace the last segment (the loop's name index) with an
      // iteration marker to target all entries under that iteration.
      const iterationLocation: Location = [
        ...loopLocation.slice(0, -1),
        { loop: loopSegment, iteration: i },
      ];
      await deleteEntriesWithPrefix(this.storage, this.driver, iterationLocation);
    }
  }

  // === Sleep ===

  /** Durable sleep for a duration relative to now. */
  async sleep(name: string, durationMs: number): Promise<void> {
    const deadline = Date.now() + durationMs;
    return this.sleepUntil(name, deadline);
  }

  /** Durable sleep until an absolute timestamp (ms since epoch). */
  async sleepUntil(name: string, timestampMs: number): Promise<void> {
    this.assertNotInProgress();
    this.checkEvicted();

    this.entryInProgress = true;
    try {
      await this.executeSleep(name, timestampMs);
    } finally {
      this.entryInProgress = false;
    }
  }

  private async executeSleep(name: string, deadline: number): Promise<void> {
    // Check for duplicate name in current execution
    this.checkDuplicateName(name);

    const location = appendName(this.storage, this.currentLocation, name);
    const key = locationToKey(this.storage, location);
    const existing = this.storage.history.entries.get(key);

    // Mark this entry as visited for validateComplete
    this.markVisited(key);

    let entry: Entry;

    if (existing) {
      if (existing.kind.type !== "sleep") {
        throw new HistoryDivergedError(
          `Expected sleep "${name}" at ${key}, found ${existing.kind.type}`,
        );
      }

      const sleepData = existing.kind.data;

      // Already completed or interrupted
      if (sleepData.state !== "pending") {
        return;
      }

      // Use stored deadline — keeps replay deterministic even though the
      // caller recomputed Date.now() on this execution.
      deadline = sleepData.deadline;
      entry = existing;
    } else {
      entry = createEntry(location, {
        type: "sleep",
        data: { deadline, state: "pending" },
      });
      setEntry(this.storage, location, entry);
      entry.dirty = true;
      // Persist the deadline before we can yield via SleepError below.
      await flush(this.storage, this.driver);
    }

    const now = Date.now();
    const remaining = deadline - now;

    if (remaining <= 0) {
      // Deadline passed
      if (entry.kind.type === "sleep") {
        entry.kind.data.state = "completed";
      }
      entry.dirty = true;
      await flush(this.storage, this.driver);
      return;
    }

    // Short sleep: wait in memory
    if (remaining < this.driver.workerPollInterval) {
      await Promise.race([sleep(remaining), this.waitForEviction()]);

      this.checkEvicted();

      if (entry.kind.type === "sleep") {
        entry.kind.data.state = "completed";
      }
      entry.dirty = true;
      // NOTE(review): no flush here — the completed state is only marked
      // dirty and is presumably persisted by the next flushing operation.
      // Confirm that a crash right after a short sleep merely repeats the
      // (already elapsed) sleep on replay.
      return;
    }

    // Long sleep: yield to scheduler
    throw new SleepError(deadline);
  }

  // === Listen ===
  //
  // IMPORTANT: Signals are loaded once at workflow start (in loadStorage).
  // If a signal is sent via handle.signal() DURING workflow execution,
  // it won't be visible until the next execution. The workflow will yield
  // (SleepError/SignalWaitError), then on the next run, loadStorage() will
  // pick up the new signal. This is intentional - no polling during execution.

  /** Wait for a single signal; yields to the scheduler if none is queued. */
  async listen<T>(name: string, signalName: string): Promise<T> {
    const signals = await this.listenN<T>(name, signalName, 1);
    return signals[0];
  }

  /** Wait for up to `limit` queued signals; yields if none are queued. */
  async listenN<T>(name: string, signalName: string, limit: number): Promise<T[]> {
    this.assertNotInProgress();
    this.checkEvicted();

    this.entryInProgress = true;
    try {
      return await this.executeListenN<T>(name, signalName, limit);
    } finally {
      this.entryInProgress = false;
    }
  }

  private async executeListenN<T>(
    name: string,
    signalName: string,
    limit: number,
  ): Promise<T[]> {
    // Check for duplicate name in current execution
    this.checkDuplicateName(name);

    // Check for replay: first check if we have a count entry
    const countLocation = appendName(
      this.storage,
      this.currentLocation,
      `${name}:count`,
    );
    const countKey = locationToKey(this.storage, countLocation);
    const existingCount = this.storage.history.entries.get(countKey);

    // Mark count entry as visited
    this.markVisited(countKey);

    if (existingCount && existingCount.kind.type === "signal") {
      // Replay: read all recorded signals
      const count = existingCount.kind.data.data as number;
      const results: T[] = [];

      for (let i = 0; i < count; i++) {
        const signalLocation = appendName(
          this.storage,
          this.currentLocation,
          `${name}:${i}`,
        );
        const signalKey = locationToKey(this.storage, signalLocation);

        // Mark each signal entry as visited
        this.markVisited(signalKey);

        const existingSignal = this.storage.history.entries.get(signalKey);
        if (existingSignal && existingSignal.kind.type === "signal") {
          results.push(existingSignal.kind.data.data as T);
        }
      }

      return results;
    }

    // Try to consume signals immediately
    const signals = await consumeSignals(
      this.storage,
      this.driver,
      signalName,
      limit,
    );

    if (signals.length > 0) {
      // Record each signal in history with indexed names
      for (let i = 0; i < signals.length; i++) {
        const signalLocation = appendName(
          this.storage,
          this.currentLocation,
          `${name}:${i}`,
        );
        const signalEntry = createEntry(signalLocation, {
          type: "signal",
          data: { name: signalName, data: signals[i].data },
        });
        setEntry(this.storage, signalLocation, signalEntry);

        // Mark as visited
        this.markVisited(locationToKey(this.storage, signalLocation));
      }

      // Record the count for replay.
      // (The count piggybacks on the "signal" entry kind; its data field
      // holds a number rather than a payload.)
      const countEntry = createEntry(countLocation, {
        type: "signal",
        data: { name: `${signalName}:count`, data: signals.length },
      });
      setEntry(this.storage, countLocation, countEntry);

      await flush(this.storage, this.driver);

      return signals.map((s) => s.data as T);
    }

    // No signals found, throw to yield to scheduler
    throw new SignalWaitError([signalName]);
  }

  /** Like listen(), but gives up after `timeoutMs` and resolves null. */
  async listenWithTimeout<T>(
    name: string,
    signalName: string,
    timeoutMs: number,
  ): Promise<T | null> {
    const deadline = Date.now() + timeoutMs;
    return this.listenUntil<T>(name, signalName, deadline);
  }

  /** Like listen(), but gives up at an absolute deadline and resolves null. */
  async listenUntil<T>(
    name: string,
    signalName: string,
    timestampMs: number,
  ): Promise<T | null> {
    this.assertNotInProgress();
    this.checkEvicted();

    this.entryInProgress = true;
    try {
      return await this.executeListenUntil<T>(name, signalName, timestampMs);
    } finally {
      this.entryInProgress = false;
    }
  }
private async executeListenUntil( + name: string, + signalName: string, + deadline: number, + ): Promise { + // Check for duplicate name in current execution + this.checkDuplicateName(name); + + const sleepLocation = appendName(this.storage, this.currentLocation, name); + const signalLocation = appendName( + this.storage, + this.currentLocation, + `${name}:signal`, + ); + const sleepKey = locationToKey(this.storage, sleepLocation); + const signalKey = locationToKey(this.storage, signalLocation); + + // Mark entries as visited for validateComplete + this.markVisited(sleepKey); + this.markVisited(signalKey); + + const existingSleep = this.storage.history.entries.get(sleepKey); + + // Check for replay + if (existingSleep && existingSleep.kind.type === "sleep") { + const sleepData = existingSleep.kind.data; + + if (sleepData.state === "completed") { + return null; + } + + if (sleepData.state === "interrupted") { + const existingSignal = this.storage.history.entries.get(signalKey); + if (existingSignal && existingSignal.kind.type === "signal") { + return existingSignal.kind.data.data as T; + } + throw new HistoryDivergedError( + "Expected signal entry after interrupted sleep", + ); + } + + deadline = sleepData.deadline; + } else { + // Create sleep entry + const sleepEntry = createEntry(sleepLocation, { + type: "sleep", + data: { deadline, state: "pending" }, + }); + setEntry(this.storage, sleepLocation, sleepEntry); + sleepEntry.dirty = true; + await flush(this.storage, this.driver); + } + + const now = Date.now(); + const remaining = deadline - now; + + // Deadline passed, check for signal one more time + if (remaining <= 0) { + const signal = await consumeSignal(this.storage, this.driver, signalName); + const sleepEntry = getEntry(this.storage, sleepLocation)!; + + if (signal) { + if (sleepEntry.kind.type === "sleep") { + sleepEntry.kind.data.state = "interrupted"; + } + sleepEntry.dirty = true; + + const signalEntry = createEntry(signalLocation, { + type: "signal", 
+ data: { name: signalName, data: signal.data }, + }); + setEntry(this.storage, signalLocation, signalEntry); + await flush(this.storage, this.driver); + + return signal.data as T; + } + + if (sleepEntry.kind.type === "sleep") { + sleepEntry.kind.data.state = "completed"; + } + sleepEntry.dirty = true; + await flush(this.storage, this.driver); + return null; + } + + // Check for signal (signals are loaded at workflow start, no polling needed) + const signal = await consumeSignal(this.storage, this.driver, signalName); + if (signal) { + const sleepEntry = getEntry(this.storage, sleepLocation)!; + if (sleepEntry.kind.type === "sleep") { + sleepEntry.kind.data.state = "interrupted"; + } + sleepEntry.dirty = true; + + const signalEntry = createEntry(signalLocation, { + type: "signal", + data: { name: signalName, data: signal.data }, + }); + setEntry(this.storage, signalLocation, signalEntry); + await flush(this.storage, this.driver); + + return signal.data as T; + } + + // Signal not available, yield to scheduler until deadline + throw new SleepError(deadline); + } + + async listenNWithTimeout( + name: string, + signalName: string, + limit: number, + timeoutMs: number, + ): Promise { + this.assertNotInProgress(); + this.checkEvicted(); + + this.entryInProgress = true; + try { + return await this.executeListenNWithTimeout( + name, + signalName, + limit, + timeoutMs, + ); + } finally { + this.entryInProgress = false; + } + } + + private async executeListenNWithTimeout( + name: string, + signalName: string, + limit: number, + timeoutMs: number, + ): Promise { + // Check for duplicate name in current execution + this.checkDuplicateName(name); + + // Use a sleep entry to store the deadline for replay + const sleepLocation = appendName(this.storage, this.currentLocation, `${name}:deadline`); + const sleepKey = locationToKey(this.storage, sleepLocation); + const existingSleep = this.storage.history.entries.get(sleepKey); + + this.markVisited(sleepKey); + + let deadline: 
number; + + if (existingSleep && existingSleep.kind.type === "sleep") { + // Replay: use stored deadline + deadline = existingSleep.kind.data.deadline; + } else { + // New execution: calculate and store deadline + deadline = Date.now() + timeoutMs; + const sleepEntry = createEntry(sleepLocation, { + type: "sleep", + data: { deadline, state: "pending" }, + }); + setEntry(this.storage, sleepLocation, sleepEntry); + sleepEntry.dirty = true; + } + + return this.executeListenNUntilImpl(name, signalName, limit, deadline); + } + + async listenNUntil( + name: string, + signalName: string, + limit: number, + timestampMs: number, + ): Promise { + this.assertNotInProgress(); + this.checkEvicted(); + + // Check for duplicate name in current execution + this.checkDuplicateName(name); + + this.entryInProgress = true; + try { + return await this.executeListenNUntilImpl(name, signalName, limit, timestampMs); + } finally { + this.entryInProgress = false; + } + } + + /** + * Internal implementation for listenNUntil with proper replay support. + * Stores the count and individual signals for deterministic replay. 
+ */ + private async executeListenNUntilImpl( + name: string, + signalName: string, + limit: number, + deadline: number, + ): Promise { + // Check for replay: look for count entry + const countLocation = appendName( + this.storage, + this.currentLocation, + `${name}:count`, + ); + const countKey = locationToKey(this.storage, countLocation); + const existingCount = this.storage.history.entries.get(countKey); + + this.markVisited(countKey); + + if (existingCount && existingCount.kind.type === "signal") { + // Replay: read all recorded signals + const count = existingCount.kind.data.data as number; + const results: T[] = []; + + for (let i = 0; i < count; i++) { + const signalLocation = appendName( + this.storage, + this.currentLocation, + `${name}:${i}`, + ); + const signalKey = locationToKey(this.storage, signalLocation); + + this.markVisited(signalKey); + + const existingSignal = this.storage.history.entries.get(signalKey); + if (existingSignal && existingSignal.kind.type === "signal") { + results.push(existingSignal.kind.data.data as T); + } + } + + return results; + } + + // New execution: collect signals until timeout or limit reached + const results: T[] = []; + + for (let i = 0; i < limit; i++) { + const now = Date.now(); + if (now >= deadline) { + break; + } + + // Try to consume a signal + const signal = await consumeSignal(this.storage, this.driver, signalName); + if (!signal) { + // No signal available - check if we should wait + if (results.length === 0) { + // No signals yet - yield to scheduler until deadline + throw new SleepError(deadline); + } + // We have some signals - return what we have + break; + } + + // Record the signal + const signalLocation = appendName( + this.storage, + this.currentLocation, + `${name}:${i}`, + ); + const signalEntry = createEntry(signalLocation, { + type: "signal", + data: { name: signalName, data: signal.data }, + }); + setEntry(this.storage, signalLocation, signalEntry); + this.markVisited(locationToKey(this.storage, 
signalLocation)); + + results.push(signal.data as T); + } + + // Record the count for replay + const countEntry = createEntry(countLocation, { + type: "signal", + data: { name: `${signalName}:count`, data: results.length }, + }); + setEntry(this.storage, countLocation, countEntry); + + await flush(this.storage, this.driver); + + return results; + } + + // === Join === + + async join>>( + name: string, + branches: T, + ): Promise<{ [K in keyof T]: BranchOutput }> { + this.assertNotInProgress(); + this.checkEvicted(); + + this.entryInProgress = true; + try { + return await this.executeJoin(name, branches); + } finally { + this.entryInProgress = false; + } + } + + private async executeJoin>>( + name: string, + branches: T, + ): Promise<{ [K in keyof T]: BranchOutput }> { + // Check for duplicate name in current execution + this.checkDuplicateName(name); + + const location = appendName(this.storage, this.currentLocation, name); + const key = locationToKey(this.storage, location); + const existing = this.storage.history.entries.get(key); + + // Mark this entry as visited for validateComplete + this.markVisited(key); + + let entry: Entry; + + if (existing) { + if (existing.kind.type !== "join") { + throw new HistoryDivergedError( + `Expected join "${name}" at ${key}, found ${existing.kind.type}`, + ); + } + entry = existing; + } else { + entry = createEntry(location, { + type: "join", + data: { + branches: Object.fromEntries( + Object.keys(branches).map((k) => [k, { status: "pending" as const }]), + ), + }, + }); + setEntry(this.storage, location, entry); + entry.dirty = true; + } + + if (entry.kind.type !== "join") { + throw new HistoryDivergedError("Entry type mismatch"); + } + + const joinData = entry.kind.data; + const results: Record = {}; + const errors: Record = {}; + + // Execute all branches in parallel + const branchPromises = Object.entries(branches).map( + async ([branchName, config]) => { + const branchStatus = joinData.branches[branchName]; + + // Already 
completed + if (branchStatus.status === "completed") { + results[branchName] = branchStatus.output; + return; + } + + // Already failed + if (branchStatus.status === "failed") { + errors[branchName] = new Error(branchStatus.error); + return; + } + + // Execute branch + const branchLocation = appendName(this.storage, location, branchName); + const branchCtx = this.createBranch(branchLocation); + + branchStatus.status = "running"; + entry.dirty = true; + + try { + const output = await config.run(branchCtx); + branchCtx.validateComplete(); + + branchStatus.status = "completed"; + branchStatus.output = output; + results[branchName] = output; + } catch (error) { + branchStatus.status = "failed"; + branchStatus.error = String(error); + errors[branchName] = error as Error; + } + + entry.dirty = true; + }, + ); + + // Wait for ALL branches (no short-circuit on error) + await Promise.allSettled(branchPromises); + await flush(this.storage, this.driver); + + // Throw if any branches failed + if (Object.keys(errors).length > 0) { + throw new JoinError(errors); + } + + return results as { [K in keyof T]: BranchOutput }; + } + + // === Race === + + async race( + name: string, + branches: Array<{ name: string; run: (ctx: WorkflowContextInterface) => Promise }>, + ): Promise<{ winner: string; value: T }> { + this.assertNotInProgress(); + this.checkEvicted(); + + this.entryInProgress = true; + try { + return await this.executeRace(name, branches); + } finally { + this.entryInProgress = false; + } + } + + private async executeRace( + name: string, + branches: Array<{ name: string; run: (ctx: WorkflowContextInterface) => Promise }>, + ): Promise<{ winner: string; value: T }> { + // Check for duplicate name in current execution + this.checkDuplicateName(name); + + const location = appendName(this.storage, this.currentLocation, name); + const key = locationToKey(this.storage, location); + const existing = this.storage.history.entries.get(key); + + // Mark this entry as visited for 
validateComplete + this.markVisited(key); + + let entry: Entry; + + if (existing) { + if (existing.kind.type !== "race") { + throw new HistoryDivergedError( + `Expected race "${name}" at ${key}, found ${existing.kind.type}`, + ); + } + entry = existing; + + // Check if we already have a winner + const raceKind = existing.kind; + if (raceKind.data.winner !== null) { + const winnerStatus = raceKind.data.branches[raceKind.data.winner]; + return { + winner: raceKind.data.winner, + value: winnerStatus.output as T, + }; + } + } else { + entry = createEntry(location, { + type: "race", + data: { + winner: null, + branches: Object.fromEntries( + branches.map((b) => [b.name, { status: "pending" as const }]), + ), + }, + }); + setEntry(this.storage, location, entry); + entry.dirty = true; + } + + if (entry.kind.type !== "race") { + throw new HistoryDivergedError("Entry type mismatch"); + } + + const raceData = entry.kind.data; + + // Create abort controller for cancellation + const raceAbortController = new AbortController(); + + // Track all branch promises to wait for cleanup + const branchPromises: Promise[] = []; + + // Track winner info + let winnerName: string | null = null; + let winnerValue: T | null = null; + let settled = false; + let pendingCount = branches.length; + const errors: Record = {}; + const lateErrors: Array<{ name: string; error: string }> = []; + + // Check for replay winners first + for (const branch of branches) { + const branchStatus = raceData.branches[branch.name]; + if ( + branchStatus.status !== "pending" && + branchStatus.status !== "running" + ) { + pendingCount--; + if (branchStatus.status === "completed" && !settled) { + settled = true; + winnerName = branch.name; + winnerValue = branchStatus.output as T; + } + } + } + + // If we found a replay winner, return immediately + if (settled && winnerName !== null && winnerValue !== null) { + return { winner: winnerName, value: winnerValue }; + } + + // Execute branches that need to run + for 
(const branch of branches) { + const branchStatus = raceData.branches[branch.name]; + + // Skip already completed/cancelled + if ( + branchStatus.status !== "pending" && + branchStatus.status !== "running" + ) { + continue; + } + + const branchLocation = appendName(this.storage, location, branch.name); + const branchCtx = this.createBranch(branchLocation, raceAbortController); + + branchStatus.status = "running"; + entry.dirty = true; + + const branchPromise = branch.run(branchCtx).then( + async (output) => { + if (settled) { + // This branch completed after a winner was determined + // Still record the completion for observability + branchStatus.status = "completed"; + branchStatus.output = output; + entry.dirty = true; + return; + } + settled = true; + winnerName = branch.name; + winnerValue = output; + + branchCtx.validateComplete(); + + branchStatus.status = "completed"; + branchStatus.output = output; + raceData.winner = branch.name; + entry.dirty = true; + + // Cancel other branches + raceAbortController.abort(); + }, + (error) => { + pendingCount--; + + if (error instanceof CancelledError || error instanceof EvictedError) { + branchStatus.status = "cancelled"; + } else { + branchStatus.status = "failed"; + branchStatus.error = String(error); + + if (settled) { + // Track late errors for observability + lateErrors.push({ name: branch.name, error: String(error) }); + } else { + errors[branch.name] = error; + } + } + entry.dirty = true; + + // All branches failed (only if no winner yet) + if (pendingCount === 0 && !settled) { + settled = true; + } + }, + ); + + branchPromises.push(branchPromise); + } + + // Wait for all branches to complete or be cancelled + await Promise.allSettled(branchPromises); + + // Clean up entries from non-winning branches + if (winnerName !== null) { + for (const branch of branches) { + if (branch.name !== winnerName) { + const branchLocation = appendName(this.storage, location, branch.name); + await 
deleteEntriesWithPrefix(this.storage, this.driver, branchLocation); + } + } + } + + // Flush final state + await flush(this.storage, this.driver); + + // Log late errors if any (these occurred after a winner was determined) + if (lateErrors.length > 0) { + console.warn( + `Race "${name}" had ${lateErrors.length} branch(es) fail after winner was determined:`, + lateErrors, + ); + } + + // Return result or throw error + if (winnerName !== null && winnerValue !== null) { + return { winner: winnerName, value: winnerValue }; + } + + // All branches failed + throw new RaceError( + "All branches failed", + Object.entries(errors).map(([name, error]) => ({ + name, + error: String(error), + })), + ); + } + + // === Removed === + + async removed(name: string, originalType: EntryKindType): Promise { + this.assertNotInProgress(); + this.checkEvicted(); + + this.entryInProgress = true; + try { + await this.executeRemoved(name, originalType); + } finally { + this.entryInProgress = false; + } + } + + private async executeRemoved( + name: string, + originalType: EntryKindType, + ): Promise { + // Check for duplicate name in current execution + this.checkDuplicateName(name); + + const location = appendName(this.storage, this.currentLocation, name); + const key = locationToKey(this.storage, location); + const existing = this.storage.history.entries.get(key); + + // Mark this entry as visited for validateComplete + this.markVisited(key); + + if (existing) { + // Validate the existing entry matches what we expect + if ( + existing.kind.type !== "removed" && + existing.kind.type !== originalType + ) { + throw new HistoryDivergedError( + `Expected ${originalType} or removed at ${key}, found ${existing.kind.type}`, + ); + } + + // If it's not already marked as removed, we just skip it + return; + } + + // Create a removed entry placeholder + const entry = createEntry(location, { + type: "removed", + data: { originalType, originalName: name }, + }); + setEntry(this.storage, location, 
entry);
    await flush(this.storage, this.driver);
  }
}
diff --git a/rivetkit-typescript/packages/workflow-engine/src/driver.ts b/rivetkit-typescript/packages/workflow-engine/src/driver.ts
new file mode 100644
index 0000000000..f08d596017
--- /dev/null
+++ b/rivetkit-typescript/packages/workflow-engine/src/driver.ts
@@ -0,0 +1,88 @@
/**
 * A key-value entry returned from list operations.
 */
export interface KVEntry {
  key: Uint8Array;
  value: Uint8Array;
}

/**
 * A write operation for batch writes.
 */
export interface KVWrite {
  key: Uint8Array;
  value: Uint8Array;
}

/**
 * The engine driver provides the KV and scheduling interface.
 * Implementations must provide these methods to integrate with different backends.
 *
 * IMPORTANT: Each workflow instance must have its own isolated driver/KV namespace.
 * The workflow engine is the sole reader/writer of its KV during execution.
 * KV operations do not include workflow IDs because isolation is provided externally
 * by the host system (e.g., Cloudflare Durable Objects, dedicated actor processes).
 *
 * External systems may only write signals to the KV (via WorkflowHandle.signal()).
 * See architecture.md "Isolation Model" for details.
 */
export interface EngineDriver {
  // === KV Operations ===

  /**
   * Get a value by key.
   * Returns null if the key doesn't exist.
   */
  get(key: Uint8Array): Promise<Uint8Array | null>;

  /**
   * Set a value by key.
   */
  set(key: Uint8Array, value: Uint8Array): Promise<void>;

  /**
   * Delete a key.
   */
  delete(key: Uint8Array): Promise<void>;

  /**
   * Delete all keys with a given prefix.
   */
  deletePrefix(prefix: Uint8Array): Promise<void>;

  /**
   * List all key-value pairs with a given prefix.
   *
   * IMPORTANT: Results MUST be sorted by key in lexicographic byte order.
   * The workflow engine relies on this ordering for correct signal FIFO
   * processing and name registry reconstruction. Failing to sort will
   * cause non-deterministic replay behavior.
   */
  list(prefix: Uint8Array): Promise<KVEntry[]>;

  /**
   * Batch write multiple key-value pairs.
   * Should be atomic if possible.
   * NOTE(review): a non-atomic batch can leave history partially written
   * if the process dies mid-write — drivers that cannot guarantee
   * atomicity should document the recovery implications.
   */
  batch(writes: KVWrite[]): Promise<void>;

  // === Scheduling ===

  /**
   * Set an alarm to wake the workflow at a specific time.
   * @param workflowId The workflow to wake
   * @param wakeAt Timestamp in milliseconds when to wake
   */
  setAlarm(workflowId: string, wakeAt: number): Promise<void>;

  /**
   * Clear any pending alarm for a workflow.
   */
  clearAlarm(workflowId: string): Promise<void>;

  /**
   * How often the worker polls for work (in milliseconds).
   * Affects the threshold for in-memory vs scheduled sleeps.
   */
  readonly workerPollInterval: number;
}
diff --git a/rivetkit-typescript/packages/workflow-engine/src/errors.ts b/rivetkit-typescript/packages/workflow-engine/src/errors.ts
new file mode 100644
index 0000000000..503a3849be
--- /dev/null
+++ b/rivetkit-typescript/packages/workflow-engine/src/errors.ts
@@ -0,0 +1,130 @@
// NOTE(review): context.ts references a StepTimeoutError, which is not
// defined or exported in this module — confirm it is declared inside
// context.ts, or it belongs here alongside the other step errors.

/**
 * Thrown from steps to prevent retry.
 * Use this when an error is unrecoverable and retrying would be pointless.
 */
export class CriticalError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "CriticalError";
  }
}

/**
 * Internal: Workflow should sleep until deadline.
 * This is thrown to yield control back to the scheduler.
 * (Control flow, not a failure — callers of the runner catch it.)
 */
export class SleepError extends Error {
  constructor(public readonly deadline: number) {
    super(`Sleeping until ${deadline}`);
    this.name = "SleepError";
  }
}

/**
 * Internal: Workflow is waiting for signals.
 * This is thrown to yield control back to the scheduler.
 */
export class SignalWaitError extends Error {
  constructor(public readonly signalNames: string[]) {
    super(`Waiting for signals: ${signalNames.join(", ")}`);
    this.name = "SignalWaitError";
  }
}

/**
 * Internal: Workflow was evicted.
 * This is thrown when the workflow is being gracefully stopped.
 */
export class EvictedError extends Error {
  constructor() {
    super("Workflow evicted");
    this.name = "EvictedError";
  }
}

/**
 * Workflow code changed incompatibly.
 * Thrown when history doesn't match the current workflow code.
 */
export class HistoryDivergedError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "HistoryDivergedError";
  }
}

/**
 * Step exhausted all retries.
 */
export class StepExhaustedError extends Error {
  constructor(
    public readonly stepName: string,
    public readonly lastError?: string,
  ) {
    super(`Step "${stepName}" exhausted retries: ${lastError ?? "unknown error"}`);
    this.name = "StepExhaustedError";
  }
}

/**
 * Step failed (will be retried).
 * Internal error used to trigger retry logic.
 */
export class StepFailedError extends Error {
  constructor(
    public readonly stepName: string,
    public readonly originalError: unknown,
    public readonly attempts: number,
  ) {
    super(`Step "${stepName}" failed (attempt ${attempts})`);
    this.name = "StepFailedError";
    this.cause = originalError;
  }
}

/**
 * Join had branch failures.
 */
export class JoinError extends Error {
  constructor(public readonly errors: Record<string, Error>) {
    super(`Join failed: ${Object.keys(errors).join(", ")}`);
    this.name = "JoinError";
  }
}

/**
 * Race had all branches fail.
 */
export class RaceError extends Error {
  constructor(
    message: string,
    public readonly errors: Array<{ name: string; error: string }>,
  ) {
    super(message);
    this.name = "RaceError";
  }
}

/**
 * Branch was cancelled (used by race).
 */
export class CancelledError extends Error {
  constructor() {
    super("Branch cancelled");
    this.name = "CancelledError";
  }
}

/**
 * Entry is currently being processed.
 * Thrown when user forgets to await a step.
+ */ +export class EntryInProgressError extends Error { + constructor() { + super( + "Cannot start a new workflow entry while another is in progress. " + + "Did you forget to await the previous step/loop/sleep?", + ); + this.name = "EntryInProgressError"; + } +} diff --git a/rivetkit-typescript/packages/workflow-engine/src/index.ts b/rivetkit-typescript/packages/workflow-engine/src/index.ts new file mode 100644 index 0000000000..6b862776aa --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/src/index.ts @@ -0,0 +1,366 @@ +// Types +export type { + BranchConfig, + BranchOutput, + BranchStatus, + Entry, + EntryKind, + EntryKindType, + EntryMetadata, + EntryStatus, + History, + JoinEntry, + Location, + LoopConfig, + LoopEntry, + LoopIterationMarker, + LoopResult, + NameIndex, + PathSegment, + RaceEntry, + RemovedEntry, + Signal, + SignalEntry, + SleepEntry, + SleepState, + StepConfig, + StepEntry, + Storage, + WorkflowContextInterface, + WorkflowFunction, + WorkflowHandle, + WorkflowResult, + WorkflowState, +} from "./types.js"; + +// Errors +export { + CancelledError, + CriticalError, + EntryInProgressError, + EvictedError, + HistoryDivergedError, + JoinError, + RaceError, + SignalWaitError, + SleepError, + StepExhaustedError, + StepFailedError, +} from "./errors.js"; + +// Driver +export type { EngineDriver, KVEntry, KVWrite } from "./driver.js"; + +// Location utilities +export { + appendLoopIteration, + appendName, + emptyLocation, + isLocationPrefix, + isLoopIterationMarker, + locationToKey, + locationsEqual, + parentLocation, + registerName, + resolveName, +} from "./location.js"; + +// Storage utilities +export { + addSignal, + consumeSignal, + consumeSignals, + createEntry, + createStorage, + deleteEntriesWithPrefix, + flush, + generateId, + getEntry, + getOrCreateMetadata, + loadMetadata, + loadStorage, + setEntry, +} from "./storage.js"; + +// Context +export { + WorkflowContextImpl, + DEFAULT_MAX_RETRIES, + DEFAULT_RETRY_BACKOFF_BASE, + 
DEFAULT_RETRY_BACKOFF_MAX, + DEFAULT_LOOP_COMMIT_INTERVAL, + DEFAULT_STEP_TIMEOUT, +} from "./context.js"; + +// Loop result helpers +export const Loop = { + continue: (state: S): { continue: true; state: S } => ({ + continue: true, + state, + }), + break: (value: T): { break: true; value: T } => ({ + break: true, + value, + }), +}; + +// Main workflow runner +import type { EngineDriver } from "./driver.js"; +import { + EvictedError, + SignalWaitError, + SleepError, + StepFailedError, +} from "./errors.js"; +import { + deserializeWorkflowInput, + deserializeWorkflowOutput, + deserializeWorkflowState, + serializeSignal, + serializeWorkflowInput, + serializeWorkflowState, +} from "../schemas/serde.js"; +import { + buildSignalKey, + buildSignalPrefix, + buildWorkflowInputKey, + buildWorkflowOutputKey, + buildWorkflowStateKey, +} from "./keys.js"; +import { flush, loadStorage } from "./storage.js"; +import type { + WorkflowFunction, + WorkflowHandle, + WorkflowResult, + WorkflowState, +} from "./types.js"; +import { WorkflowContextImpl } from "./context.js"; +import { generateId } from "./storage.js"; + + +/** + * Run a workflow and return a handle for managing it. + * + * The workflow starts executing immediately. 
Use the returned handle to: + * - `handle.result` - Await workflow completion or yield + * - `handle.signal()` - Send signals to the workflow + * - `handle.wake()` - Wake the workflow early + * - `handle.evict()` - Request graceful shutdown + * - `handle.getOutput()` / `handle.getState()` - Query status + */ +export function runWorkflow( + workflowId: string, + workflowFn: WorkflowFunction, + input: TInput, + driver: EngineDriver, +): WorkflowHandle { + const abortController = new AbortController(); + + // Start workflow execution (runs in background) + const resultPromise = executeWorkflow( + workflowId, + workflowFn, + input, + driver, + abortController, + ); + + return { + workflowId, + result: resultPromise, + + async signal(name: string, data: unknown): Promise { + // Use unique ID to avoid race conditions when signaling concurrently + const signalId = generateId(); + await driver.set( + buildSignalKey(signalId), + serializeSignal({ + id: signalId, + name, + data, + sentAt: Date.now(), + }), + ); + }, + + async wake(): Promise { + await driver.setAlarm(workflowId, Date.now()); + }, + + evict(): void { + abortController.abort(new EvictedError()); + }, + + async cancel(): Promise { + // Evict the workflow first + abortController.abort(new EvictedError()); + + // Set the workflow state to cancelled + await driver.set( + buildWorkflowStateKey(), + serializeWorkflowState("cancelled"), + ); + + // Clear any pending alarms + await driver.clearAlarm(workflowId); + }, + + async getOutput(): Promise { + const value = await driver.get(buildWorkflowOutputKey()); + if (!value) { + return undefined; + } + return deserializeWorkflowOutput(value); + }, + + async getState(): Promise { + const value = await driver.get(buildWorkflowStateKey()); + if (!value) { + return "pending"; + } + return deserializeWorkflowState(value); + }, + }; +} + +/** + * Internal: Execute the workflow and return the result. 
+ */ +async function executeWorkflow( + workflowId: string, + workflowFn: WorkflowFunction, + input: TInput, + driver: EngineDriver, + abortController: AbortController, +): Promise> { + const storage = await loadStorage(driver); + + // Check if workflow was cancelled + if (storage.state === "cancelled") { + throw new EvictedError(); + } + + // Input persistence: store on first run, use stored input on resume + const storedInputBytes = await driver.get(buildWorkflowInputKey()); + let effectiveInput: TInput; + + if (storedInputBytes) { + // Resume: use stored input for deterministic replay + effectiveInput = deserializeWorkflowInput(storedInputBytes); + } else { + // First run: store the input + effectiveInput = input; + await driver.set(buildWorkflowInputKey(), serializeWorkflowInput(input)); + } + + const ctx = new WorkflowContextImpl( + workflowId, + storage, + driver, + undefined, + abortController, + ); + + storage.state = "running"; + + try { + const output = await workflowFn(ctx, effectiveInput); + + storage.state = "completed"; + storage.output = output; + await flush(storage, driver); + await driver.clearAlarm(workflowId); + + return { state: "completed", output }; + } catch (error) { + if (error instanceof SleepError) { + storage.state = "sleeping"; + await flush(storage, driver); + await driver.setAlarm(workflowId, error.deadline); + + return { state: "sleeping", sleepUntil: error.deadline }; + } + + if (error instanceof SignalWaitError) { + storage.state = "sleeping"; + await flush(storage, driver); + + return { state: "sleeping", waitingForSignals: error.signalNames }; + } + + if (error instanceof EvictedError) { + // Just save state, workflow will be resumed elsewhere + await flush(storage, driver); + return { state: storage.state }; + } + + if (error instanceof StepFailedError) { + // Step failed but can be retried - yield to scheduler + storage.state = "sleeping"; + await flush(storage, driver); + + // Set minimal alarm for retry (backoff is handled 
in executeStep) + const retryAt = Date.now() + 100; + await driver.setAlarm(workflowId, retryAt); + + return { state: "sleeping", sleepUntil: retryAt }; + } + + // Unrecoverable error + storage.state = "failed"; + storage.error = extractErrorInfo(error); + await flush(storage, driver); + + throw error; + } +} + +/** + * Extract structured error information from an error. + */ +function extractErrorInfo(error: unknown): { + name: string; + message: string; + stack?: string; + metadata?: Record; +} { + if (error instanceof Error) { + const result: { + name: string; + message: string; + stack?: string; + metadata?: Record; + } = { + name: error.name, + message: error.message, + stack: error.stack, + }; + + // Extract custom properties from error + const metadata: Record = {}; + for (const key of Object.keys(error)) { + if (key !== "name" && key !== "message" && key !== "stack") { + const value = (error as unknown as Record)[key]; + // Only include serializable values + if ( + typeof value === "string" || + typeof value === "number" || + typeof value === "boolean" || + value === null + ) { + metadata[key] = value; + } + } + } + if (Object.keys(metadata).length > 0) { + result.metadata = metadata; + } + + return result; + } + + return { + name: "Error", + message: String(error), + }; +} diff --git a/rivetkit-typescript/packages/workflow-engine/src/keys.ts b/rivetkit-typescript/packages/workflow-engine/src/keys.ts new file mode 100644 index 0000000000..494f506928 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/src/keys.ts @@ -0,0 +1,309 @@ +/** + * Binary key encoding/decoding using fdb-tuple. + * All keys are encoded as tuples with integer prefixes for proper sorting. 
+ */ + +import * as tuple from "fdb-tuple"; +import type { Location, LoopIterationMarker, PathSegment } from "./types.js"; + +// === Key Prefixes === +// Using integers for compact encoding and proper sorting + +export const KEY_PREFIX = { + NAMES: 1, // Name registry: [1, index] + HISTORY: 2, // History entries: [2, ...locationSegments] + SIGNALS: 3, // Signal queue: [3, index] + WORKFLOW: 4, // Workflow metadata: [4, field] + ENTRY_METADATA: 5, // Entry metadata: [5, entryId] +} as const; + +// Workflow metadata field identifiers +export const WORKFLOW_FIELD = { + STATE: 1, + OUTPUT: 2, + ERROR: 3, + VERSION: 4, + INPUT: 5, +} as const; + +// === Type Definitions === + +// fdb-tuple's TupleItem type - we use a subset +type TupleItem = string | number | boolean | null | TupleItem[]; + +// === Location Segment Encoding === + +/** + * Convert a path segment to tuple elements. + * - NameIndex (number) → just the number + * - LoopIterationMarker → nested tuple [loopIdx, iteration] + */ +function segmentToTuple(segment: PathSegment): TupleItem { + if (typeof segment === "number") { + return segment; + } + // LoopIterationMarker + return [segment.loop, segment.iteration]; +} + +/** + * Convert tuple elements back to a path segment. + */ +function tupleToSegment(element: TupleItem): PathSegment { + if (typeof element === "number") { + return element; + } + if (Array.isArray(element) && element.length === 2) { + const [loop, iteration] = element; + if (typeof loop === "number" && typeof iteration === "number") { + return { loop, iteration } as LoopIterationMarker; + } + } + throw new Error(`Invalid path segment tuple element: ${JSON.stringify(element)}`); +} + +/** + * Convert a location to tuple elements. + */ +function locationToTupleElements(location: Location): TupleItem[] { + return location.map(segmentToTuple); +} + +/** + * Convert tuple elements back to a location. 
+ */ +function tupleElementsToLocation(elements: TupleItem[]): Location { + return elements.map(tupleToSegment); +} + +// === Helper Functions === + +/** + * Convert Buffer to Uint8Array. + */ +function bufferToUint8Array(buf: Buffer): Uint8Array { + return new Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength); +} + +/** + * Convert Uint8Array to Buffer. + */ +function uint8ArrayToBuffer(arr: Uint8Array): Buffer { + return Buffer.from(arr.buffer, arr.byteOffset, arr.byteLength); +} + +/** + * Pack tuple items and return as Uint8Array. + */ +function pack(items: TupleItem | TupleItem[]): Uint8Array { + const buf = tuple.pack(items); + return bufferToUint8Array(buf); +} + +/** + * Unpack a Uint8Array and return tuple items. + */ +function unpack(data: Uint8Array): TupleItem[] { + const buf = uint8ArrayToBuffer(data); + return tuple.unpack(buf) as TupleItem[]; +} + +// === Key Builders === + +/** + * Build a key for the name registry. + * Key: [1, index] + */ +export function buildNameKey(index: number): Uint8Array { + return pack([KEY_PREFIX.NAMES, index]); +} + +/** + * Build a prefix for listing all names. + * Prefix: [1] + */ +export function buildNamePrefix(): Uint8Array { + return pack([KEY_PREFIX.NAMES]); +} + +/** + * Build a key for a history entry. + * Key: [2, ...locationSegments] + */ +export function buildHistoryKey(location: Location): Uint8Array { + return pack([KEY_PREFIX.HISTORY, ...locationToTupleElements(location)]); +} + +/** + * Build a prefix for listing history entries under a location. + * Prefix: [2, ...locationSegments] + */ +export function buildHistoryPrefix(location: Location): Uint8Array { + return pack([KEY_PREFIX.HISTORY, ...locationToTupleElements(location)]); +} + +/** + * Build a prefix for listing all history entries. + * Prefix: [2] + */ +export function buildHistoryPrefixAll(): Uint8Array { + return pack([KEY_PREFIX.HISTORY]); +} + +/** + * Build a key for a signal. 
+ * Key: [3, signalId] + * + * Signal IDs can be either: + * - A string UUID (for new signals added via handle.signal()) + * - Used with the prefix to list all signals + */ +export function buildSignalKey(signalId: string): Uint8Array { + return pack([KEY_PREFIX.SIGNALS, signalId]); +} + +/** + * Build a prefix for listing all signals. + * Prefix: [3] + */ +export function buildSignalPrefix(): Uint8Array { + return pack([KEY_PREFIX.SIGNALS]); +} + +/** + * Build a key for workflow state. + * Key: [4, 1] + */ +export function buildWorkflowStateKey(): Uint8Array { + return pack([KEY_PREFIX.WORKFLOW, WORKFLOW_FIELD.STATE]); +} + +/** + * Build a key for workflow output. + * Key: [4, 2] + */ +export function buildWorkflowOutputKey(): Uint8Array { + return pack([KEY_PREFIX.WORKFLOW, WORKFLOW_FIELD.OUTPUT]); +} + +/** + * Build a key for workflow error. + * Key: [4, 3] + */ +export function buildWorkflowErrorKey(): Uint8Array { + return pack([KEY_PREFIX.WORKFLOW, WORKFLOW_FIELD.ERROR]); +} + +/** + * Build a key for workflow input. + * Key: [4, 5] + */ +export function buildWorkflowInputKey(): Uint8Array { + return pack([KEY_PREFIX.WORKFLOW, WORKFLOW_FIELD.INPUT]); +} + +/** + * Build a key for entry metadata. + * Key: [5, entryId] + */ +export function buildEntryMetadataKey(entryId: string): Uint8Array { + return pack([KEY_PREFIX.ENTRY_METADATA, entryId]); +} + +/** + * Build a prefix for listing all entry metadata. + * Prefix: [5] + */ +export function buildEntryMetadataPrefix(): Uint8Array { + return pack([KEY_PREFIX.ENTRY_METADATA]); +} + +// === Key Parsers === + +/** + * Parse a name key and return the index. + * Key: [1, index] → index + */ +export function parseNameKey(key: Uint8Array): number { + const elements = unpack(key); + if (elements.length !== 2 || elements[0] !== KEY_PREFIX.NAMES) { + throw new Error("Invalid name key"); + } + return elements[1] as number; +} + +/** + * Parse a history key and return the location. 
+ * Key: [2, ...segments] → Location + */ +export function parseHistoryKey(key: Uint8Array): Location { + const elements = unpack(key); + if (elements.length < 1 || elements[0] !== KEY_PREFIX.HISTORY) { + throw new Error("Invalid history key"); + } + return tupleElementsToLocation(elements.slice(1)); +} + +/** + * Parse a signal key and return the signal ID. + * Key: [3, signalId] → signalId + */ +export function parseSignalKey(key: Uint8Array): string { + const elements = unpack(key); + if (elements.length !== 2 || elements[0] !== KEY_PREFIX.SIGNALS) { + throw new Error("Invalid signal key"); + } + return elements[1] as string; +} + +/** + * Parse an entry metadata key and return the entry ID. + * Key: [5, entryId] → entryId + */ +export function parseEntryMetadataKey(key: Uint8Array): string { + const elements = unpack(key); + if (elements.length !== 2 || elements[0] !== KEY_PREFIX.ENTRY_METADATA) { + throw new Error("Invalid entry metadata key"); + } + return elements[1] as string; +} + +// === Key Comparison Utilities === + +/** + * Check if a key starts with a prefix. + */ +export function keyStartsWith(key: Uint8Array, prefix: Uint8Array): boolean { + if (key.length < prefix.length) { + return false; + } + for (let i = 0; i < prefix.length; i++) { + if (key[i] !== prefix[i]) { + return false; + } + } + return true; +} + +/** + * Compare two keys lexicographically. + * Returns negative if a < b, 0 if a === b, positive if a > b. + */ +export function compareKeys(a: Uint8Array, b: Uint8Array): number { + const minLen = Math.min(a.length, b.length); + for (let i = 0; i < minLen; i++) { + if (a[i] !== b[i]) { + return a[i] - b[i]; + } + } + return a.length - b.length; +} + +/** + * Convert a key to a hex string for debugging. 
+ */ +export function keyToHex(key: Uint8Array): string { + return Array.from(key) + .map((b) => b.toString(16).padStart(2, "0")) + .join(""); +} diff --git a/rivetkit-typescript/packages/workflow-engine/src/location.ts b/rivetkit-typescript/packages/workflow-engine/src/location.ts new file mode 100644 index 0000000000..9898621e71 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/src/location.ts @@ -0,0 +1,168 @@ +import type { + Location, + LoopIterationMarker, + NameIndex, + PathSegment, + Storage, +} from "./types.js"; + +/** + * Check if a path segment is a loop iteration marker. + */ +export function isLoopIterationMarker( + segment: PathSegment, +): segment is LoopIterationMarker { + return typeof segment === "object" && "loop" in segment; +} + +/** + * Register a name in the registry and return its index. + * If the name already exists, returns the existing index. + */ +export function registerName(storage: Storage, name: string): NameIndex { + const existing = storage.nameRegistry.indexOf(name); + if (existing !== -1) { + return existing; + } + storage.nameRegistry.push(name); + return storage.nameRegistry.length - 1; +} + +/** + * Resolve a name index to its string value. + */ +export function resolveName(storage: Storage, index: NameIndex): string { + const name = storage.nameRegistry[index]; + if (name === undefined) { + throw new Error(`Name index ${index} not found in registry`); + } + return name; +} + +/** + * Convert a location to a KV key string. + * Named entries use their string name, loop iterations use ~N format. + */ +export function locationToKey(storage: Storage, location: Location): string { + return location + .map((segment) => { + if (typeof segment === "number") { + return resolveName(storage, segment); + } + return `~${segment.iteration}`; + }) + .join("/"); +} + +/** + * Append a named segment to a location. 
+ */
+export function appendName(
+	storage: Storage,
+	location: Location,
+	name: string,
+): Location {
+	const nameIndex = registerName(storage, name);
+	return [...location, nameIndex];
+}
+
+/**
+ * Append a loop iteration segment to a location.
+ */
+export function appendLoopIteration(
+	storage: Storage,
+	location: Location,
+	loopName: string,
+	iteration: number,
+): Location {
+	const loopIndex = registerName(storage, loopName);
+	return [...location, { loop: loopIndex, iteration }];
+}
+
+/**
+ * Create an empty location (root).
+ */
+export function emptyLocation(): Location {
+	return [];
+}
+
+/**
+ * Get the parent location (all segments except the last).
+ */
+export function parentLocation(location: Location): Location {
+	return location.slice(0, -1);
+}
+
+/**
+ * Check if one location is a prefix of another.
+ */
+export function isLocationPrefix(
+	prefix: Location,
+	location: Location,
+): boolean {
+	if (prefix.length > location.length) {
+		return false;
+	}
+	for (let i = 0; i < prefix.length; i++) {
+		const prefixSegment = prefix[i];
+		const locationSegment = location[i];
+
+		if (typeof prefixSegment === "number" && typeof locationSegment === "number") {
+			if (prefixSegment !== locationSegment) {
+				return false;
+			}
+		} else if (
+			isLoopIterationMarker(prefixSegment) &&
+			isLoopIterationMarker(locationSegment)
+		) {
+			if (
+				prefixSegment.loop !== locationSegment.loop ||
+				prefixSegment.iteration !== locationSegment.iteration
+			) {
+				return false;
+			}
+		} else {
+			return false;
+		}
+	}
+	return true;
+}
+
+/**
+ * Compare two locations for equality.
+ */
+export function locationsEqual(a: Location, b: Location): boolean {
+	if (a.length !== b.length) {
+		return false;
+	}
+	return isLocationPrefix(a, b);
+}
+
+/**
+ * Get all entry keys that are children of a given location.
+ *
+ * Note: Returns a map of key → location (each entry's own location), not
+ * key → entry; fetch the entry via storage.history.entries.get(key) if needed.
+ */ +export function getChildEntries( + storage: Storage, + parentLoc: Location, +): Map { + const parentKey = locationToKey(storage, parentLoc); + const children = new Map(); + + for (const [key, entry] of storage.history.entries) { + // Handle empty parent (root) - all entries are children + const isChild = + parentKey === "" + ? true + : key.startsWith(parentKey + "/") || key === parentKey; + + if (isChild) { + // Return the actual entry's location, not the parent location + children.set(key, entry.location); + } + } + + return children; +} diff --git a/rivetkit-typescript/packages/workflow-engine/src/storage.ts b/rivetkit-typescript/packages/workflow-engine/src/storage.ts new file mode 100644 index 0000000000..b4e1a16ff7 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/src/storage.ts @@ -0,0 +1,414 @@ +import type { EngineDriver, KVWrite } from "./driver.js"; +import { + buildNameKey, + buildNamePrefix, + buildHistoryKey, + buildHistoryPrefix, + buildHistoryPrefixAll, + buildSignalKey, + buildSignalPrefix, + buildWorkflowStateKey, + buildWorkflowOutputKey, + buildWorkflowErrorKey, + buildEntryMetadataKey, + parseNameKey, + compareKeys, +} from "./keys.js"; +import { isLocationPrefix, locationToKey } from "./location.js"; +import { + deserializeEntry, + deserializeEntryMetadata, + deserializeName, + deserializeSignal, + deserializeWorkflowError, + deserializeWorkflowOutput, + deserializeWorkflowState, + serializeEntry, + serializeEntryMetadata, + serializeName, + serializeSignal, + serializeWorkflowError, + serializeWorkflowOutput, + serializeWorkflowState, +} from "../schemas/serde.js"; +import type { + Entry, + EntryKind, + EntryMetadata, + Location, + Signal, + Storage, +} from "./types.js"; + +/** + * Create an empty storage instance. 
+ */ +export function createStorage(): Storage { + return { + nameRegistry: [], + flushedNameCount: 0, + history: { entries: new Map() }, + entryMetadata: new Map(), + signals: [], + output: undefined, + state: "pending", + flushedState: undefined, + error: undefined, + flushedError: undefined, + flushedOutput: undefined, + }; +} + +/** + * Generate a UUID v4. + */ +export function generateId(): string { + return crypto.randomUUID(); +} + +/** + * Create a new entry. + */ +export function createEntry(location: Location, kind: EntryKind): Entry { + return { + id: generateId(), + location, + kind, + dirty: true, + }; +} + +/** + * Create or get metadata for an entry. + */ +export function getOrCreateMetadata( + storage: Storage, + entryId: string, +): EntryMetadata { + let metadata = storage.entryMetadata.get(entryId); + if (!metadata) { + metadata = { + status: "pending", + attempts: 0, + lastAttemptAt: 0, + createdAt: Date.now(), + dirty: true, + }; + storage.entryMetadata.set(entryId, metadata); + } + return metadata; +} + + +/** + * Load storage from the driver. 
+ */ +export async function loadStorage(driver: EngineDriver): Promise { + const storage = createStorage(); + + // Load name registry + const nameEntries = await driver.list(buildNamePrefix()); + // Sort by index to ensure correct order + nameEntries.sort((a, b) => compareKeys(a.key, b.key)); + for (const entry of nameEntries) { + const index = parseNameKey(entry.key); + storage.nameRegistry[index] = deserializeName(entry.value); + } + // Track how many names are already persisted + storage.flushedNameCount = storage.nameRegistry.length; + + // Load history entries + const historyEntries = await driver.list(buildHistoryPrefixAll()); + for (const entry of historyEntries) { + const parsed = deserializeEntry(entry.value); + parsed.dirty = false; + // Use locationToKey to match how context.ts looks up entries + const key = locationToKey(storage, parsed.location); + storage.history.entries.set(key, parsed); + } + + // Load signals + const signalEntries = await driver.list(buildSignalPrefix()); + // Sort by index to ensure correct FIFO order + signalEntries.sort((a, b) => compareKeys(a.key, b.key)); + for (const entry of signalEntries) { + const signal = deserializeSignal(entry.value); + storage.signals.push(signal); + } + + // Load workflow state + const stateValue = await driver.get(buildWorkflowStateKey()); + if (stateValue) { + storage.state = deserializeWorkflowState(stateValue); + storage.flushedState = storage.state; + } + + // Load output if present + const outputValue = await driver.get(buildWorkflowOutputKey()); + if (outputValue) { + storage.output = deserializeWorkflowOutput(outputValue); + storage.flushedOutput = storage.output; + } + + // Load error if present + const errorValue = await driver.get(buildWorkflowErrorKey()); + if (errorValue) { + storage.error = deserializeWorkflowError(errorValue); + storage.flushedError = storage.error; + } + + return storage; +} + +/** + * Load metadata for an entry (lazy loading). 
+ */ +export async function loadMetadata( + storage: Storage, + driver: EngineDriver, + entryId: string, +): Promise { + // Check if already loaded + const existing = storage.entryMetadata.get(entryId); + if (existing) { + return existing; + } + + // Load from driver + const value = await driver.get(buildEntryMetadataKey(entryId)); + if (value) { + const metadata = deserializeEntryMetadata(value); + metadata.dirty = false; + storage.entryMetadata.set(entryId, metadata); + return metadata; + } + + // Create new metadata + return getOrCreateMetadata(storage, entryId); +} + +/** + * Flush all dirty data to the driver. + */ +export async function flush( + storage: Storage, + driver: EngineDriver, +): Promise { + const writes: KVWrite[] = []; + + // Flush only new names (those added since last flush) + for (let i = storage.flushedNameCount; i < storage.nameRegistry.length; i++) { + const name = storage.nameRegistry[i]; + if (name !== undefined) { + writes.push({ + key: buildNameKey(i), + value: serializeName(name), + }); + } + } + + // Flush dirty entries + for (const [, entry] of storage.history.entries) { + if (entry.dirty) { + writes.push({ + key: buildHistoryKey(entry.location), + value: serializeEntry(entry), + }); + entry.dirty = false; + } + } + + // Flush dirty metadata + for (const [id, metadata] of storage.entryMetadata) { + if (metadata.dirty) { + writes.push({ + key: buildEntryMetadataKey(id), + value: serializeEntryMetadata(metadata), + }); + metadata.dirty = false; + } + } + + // Flush workflow state if changed + if (storage.state !== storage.flushedState) { + writes.push({ + key: buildWorkflowStateKey(), + value: serializeWorkflowState(storage.state), + }); + } + + // Flush output if changed + if (storage.output !== undefined && storage.output !== storage.flushedOutput) { + writes.push({ + key: buildWorkflowOutputKey(), + value: serializeWorkflowOutput(storage.output), + }); + } + + // Flush error if changed (compare by message since objects aren't 
reference-equal) + const errorChanged = storage.error !== undefined && + (storage.flushedError === undefined || + storage.error.name !== storage.flushedError.name || + storage.error.message !== storage.flushedError.message); + if (errorChanged) { + writes.push({ + key: buildWorkflowErrorKey(), + value: serializeWorkflowError(storage.error!), + }); + } + + if (writes.length > 0) { + await driver.batch(writes); + } + + // Update flushed tracking after successful write + storage.flushedNameCount = storage.nameRegistry.length; + storage.flushedState = storage.state; + storage.flushedOutput = storage.output; + storage.flushedError = storage.error; +} + +/** + * Add a signal to the queue. + */ +export async function addSignal( + storage: Storage, + driver: EngineDriver, + name: string, + data: unknown, +): Promise { + const signal: Signal = { + id: generateId(), + name, + data, + sentAt: Date.now(), + }; + + storage.signals.push(signal); + + // Persist immediately using signal's unique ID as key + await driver.set(buildSignalKey(signal.id), serializeSignal(signal)); +} + +/** + * Consume a signal from the queue. + * Returns null if no matching signal is found. + * Deletes from driver first to prevent duplicates on failure. + */ +export async function consumeSignal( + storage: Storage, + driver: EngineDriver, + signalName: string, +): Promise { + const index = storage.signals.findIndex((s) => s.name === signalName); + if (index === -1) { + return null; + } + + const signal = storage.signals[index]; + + // Delete from driver first - if this fails, memory is unchanged + await driver.delete(buildSignalKey(signal.id)); + + // Only remove from memory after successful driver deletion + storage.signals.splice(index, 1); + + return signal; +} + +/** + * Consume up to N signals from the queue. 
+ * + * Uses allSettled to handle partial failures gracefully: + * - If all deletes succeed, signals are removed from memory + * - If some deletes fail, only successfully deleted signals are removed + * - On next load, failed signals will be re-read from KV + */ +export async function consumeSignals( + storage: Storage, + driver: EngineDriver, + signalName: string, + limit: number, +): Promise { + // Find all matching signals up to limit (don't modify memory yet) + const toConsume: { signal: Signal; index: number }[] = []; + let count = 0; + + for (let i = 0; i < storage.signals.length && count < limit; i++) { + if (storage.signals[i].name === signalName) { + toConsume.push({ signal: storage.signals[i], index: i }); + count++; + } + } + + if (toConsume.length === 0) { + return []; + } + + // Delete from driver using allSettled to handle partial failures + const deleteResults = await Promise.allSettled( + toConsume.map(({ signal }) => driver.delete(buildSignalKey(signal.id))), + ); + + // Track which signals were successfully deleted + const successfullyDeleted: { signal: Signal; index: number }[] = []; + for (let i = 0; i < deleteResults.length; i++) { + if (deleteResults[i].status === "fulfilled") { + successfullyDeleted.push(toConsume[i]); + } + } + + // Only remove successfully deleted signals from memory + // Remove in reverse order to preserve indices + for (let i = successfullyDeleted.length - 1; i >= 0; i--) { + const { index } = successfullyDeleted[i]; + storage.signals.splice(index, 1); + } + + return successfullyDeleted.map(({ signal }) => signal); +} + +/** + * Delete entries with a given location prefix (used for loop forgetting). + * Also cleans up associated metadata from both memory and driver. 
+ */ +export async function deleteEntriesWithPrefix( + storage: Storage, + driver: EngineDriver, + prefixLocation: Location, +): Promise { + // Collect entry IDs for metadata cleanup + const entryIds: string[] = []; + + // Collect entries to delete and their IDs + for (const [key, entry] of storage.history.entries) { + // Check if the entry's location starts with the prefix location + if (isLocationPrefix(prefixLocation, entry.location)) { + entryIds.push(entry.id); + storage.entryMetadata.delete(entry.id); + storage.history.entries.delete(key); + } + } + + // Delete entries from driver using binary prefix + await driver.deletePrefix(buildHistoryPrefix(prefixLocation)); + + // Delete metadata from driver in parallel + await Promise.all(entryIds.map((id) => driver.delete(buildEntryMetadataKey(id)))); +} + +/** + * Get an entry by location. + */ +export function getEntry(storage: Storage, location: Location): Entry | undefined { + const key = locationToKey(storage, location); + return storage.history.entries.get(key); +} + +/** + * Set an entry by location. + */ +export function setEntry(storage: Storage, location: Location, entry: Entry): void { + const key = locationToKey(storage, location); + storage.history.entries.set(key, entry); +} diff --git a/rivetkit-typescript/packages/workflow-engine/src/testing.ts b/rivetkit-typescript/packages/workflow-engine/src/testing.ts new file mode 100644 index 0000000000..c822089397 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/src/testing.ts @@ -0,0 +1,135 @@ +import type { EngineDriver, KVEntry, KVWrite } from "./driver.js"; +import { keyStartsWith, compareKeys, keyToHex } from "./keys.js"; +import { sleep } from "./utils.js"; + +/** + * In-memory implementation of EngineDriver for testing. + * Uses binary keys (Uint8Array) with hex encoding for internal Map storage. 
+ */ +export class InMemoryDriver implements EngineDriver { + // Map from hex-encoded key to { originalKey, value } + private kv = new Map(); + private alarms = new Map(); + + /** Simulated latency per operation (ms) */ + latency = 10; + + /** How often the worker polls for work */ + workerPollInterval = 100; + + async get(key: Uint8Array): Promise { + await sleep(this.latency); + const entry = this.kv.get(keyToHex(key)); + return entry?.value ?? null; + } + + async set(key: Uint8Array, value: Uint8Array): Promise { + await sleep(this.latency); + this.kv.set(keyToHex(key), { key, value }); + } + + async delete(key: Uint8Array): Promise { + await sleep(this.latency); + this.kv.delete(keyToHex(key)); + } + + async deletePrefix(prefix: Uint8Array): Promise { + await sleep(this.latency); + for (const [hexKey, entry] of this.kv) { + if (keyStartsWith(entry.key, prefix)) { + this.kv.delete(hexKey); + } + } + } + + async list(prefix: Uint8Array): Promise { + await sleep(this.latency); + const results: KVEntry[] = []; + for (const entry of this.kv.values()) { + if (keyStartsWith(entry.key, prefix)) { + results.push({ key: entry.key, value: entry.value }); + } + } + // Sort by key lexicographically + return results.sort((a, b) => compareKeys(a.key, b.key)); + } + + async batch(writes: KVWrite[]): Promise { + await sleep(this.latency); + for (const { key, value } of writes) { + this.kv.set(keyToHex(key), { key, value }); + } + } + + async setAlarm(workflowId: string, wakeAt: number): Promise { + await sleep(this.latency); + this.alarms.set(workflowId, wakeAt); + } + + async clearAlarm(workflowId: string): Promise { + await sleep(this.latency); + this.alarms.delete(workflowId); + } + + /** + * Get the alarm time for a workflow (for testing). + */ + getAlarm(workflowId: string): number | undefined { + return this.alarms.get(workflowId); + } + + /** + * Check if any alarms are due and return their workflow IDs. 
+ */ + getDueAlarms(): string[] { + const now = Date.now(); + const due: string[] = []; + for (const [workflowId, wakeAt] of this.alarms) { + if (wakeAt <= now) { + due.push(workflowId); + } + } + return due; + } + + /** + * Clear all data (for testing). + */ + clear(): void { + this.kv.clear(); + this.alarms.clear(); + } + + /** + * Get a snapshot of all data (for testing/debugging). + */ + snapshot(): { + kv: Record; + alarms: Record; + } { + const kvSnapshot: Record = {}; + for (const [hexKey, entry] of this.kv) { + kvSnapshot[hexKey] = entry.value; + } + return { + kv: kvSnapshot, + alarms: Object.fromEntries(this.alarms), + }; + } + + /** + * Get all hex-encoded keys (for testing). + */ + keys(): string[] { + return [...this.kv.keys()]; + } +} + +// Re-export main exports for convenience +export * from "./index.js"; + +// Export serde functions for testing +export { serializeSignal } from "../schemas/serde.js"; + +// Export key builders for testing +export { buildSignalKey } from "./keys.js"; diff --git a/rivetkit-typescript/packages/workflow-engine/src/types.ts b/rivetkit-typescript/packages/workflow-engine/src/types.ts new file mode 100644 index 0000000000..05f4a32575 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/src/types.ts @@ -0,0 +1,397 @@ +/** + * Index into the entry name registry. + * Names are stored once and referenced by this index to avoid repetition. + */ +export type NameIndex = number; + +/** + * A segment in a location path. + * Either a name index (for named entries) or a loop iteration marker. + */ +export type PathSegment = NameIndex | LoopIterationMarker; + +/** + * Marker for a loop iteration in a location path. + */ +export interface LoopIterationMarker { + loop: NameIndex; + iteration: number; +} + +/** + * Location identifies where an entry exists in the workflow execution tree. + * It forms a path from the root through loops, joins, and branches. 
+ */ +export type Location = PathSegment[]; + +/** + * Current state of a sleep entry. + */ +export type SleepState = "pending" | "completed" | "interrupted"; + +/** + * Status of an entry in the workflow. + */ +export type EntryStatus = + | "pending" + | "running" + | "completed" + | "failed" + | "exhausted"; + +/** + * Status of a branch in join/race. + */ +export type BranchStatusType = + | "pending" + | "running" + | "completed" + | "failed" + | "cancelled"; + +/** + * Current state of the workflow. + */ +export type WorkflowState = + | "pending" + | "running" + | "sleeping" + | "failed" + | "completed" + | "cancelled"; + +/** + * Step entry data. + */ +export interface StepEntry { + output?: unknown; + error?: string; +} + +/** + * Loop entry data. + */ +export interface LoopEntry { + state: unknown; + iteration: number; + output?: unknown; +} + +/** + * Sleep entry data. + */ +export interface SleepEntry { + deadline: number; + state: SleepState; +} + +/** + * Signal entry data. + */ +export interface SignalEntry { + name: string; + data: unknown; +} + +/** + * Branch status for join/race entries. + */ +export interface BranchStatus { + status: BranchStatusType; + output?: unknown; + error?: string; +} + +/** + * Join entry data. + */ +export interface JoinEntry { + branches: Record; +} + +/** + * Race entry data. + */ +export interface RaceEntry { + winner: string | null; + branches: Record; +} + +/** + * Removed entry data - placeholder for removed steps in workflow migrations. + */ +export interface RemovedEntry { + originalType: EntryKindType; + originalName?: string; +} + +/** + * All possible entry kind types. + */ +export type EntryKindType = + | "step" + | "loop" + | "sleep" + | "signal" + | "join" + | "race" + | "removed"; + +/** + * Type-specific entry data. 
+ */ +export type EntryKind = + | { type: "step"; data: StepEntry } + | { type: "loop"; data: LoopEntry } + | { type: "sleep"; data: SleepEntry } + | { type: "signal"; data: SignalEntry } + | { type: "join"; data: JoinEntry } + | { type: "race"; data: RaceEntry } + | { type: "removed"; data: RemovedEntry }; + +/** + * An entry in the workflow history. + */ +export interface Entry { + id: string; + location: Location; + kind: EntryKind; + dirty: boolean; +} + +/** + * Metadata for an entry (stored separately, lazily loaded). + */ +export interface EntryMetadata { + status: EntryStatus; + error?: string; + attempts: number; + lastAttemptAt: number; + createdAt: number; + completedAt?: number; + dirty: boolean; +} + +/** + * A signal in the queue. + */ +export interface Signal { + /** Unique signal ID (used as KV key). */ + id: string; + name: string; + data: unknown; + sentAt: number; +} + +/** + * Workflow history - maps location keys to entries. + */ +export interface History { + entries: Map; +} + +/** + * Structured error information for workflow failures. + */ +export interface WorkflowError { + /** Error name/type (e.g., "TypeError", "CriticalError") */ + name: string; + /** Error message */ + message: string; + /** Stack trace if available */ + stack?: string; + /** Custom error properties (for structured errors) */ + metadata?: Record; +} + +/** + * Complete storage state for a workflow. + */ +export interface Storage { + nameRegistry: string[]; + flushedNameCount: number; + history: History; + entryMetadata: Map; + signals: Signal[]; + output?: unknown; + state: WorkflowState; + flushedState?: WorkflowState; + error?: WorkflowError; + flushedError?: WorkflowError; + flushedOutput?: unknown; +} + +/** + * Configuration for a step. + */ +export interface StepConfig { + name: string; + run: () => Promise; + /** If true, step result is not persisted (use for idempotent operations). */ + ephemeral?: boolean; + /** Maximum number of retry attempts (default: 3). 
 */
+	maxRetries?: number;
+	/** Base delay in ms for exponential backoff (default: 100). */
+	retryBackoffBase?: number;
+	/** Maximum delay in ms for exponential backoff (default: 30000). */
+	retryBackoffMax?: number;
+	/** Timeout in ms for step execution (default: 30000). Set to 0 to disable. */
+	timeout?: number;
+}
+
+/**
+ * Result from a loop iteration. NOTE(review): generic params (likely `<S, T>`) appear stripped in transit — `S`/`T` are referenced but not declared; verify against the repo.
+ */
+export type LoopResult =
+	| { continue: true; state: S }
+	| { break: true; value: T };
+
+/**
+ * Configuration for a loop. NOTE(review): `Promise>` below suggests a lost nested type argument (presumably `Promise<LoopResult<S, T>>`) — confirm before use.
+ */
+export interface LoopConfig {
+	name: string;
+	state?: S;
+	run: (ctx: WorkflowContextInterface, state: S) => Promise>;
+	commitInterval?: number;
+}
+
+/**
+ * Configuration for a branch in join/race. NOTE(review): presumably generic over the branch output type — confirm.
+ */
+export interface BranchConfig {
+	run: (ctx: WorkflowContextInterface) => Promise;
+}
+
+/**
+ * Extract the output type from a BranchConfig. NOTE(review): `T`/`O` undeclared here — likely `BranchOutput<T> = T extends BranchConfig<infer O> ? O : never`; verify.
+ */
+export type BranchOutput = T extends BranchConfig ? O : never;
+
+/**
+ * The workflow context interface exposed to workflow functions. NOTE(review): method-level type parameters and Promise type arguments appear stripped throughout this interface — compare against the repo before relying on signatures.
+ */
+export interface WorkflowContextInterface {
+	readonly workflowId: string;
+	readonly signal: AbortSignal;
+
+	step(name: string, run: () => Promise): Promise;
+	step(config: StepConfig): Promise;
+
+	loop(
+		name: string,
+		run: (ctx: WorkflowContextInterface) => Promise>,
+	): Promise;
+	loop(config: LoopConfig): Promise;
+
+	sleep(name: string, durationMs: number): Promise;
+	sleepUntil(name: string, timestampMs: number): Promise;
+
+	listen(name: string, signalName: string): Promise;
+	listenN(name: string, signalName: string, limit: number): Promise;
+	listenWithTimeout(
+		name: string,
+		signalName: string,
+		timeoutMs: number,
+	): Promise;
+	listenUntil(
+		name: string,
+		signalName: string,
+		timestampMs: number,
+	): Promise;
+	listenNWithTimeout(
+		name: string,
+		signalName: string,
+		limit: number,
+		timeoutMs: number,
+	): Promise;
+	listenNUntil(
+		name: string,
+		signalName: string,
+		limit: number,
+		timestampMs: number,
+	): Promise;
+
+	join>>(
+		name: string,
+		
branches: T,
+	): Promise<{ [K in keyof T]: BranchOutput }>;
+
+	race(
+		name: string,
+		branches: Array<{ name: string; run: (ctx: WorkflowContextInterface) => Promise }>,
+	): Promise<{ winner: string; value: T }>;
+
+	removed(name: string, originalType: EntryKindType): Promise;
+
+	isEvicted(): boolean;
+}
+
+/**
+ * Workflow function type. NOTE(review): `TInput` is referenced but not declared — likely `WorkflowFunction<TInput, TOutput>` with return `Promise<TOutput>`; verify against the repo.
+ */
+export type WorkflowFunction = (
+	ctx: WorkflowContextInterface,
+	input: TInput,
+) => Promise;
+
+/**
+ * Result returned when a workflow run completes or yields. NOTE(review): `TOutput` undeclared — presumably `WorkflowResult<TOutput>`; verify.
+ */
+export interface WorkflowResult {
+	state: WorkflowState;
+	output?: TOutput;
+	sleepUntil?: number;
+	waitingForSignals?: string[];
+}
+
+/**
+ * Handle for managing a running workflow. NOTE(review): `Promise>` below indicates a stripped type argument (presumably `Promise<WorkflowResult<TOutput>>`) — verify.
+ *
+ * Returned by `runWorkflow()`. The workflow starts executing immediately.
+ * Use `.result` to await completion, and other methods to interact with
+ * the running workflow.
+ */
+export interface WorkflowHandle {
+	readonly workflowId: string;
+
+	/**
+	 * Promise that resolves when the workflow completes or yields.
+	 */
+	readonly result: Promise>;
+
+	/**
+	 * Send a signal to the workflow.
+	 * The signal is persisted and will be available on the next run.
+	 */
+	signal(name: string, data: unknown): Promise;
+
+	/**
+	 * Wake the workflow immediately by setting an alarm for now.
+	 */
+	wake(): Promise;
+
+	/**
+	 * Request the workflow to stop gracefully.
+	 * The workflow will throw EvictedError at its next yield point,
+	 * flush its state, and resolve the result promise.
+	 */
+	evict(): void;
+
+	/**
+	 * Cancel the workflow permanently.
+	 * Sets the workflow state to "cancelled" and clears any pending alarms.
+	 * Unlike evict(), this marks the workflow as permanently stopped.
+	 */
+	cancel(): Promise;
+
+	/**
+	 * Get the workflow output if completed.
+	 */
+	getOutput(): Promise;
+
+	/**
+	 * Get the current workflow state.
+ */ + getState(): Promise; +} diff --git a/rivetkit-typescript/packages/workflow-engine/src/utils.ts b/rivetkit-typescript/packages/workflow-engine/src/utils.ts new file mode 100644 index 0000000000..b3b977f926 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/src/utils.ts @@ -0,0 +1,19 @@ +/** + * Sleep for a given number of milliseconds. + */ +export function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Safely parse JSON with a meaningful error message. + */ +export function safeJsonParse(value: string, context: string): T { + try { + return JSON.parse(value) as T; + } catch (error) { + throw new Error( + `Failed to parse ${context}: ${error instanceof Error ? error.message : String(error)}`, + ); + } +} diff --git a/rivetkit-typescript/packages/workflow-engine/tests/workflow.test.ts b/rivetkit-typescript/packages/workflow-engine/tests/workflow.test.ts new file mode 100644 index 0000000000..963ff8c543 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/tests/workflow.test.ts @@ -0,0 +1,430 @@ +import { describe, it, expect, beforeEach } from "vitest"; +import { + InMemoryDriver, + runWorkflow, + Loop, + CriticalError, + SleepError, + StepExhaustedError, + serializeSignal, + buildSignalKey, + generateId, + type WorkflowContextInterface, +} from "../src/testing.js"; + +describe("Workflow Engine", { sequential: true }, () => { + let driver: InMemoryDriver; + + beforeEach(() => { + driver = new InMemoryDriver(); + driver.latency = 0; // Disable latency for faster tests + }); + + describe("Steps", () => { + it("should execute a simple step", async () => { + const workflow = async (ctx: WorkflowContextInterface) => { + const result = await ctx.step("my-step", async () => { + return "hello world"; + }); + return result; + }; + + const result = await runWorkflow("wf-1", workflow, undefined, driver).result; + + expect(result.state).toBe("completed"); + expect(result.output).toBe("hello 
world");
+		});
+
+		it("should replay step on restart", async () => {
+			let callCount = 0;
+
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				const result = await ctx.step("my-step", async () => {
+					callCount++;
+					return "hello";
+				});
+				return result;
+			};
+
+			// First run executes the step body for real
+			await runWorkflow("wf-1", workflow, undefined, driver).result;
+			expect(callCount).toBe(1);
+
+			// Second run - should replay from persisted history
+			await runWorkflow("wf-1", workflow, undefined, driver).result;
+			expect(callCount).toBe(1); // Should not increment - body not re-executed
+		});
+
+		it("should execute multiple steps in sequence", async () => {
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				const a = await ctx.step("step-a", async () => 1);
+				const b = await ctx.step("step-b", async () => 2);
+				const c = await ctx.step("step-c", async () => 3);
+				return a + b + c;
+			};
+
+			const result = await runWorkflow("wf-1", workflow, undefined, driver).result;
+
+			expect(result.state).toBe("completed");
+			expect(result.output).toBe(6);
+		});
+
+		it("should retry failed steps", async () => {
+			let attempts = 0;
+
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				return await ctx.step(
+					{
+						name: "flaky-step",
+						maxRetries: 3,
+						retryBackoffBase: 1,
+						retryBackoffMax: 10,
+						run: async () => {
+							attempts++;
+							if (attempts < 3) {
+								throw new Error("Transient failure");
+							}
+							return "success";
+						},
+					},
+				);
+			};
+
+			// First attempt fails (rejection swallowed; retry state persisted)
+			try {
+				await runWorkflow("wf-1", workflow, undefined, driver).result;
+			} catch {}
+
+			// Second attempt fails
+			try {
+				await runWorkflow("wf-1", workflow, undefined, driver).result;
+			} catch {}
+
+			// Third attempt succeeds
+			const result = await runWorkflow("wf-1", workflow, undefined, driver).result;
+
+			expect(result.state).toBe("completed");
+			expect(result.output).toBe("success");
+			expect(attempts).toBe(3);
+		});
+
+		it("should not retry CriticalError", async () => {
+			let attempts = 0;
+
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				return await ctx.step("critical-step", async () => {
+					attempts++;
+					throw new CriticalError("Unrecoverable");
+				});
+			};
+
+			await expect(
+				runWorkflow("wf-1", workflow, undefined, driver).result,
+			).rejects.toThrow(CriticalError);
+
+			// Should not retry - CriticalError is terminal
+			expect(attempts).toBe(1);
+		});
+
+		it("should exhaust retries", async () => {
+			let attempts = 0;
+
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				return await ctx.step(
+					{
+						name: "always-fails",
+						maxRetries: 2,
+						retryBackoffBase: 1,
+						run: async () => {
+							attempts++;
+							throw new Error("Always fails");
+						},
+					},
+				);
+			};
+
+			// Exhaust retries (maxRetries: 2, so 3 runs use them all up)
+			for (let i = 0; i < 3; i++) {
+				try {
+					await runWorkflow("wf-1", workflow, undefined, driver).result;
+				} catch {}
+			}
+
+			// Should throw StepExhaustedError once attempts are used up
+			await expect(
+				runWorkflow("wf-1", workflow, undefined, driver).result,
+			).rejects.toThrow(StepExhaustedError);
+		});
+	});
+
+	describe("Loops", () => {
+		it("should execute a simple loop", async () => {
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				return await ctx.loop(
+					{
+						name: "count-loop",
+						state: { count: 0 },
+						run: async (ctx, state) => {
+							if (state.count >= 3) {
+								return Loop.break(state.count);
+							}
+							await ctx.step(`step-${state.count}`, async () => {});
+							return Loop.continue({ count: state.count + 1 });
+						},
+					},
+				);
+			};
+
+			const result = await runWorkflow("wf-1", workflow, undefined, driver).result;
+
+			expect(result.state).toBe("completed");
+			expect(result.output).toBe(3);
+		});
+
+		it("should resume loop from saved state", async () => {
+			let iteration = 0;
+
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				return await ctx.loop(
+					{
+						name: "resume-loop",
+						state: { count: 0 },
+						commitInterval: 2, // Commit every 2 iterations
+						run: async (ctx, state) => {
+							iteration++;
+
+							if (state.count >= 5) {
+								return Loop.break(state.count);
+							}
+
+							// Simulate crash on iteration 3 (first run only)
+							if (state.count === 2 && iteration === 3) {
+								throw new Error("Simulated crash");
+							}
+
+							return Loop.continue({ count: state.count + 1 });
+						},
+					},
+				);
+			};
+
+			// First run - crashes on iteration 3
+			try {
+				await runWorkflow("wf-1", workflow, undefined, driver).result;
+			} catch {}
+
+			// Reset iteration counter to see how many new iterations run
+			iteration = 0;
+
+			// Second run - should resume from last committed state, not iteration 0
+			const result = await runWorkflow("wf-1", workflow, undefined, driver).result;
+
+			expect(result.state).toBe("completed");
+			expect(result.output).toBe(5);
+		});
+	});
+
+	describe("Sleep", () => {
+		it("should yield on long sleep", async () => {
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				await ctx.sleep("my-sleep", 10000);
+				return "done";
+			};
+
+			const result = await runWorkflow("wf-1", workflow, undefined, driver).result;
+
+			expect(result.state).toBe("sleeping");
+			expect(result.sleepUntil).toBeDefined();
+			expect(result.sleepUntil).toBeGreaterThan(Date.now());
+		});
+
+		it("should complete short sleep in memory", async () => {
+			driver.workerPollInterval = 1000; // High threshold so 10ms sleep stays in-memory
+
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				await ctx.sleep("short-sleep", 10); // 10ms
+				return "done";
+			};
+
+			const result = await runWorkflow("wf-1", workflow, undefined, driver).result;
+
+			expect(result.state).toBe("completed");
+			expect(result.output).toBe("done");
+		});
+
+		it("should resume after sleep deadline", async () => {
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				await ctx.sleep("my-sleep", 1); // Very short sleep
+				return "done";
+			};
+
+			// First run - starts sleeping
+			const result1 = await runWorkflow("wf-1", workflow, undefined, driver).result;
+
+			// Wait for deadline
+			await new Promise((r) => setTimeout(r, 10));
+
+			// Second run - should complete
+			const result2 = await runWorkflow("wf-1", workflow, undefined, driver).result;
+
+			expect(result2.state).toBe("completed");
+			expect(result2.output).toBe("done");
+		});
+	});
+
+	describe("Join", () => {
+		it("should execute branches in parallel", async () => {
+			const order: string[] = [];
+
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				const results = await ctx.join("parallel", {
+					a: {
+						run: async (ctx) => {
+							order.push("a-start");
+							const val = await ctx.step("step-a", async () => 1);
+							order.push("a-end");
+							return val;
+						},
+					},
+					b: {
+						run: async (ctx) => {
+							order.push("b-start");
+							const val = await ctx.step("step-b", async () => 2);
+							order.push("b-end");
+							return val;
+						},
+					},
+				});
+
+				return results.a + results.b;
+			};
+
+			const result = await runWorkflow("wf-1", workflow, undefined, driver).result;
+
+			expect(result.state).toBe("completed");
+			expect(result.output).toBe(3);
+
+			// Both branches should start before either ends (parallel interleaving)
+			expect(order.indexOf("a-start")).toBeLessThan(order.indexOf("b-end"));
+			expect(order.indexOf("b-start")).toBeLessThan(order.indexOf("a-end"));
+		});
+
+		it("should wait for all branches even on error", async () => {
+			let bCompleted = false;
+
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				await ctx.join("parallel", {
+					a: {
+						run: async () => {
+							throw new Error("A failed");
+						},
+					},
+					b: {
+						run: async (ctx) => {
+							await ctx.step("step-b", async () => {
+								bCompleted = true;
+								return "b";
+							});
+							return "b";
+						},
+					},
+				});
+			};
+
+			await expect(
+				runWorkflow("wf-1", workflow, undefined, driver).result,
+			).rejects.toThrow();
+
+			expect(bCompleted).toBe(true);
+		});
+	});
+
+	describe("Race", () => {
+		it("should return first completed branch", async () => {
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				return await ctx.race("race", [
+					{
+						name: "fast",
+						run: async (ctx) => {
+							return await ctx.step("fast-step", async () => "fast");
+						},
+					},
+					{
+						name: "slow",
+						run: async (ctx) => {
+							// This would sleep but fast completes first
+							return await ctx.step("slow-step", async () => "slow");
+						},
+					},
+				]);
+			};
+
+			const result = await runWorkflow("wf-1", workflow, undefined, driver).result;
+
+			expect(result.state).toBe("completed");
+			expect(result.output?.winner).toBe("fast");
+			expect(result.output?.value).toBe("fast");
+		});
+	});
+
+	describe("Signals", () => {
+		it("should wait for signals", async () => {
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				const signal = await ctx.listen("wait-signal", "my-signal");
+				return signal;
+			};
+
+			// First run - should wait (yields as "sleeping" until signal arrives)
+			const result1 = await runWorkflow("wf-1", workflow, undefined, driver).result;
+			expect(result1.state).toBe("sleeping");
+			expect(result1.waitingForSignals).toContain("my-signal");
+		});
+
+		it("should consume pending signals", async () => {
+			// Pre-add a signal using BARE serialization with binary key
+			const signalId = generateId();
+			await driver.set(
+				buildSignalKey(signalId),
+				serializeSignal({
+					id: signalId,
+					name: "my-signal",
+					data: "hello",
+					sentAt: Date.now(),
+				}),
+			);
+
+			const workflow = async (ctx: WorkflowContextInterface) => {
+				const signal = await ctx.listen("wait-signal", "my-signal");
+				return signal;
+			};
+
+			const result = await runWorkflow("wf-1", workflow, undefined, driver).result;
+
+			expect(result.state).toBe("completed");
+			expect(result.output).toBe("hello");
+		});
+	});
+
+	describe("Removed", () => {
+		it("should skip removed steps", async () => {
+			// First, create a workflow with a step
+			const workflow1 = async (ctx: WorkflowContextInterface) => {
+				await ctx.step("old-step", async () => "old");
+				return "done";
+			};
+
+			await runWorkflow("wf-1", workflow1, undefined, driver).result;
+
+			// Now "update" the workflow to remove the step (migration placeholder)
+			const workflow2 = async (ctx: WorkflowContextInterface) => {
+				await ctx.removed("old-step", "step");
+				return "updated";
+			};
+
+			const result = await runWorkflow("wf-1", workflow2, undefined, driver).result;
+
+			expect(result.state).toBe("completed");
+			expect(result.output).toBe("updated");
+		});
+	});
+});
diff --git 
a/rivetkit-typescript/packages/workflow-engine/tsconfig.json b/rivetkit-typescript/packages/workflow-engine/tsconfig.json new file mode 100644 index 0000000000..2482b0d391 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "types": ["node"], + "outDir": "./dist" + }, + "include": ["src/**/*", "schemas/**/*", "dist/schemas/**/*", "tests/**/*"] +} diff --git a/rivetkit-typescript/packages/workflow-engine/tsup.config.ts b/rivetkit-typescript/packages/workflow-engine/tsup.config.ts new file mode 100644 index 0000000000..d8652c0151 --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/tsup.config.ts @@ -0,0 +1,7 @@ +import { defineConfig } from "tsup"; +import defaultConfig from "../../../tsup.base.ts"; + +export default defineConfig({ + ...defaultConfig, + outDir: "dist/tsup/", +}); diff --git a/rivetkit-typescript/packages/workflow-engine/vitest.config.ts b/rivetkit-typescript/packages/workflow-engine/vitest.config.ts new file mode 100644 index 0000000000..4afc2b39dd --- /dev/null +++ b/rivetkit-typescript/packages/workflow-engine/vitest.config.ts @@ -0,0 +1,6 @@ +import { defineConfig } from "vitest/config"; +import defaultConfig from "../../../vitest.base.ts"; + +export default defineConfig({ + ...defaultConfig, +});