refactor: restructure Graph class to utilize CompiledGraph#13
refactor: restructure Graph class to utilize CompiledGraph#13cantemizyurek merged 2 commits intomainfrom
Conversation
…ion, enhancing modularity and maintainability
🦋 Changeset detectedLatest commit: 7c468f8 The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
📝 WalkthroughWalkthroughThis change separates graph definition from execution by introducing CompiledGraph, which handles execution, suspense, checkpointing, state management, events, and subgraph execution; Graph becomes a lightweight builder with a new compile() method to produce an executable CompiledGraph. Changes
Sequence DiagramssequenceDiagram
participant User
participant Graph
participant Compiled as CompiledGraph
participant Storage
participant State as StateManager
participant Node
User->>Graph: define graph
User->>Graph: compile(options)
Graph->>Compiled: new CompiledGraph(registries, options)
Compiled->>Storage: init (InMemoryStorage default)
Compiled-->>User: return CompiledGraph
User->>Compiled: execute/run
Compiled->>Compiled: executeInternal (checkpoint check)
alt checkpoint exists
Compiled->>Storage: restore state & pending nodes
else no checkpoint
Compiled->>State: init start state
end
Compiled->>Compiled: call onStart
loop execution loop
Compiled->>Node: execute(state, suspendFn)
alt node completes
Node->>State: apply updates
Node-->>Compiled: result
else node suspends
Node-->>Compiled: throw SuspenseError(data)
Compiled->>Storage: persist checkpoint (state, nodes)
Compiled-->>User: emit/return suspense
end
end
Compiled->>Compiled: call onFinish
Compiled-->>User: return final state
sequenceDiagram
participant Parent as ParentCompiledGraph
participant SubNode as SubgraphNode
participant Child as ChildCompiledGraph
participant Storage as ChildStorage
Parent->>SubNode: invoke subgraph node
SubNode->>Child: create child CompiledGraph (registries, options)
Child->>Storage: init child storage
Child->>Child: execute child nodes
alt child suspends
Child->>Storage: persist child checkpoint
Child-->>SubNode: propagate SuspenseError(data)
else child completes
Child-->>SubNode: return child final state
SubNode->>Parent: map outputs into parent state
end
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/compiled-graph.ts`:
- Around line 238-246: executeBatch currently runs executeSingleNode in parallel
via Promise.all which can cause race conditions because individual nodes call
update() to mutate context.state; either document this limitation or make state
mutations deterministic by changing executeBatch to (a) run nodes serially
(await each executeSingleNode in a for...of) so updates are applied in a defined
order, or (b) have executeSingleNode return a collected state-delta instead of
applying immediately and then merge/apply those deltas to context.state in a
deterministic order after all promises resolve; update references: executeBatch,
executeSingleNode, update(), and context.state.
In `@tests/graph.test.ts`:
- Line 289: The test uses an unnecessary cast "storage as any" when calling
g.compile(...); fix it by instantiating the InMemoryStorage with the correct
NodeKeys generic to match the graph (e.g., change new InMemoryStorage<{ value:
number }, string>() to new InMemoryStorage<{ value: number }, 'START' |
'END'>()) so g.compile accepts it without casting; update the InMemoryStorage
generic to the actual node identifiers for any custom graph, then remove the "as
any" cast from the runGraph(g.compile(...)) call.
🧹 Nitpick comments (2)
src/types.ts (1)
85-92: Consider consolidating duplicate type definitions.
CompileOptionsis identical toGraphOptions. If they're expected to diverge in the future, this separation is fine. Otherwise, consider using a shared type or type alias to reduce duplication.// Option 1: Type alias export type CompileOptions<State, NodeKeys> = GraphOptions<State, NodeKeys> // Option 2: Shared base (if divergence is expected) interface BaseOptions<State, NodeKeys> { storage?: GraphStorage<State, NodeKeys> onFinish?: (args: { state: State }) => Promise<void> | void onStart?: (args: { state: State; writer: UIMessageStreamWriter }) => Promise<void> | void } export interface GraphOptions<State, NodeKeys> extends BaseOptions<State, NodeKeys> {} export interface CompileOptions<State, NodeKeys> extends BaseOptions<State, NodeKeys> {}src/compiled-graph.ts (1)
13-21: AddObject.setPrototypeOffor robust error inheritance.When extending built-in classes like
Errorin TypeScript/ES6, the prototype chain may not be properly set in some transpilation targets (ES5). This can causeinstanceof SuspenseErrorchecks to fail.♻️ Proposed fix
export class SuspenseError extends Error { readonly data?: unknown constructor(data?: unknown) { super('Suspense') this.name = 'SuspenseError' this.data = data + Object.setPrototypeOf(this, SuspenseError.prototype) } }
| private async executeBatch( | ||
| nodes: GraphSDK.Node<State, NodeKeys>[], | ||
| context: GraphSDK.ExecutionContext<State, NodeKeys> | ||
| ): Promise<Array<{ node: GraphSDK.Node<State, NodeKeys>; error: SuspenseError }>> { | ||
| const results = await Promise.all( | ||
| nodes.map((node) => this.executeSingleNode(node, context)) | ||
| ) | ||
| return results.filter((r): r is { node: GraphSDK.Node<State, NodeKeys>; error: SuspenseError } => r !== null) | ||
| } |
There was a problem hiding this comment.
Parallel state updates may cause race conditions.
executeBatch runs nodes in parallel via Promise.all. Each node can call update() which mutates context.state. If two parallel nodes update the same state property, the final value depends on execution order, potentially causing lost updates.
The tests "parallel nodes can update different state properties" pass because they update different properties. However, if two parallel nodes update the same property, the behavior is undefined.
Consider documenting this limitation or implementing synchronized state updates if concurrent writes to the same property should be supported.
🤖 Prompt for AI Agents
In `@src/compiled-graph.ts` around lines 238 - 246, executeBatch currently runs
executeSingleNode in parallel via Promise.all which can cause race conditions
because individual nodes call update() to mutate context.state; either document
this limitation or make state mutations deterministic by changing executeBatch
to (a) run nodes serially (await each executeSingleNode in a for...of) so
updates are applied in a defined order, or (b) have executeSingleNode return a
collected state-delta instead of applying immediately and then merge/apply those
deltas to context.state in a deterministic order after all promises resolve;
update references: executeBatch, executeSingleNode, update(), and context.state.
… separation of concerns and execution flexibility
Summary by CodeRabbit
New Features
Breaking Changes
Tests
✏️ Tip: You can customize this high-level summary in your review settings.