A TypeScript library for building stateful, resumable workflows with human-in-the-loop support.
- Human in the Loop — Suspend execution to wait for user input, approvals, or external data, then resume seamlessly
- Type-safe — Full TypeScript support with generic state management
- Resumable — Checkpoint and resume execution from any point
- Composable — Nested subgraphs with state mapping
- Parallel Execution — Fork workflows with multiple edges from a single node
- Conditional Routing — Dynamic edges based on state
- Visualization — Generate Mermaid diagrams of your workflows
npm i ai-sdk-graphimport { graph } from 'ai-sdk-graph'
const workflow = graph<{ value: number }>()
.node('validate', ({ update }) => {
update({ value: 10 })
})
.node('transform', ({ update, state }) => {
update({ value: state().value * 2 })
})
.edge('START', 'validate')
.edge('validate', 'transform')
.edge('transform', 'END')
// Execute the workflow
const stream = workflow.execute('run-1', { value: 0 })Nodes are execution units that receive a context object with:
state()— Read the current stateupdate(changes)— Update state with partial object or functionsuspense(data?)— Pause execution for human-in-the-loopwriter— Stream writer for UI integration
graph<{ count: number }>()
.node('increment', ({ state, update }) => {
update({ count: state().count + 1 })
})Connect nodes with static or dynamic routing:
// Static edge
.edge('START', 'validate')
// Dynamic edge based on state
.edge('router', (state) => state.isValid ? 'process' : 'reject')State is type-safe and immutable. Updates can be partial objects or functions:
// Partial update
update({ status: 'complete' })
// Functional update
update((state) => ({ count: state.count + 1 }))Suspend execution to wait for user input, approvals, or external data:
const workflow = graph<{ approved: boolean }>()
.node('review', ({ state, suspense }) => {
if (!state().approved) {
suspense({ reason: 'Waiting for approval' })
}
})
.edge('START', 'review')
.edge('review', 'END')
// First execution suspends
workflow.execute('run-1', { approved: false })
// Resume with updated state after user approves
workflow.execute('run-1', (existing) => ({ ...existing, approved: true }))Multiple edges from the same node execute targets in parallel:
graph<{ results: string[] }>()
.node('fork', () => {})
.node('taskA', ({ update }) => update({ results: ['A'] }))
.node('taskB', ({ update }) => update({ results: ['B'] }))
.node('join', () => {})
.edge('START', 'fork')
.edge('fork', 'taskA') // Both taskA and taskB
.edge('fork', 'taskB') // execute in parallel
.edge('taskA', 'join')
.edge('taskB', 'join')
.edge('join', 'END')Compose workflows with nested graphs:
const childGraph = graph<{ value: number }>()
.node('double', ({ update, state }) => {
update({ value: state().value * 2 })
})
.edge('START', 'double')
.edge('double', 'END')
const parentGraph = graph<{ input: number; result: number }>()
.graph('process', childGraph, {
input: (parentState) => ({ value: parentState.input }),
output: (childState) => ({ result: childState.value })
})
.edge('START', 'process')
.edge('process', 'END')By default, graphs use in-memory storage. For production, use Redis:
import { graph } from 'ai-sdk-graph'
import { RedisStorage } from 'ai-sdk-graph/storage'
import Redis from 'ioredis'
const redis = new Redis()
const storage = new RedisStorage(redis)
const workflow = graph<State>(storage)
// ... define nodes and edgesGenerate Mermaid diagrams of your workflows:
const diagram = workflow.toMermaid()
// or with direction
const diagram = workflow.toMermaid({ direction: 'LR' })Output:
flowchart TB
START([START])
validate[validate]
transform[transform]
END([END])
START --> validate
validate --> transform
transform --> END
Create a new graph with optional storage backend.
Add a node with an execution handler.
Add a static edge between nodes.
Add a dynamic edge that routes based on state.
Add a nested subgraph with state mapping.
Execute the graph and return a readable stream.
Generate a Mermaid flowchart diagram.
MIT