Skip to content

Latest commit

 

History

History
432 lines (345 loc) · 13.7 KB

File metadata and controls

432 lines (345 loc) · 13.7 KB

AgentGraph

AgentGraph is the lowest-level authoring API in the Unified Orchestration Layer. It exposes explicit node and edge management, supports cycles, subgraph composition, and four edge types including the AgentOS-exclusive discovery and personality edges.

Current runtime note: AgentGraph compilation is complete, but some advanced execution paths are still bridge-dependent today. The base runtime executes tool, router, guardrail, and human nodes directly. gmi, extension, and subgraph execution require a higher-level runtime bridge, and discovery/personality edges are still partial unless those integrations are wired.

Use AgentGraph when you need full control: complex conditional routing, agent loops that cycle back, memory-driven state machines, or subgraph composition. For linear pipelines, see workflow(). For goal-driven orchestration, see mission().

Quick Start

import {
  AgentGraph, START, END,
  gmiNode, toolNode,
} from '@framers/agentos/orchestration';
import { z } from 'zod';

const graph = new AgentGraph(
  {
    input: z.object({ topic: z.string() }),
    scratch: z.object({ sources: z.array(z.string()).default([]) }),
    artifacts: z.object({ summary: z.string() }),
  },
  { reducers: { 'scratch.sources': 'concat' } }
)
  .addNode('search', toolNode('web_search'))
  .addNode('summarize', gmiNode({ instructions: 'Summarize the search results.' }))
  .addEdge(START, 'search')
  .addEdge('search', 'summarize')
  .addEdge('summarize', END)
  .compile();

const result = await graph.invoke({ topic: 'quantum computing' });

Constructor

new AgentGraph(stateSchema, config?)
Parameter Type Description
stateSchema.input Zod schema Shape of the frozen user input
stateSchema.scratch Zod schema Shape of the mutable node-to-node communication bag
stateSchema.artifacts Zod schema Shape of the accumulated outputs returned to the caller
config.reducers StateReducers Field-level merge strategies for parallel branches
config.memoryConsistency MemoryConsistencyMode Graph-wide memory isolation (default: 'snapshot')
config.checkpointPolicy 'every_node' | 'explicit' | 'none' When to persist checkpoints (default: 'none')

Node Builders

All nodes are created with typed factory functions. Each accepts an optional policies object for memory, discovery, guardrail, and persona configuration.

gmiNode

A General Model Invocation node that calls an LLM. The default executionMode is react_bounded — an internal ReAct tool-use loop capped by maxInternalIterations.

import { gmiNode } from '@framers/agentos/orchestration';

gmiNode(
  {
    instructions: 'Research the topic thoroughly.',
    executionMode: 'react_bounded', // default — bounded ReAct loop
    maxInternalIterations: 5,       // default
    parallelTools: false,
    temperature: 0.7,
    maxTokens: 2048,
  },
  {
    memory: {
      consistency: 'snapshot',
      read: { types: ['semantic'], semanticQuery: '{input.topic}', maxTraces: 10 },
      write: { autoEncode: true, type: 'episodic', scope: 'session' },
    },
    discovery: { enabled: true, kind: 'tool', maxResults: 5 },
    guardrails: { output: ['content-safety'], onViolation: 'block' },
    checkpoint: 'after',
  }
)

Execution modes:

Mode Description Default for
single_turn One LLM call, no internal tool loop workflow() steps
react_bounded ReAct loop up to maxInternalIterations AgentGraph gmi nodes
planner_controlled PlanningEngine controls the loop mission() steps

toolNode

Invokes a registered ITool by name. The tool name must match a key in the tool catalogue.

import { toolNode } from '@framers/agentos/orchestration';

toolNode(
  'web_search',
  {
    timeout: 10_000,
    // Accepted by the IR today; shared-runtime retries are still being wired.
    retryPolicy: { maxAttempts: 3, backoff: 'exponential', backoffMs: 500 },
  },
  {
    effectClass: 'external',
    guardrails: { output: ['pii-redaction'], onViolation: 'sanitize' },
  }
)

humanNode

Suspends execution and surfaces a prompt to a human operator. The run can be resumed with .resume(checkpointId) after the human responds.

import { humanNode } from '@framers/agentos/orchestration';

humanNode({ prompt: 'Does this summary look accurate? (yes/no)', timeout: 86_400_000 })

routerNode

A pure routing node with no LLM call and no output. Evaluates a condition and emits edges to the appropriate next node. Use this as the source of addConditionalEdge() calls when you need a dedicated branching point.

import { routerNode } from '@framers/agentos/orchestration';

// In-process function (not serializable)
routerNode((state) => state.scratch.confidence > 0.8 ? 'summarize' : 'search')

// Expression string (serializable to JSON/YAML)
routerNode("scratch.confidence > 0.8 ? 'summarize' : 'search'")

guardrailNode

Runs guardrails as an explicit step in the graph, not just on the edge. Use this for pre-flight checks or to gate progress through critical stages.

import { guardrailNode } from '@framers/agentos/orchestration';

guardrailNode(['pii-redaction', 'content-safety'], {
  onViolation: 'reroute',
  rerouteTarget: 'sanitize-output',
})

subgraphNode

Embeds a previously compiled CompiledExecutionGraph as a single node. Input and output fields are mapped between the parent and child graphs.

At the moment, this is a compile-time authoring primitive. Executing subgraphs requires a runtime bridge that knows how to resolve and invoke nested graphs.

import { subgraphNode } from '@framers/agentos/orchestration';

subgraphNode(compiledSubgraph, {
  inputMapping: { 'scratch.query': 'input.topic' },  // parent scratch → child input
  outputMapping: { 'artifacts.summary': 'scratch.result' }, // child artifacts → parent scratch
})

Edge Types

Static Edge

Always followed. The most common edge type.

graph.addEdge(START, 'fetch');
graph.addEdge('fetch', 'process');
graph.addEdge('process', END);

Conditional Edge

Target is resolved at runtime by a function receiving the current GraphState.

graph.addConditionalEdge('evaluate', (state) =>
  state.scratch.confidence > 0.8 ? 'summarize' : 'search'
);

The function must return a valid node id. The returned id is not validated at compile time.

Discovery Edge

Target is resolved at runtime via semantic search over the capability registry. In the shared runtime today, this remains partial: when discovery is not wired, execution follows the declared fallback target.

graph.addDiscoveryEdge('plan', {
  query: 'find a tool that can search academic papers',
  kind: 'tool',            // restrict to tools only
  fallbackTarget: 'web-search',  // use this node if discovery returns nothing
});

Runtime semantics (target state):

  1. CapabilityDiscoveryEngine.discover(query, { kind }) is called
  2. The top-1 result is selected
  3. A transient executable node is instantiated
  4. Execution continues through the resolved target
  5. If no results: route to fallbackTarget, or emit DISCOVERY_NO_RESULTS

Personality Edge

Routes based on the agent's current HEXACO/PAD trait value. No conditional logic required in your code once a personality source is wired into the runtime.

graph.addPersonalityEdge('draft', {
  trait: 'conscientiousness',  // HEXACO trait name
  threshold: 0.7,              // decision boundary (0–1)
  above: 'human-review',       // route when trait >= threshold
  below: END,                  // route when trait < threshold
});

Available HEXACO traits: honesty_humility, emotionality, extraversion, agreeableness, conscientiousness, openness.

State Management

GraphState has three partitions you control and two managed by the runtime:

interface GraphState<TInput, TScratch, TArtifacts> {
  input: Readonly<TInput>;      // Frozen at graph start — nodes cannot write
  scratch: TScratch;            // Mutable node-to-node bag
  artifacts: TArtifacts;        // Accumulated outputs returned to caller

  // Runtime-managed:
  memory: MemoryView;           // Read-only memory traces (populated by MemoryPolicy)
  diagnostics: DiagnosticsView; // Token usage, latency, discovery results
  currentNodeId: string;
  visitedNodes: string[];
  iteration: number;
  checkpointId?: string;
}

State Reducers

When parallel branches or a loop writes to the same field, you need a reducer to define the merge strategy:

const graph = new AgentGraph(stateSchema, {
  reducers: {
    'scratch.sources': 'concat',        // Arrays: [...existing, ...incoming]
    'scratch.confidence': 'max',        // Numbers: Math.max(existing, incoming)
    'artifacts.summary': 'last',        // Any: last-write-wins (default)
    'artifacts.score': (a, b) => (Number(a) + Number(b)) / 2,  // Custom
  },
});

Built-in reducers: concat, merge, max, min, avg, sum, last, first, longest

Compilation

const compiled = graph.compile({
  checkpointStore: new SqliteCheckpointStore('./runs.db'),
  validate: true, // default — throws on unreachable nodes or structural errors
});

AgentGraph allows cycles (validate: false is only needed for intentional orphan nodes).

Execution

// Run to completion
const result = await compiled.invoke({ topic: 'quantum computing' });

// Stream events
for await (const event of compiled.stream({ topic: 'quantum computing' })) {
  console.log(event.type, event.nodeId);
  // event.type: 'run_start' | 'node_start' | 'node_end' | 'edge_transition' | 'run_end'
}

// Resume from checkpoint after interruption
const result = await compiled.resume(checkpointId);

// Export IR for debugging or visualization
const ir = compiled.toIR();

Subgraph Composition

Build modular graphs by nesting compiled graphs as single nodes:

// Build the inner graph
const fetchGraph = new AgentGraph(fetchState)
  .addNode('fetch', toolNode('web_fetch'))
  .addNode('parse', toolNode('html_parser'))
  .addEdge(START, 'fetch')
  .addEdge('fetch', 'parse')
  .addEdge('parse', END)
  .compile();

// Embed it in the outer graph
const outerGraph = new AgentGraph(outerState)
  .addNode('gather', subgraphNode(fetchGraph.toIR(), {
    inputMapping: { 'input.url': 'input.url' },
    outputMapping: { 'artifacts.text': 'scratch.rawText' },
  }))
  .addNode('analyze', gmiNode({ instructions: 'Analyze the text.' }))
  .addEdge(START, 'gather')
  .addEdge('gather', 'analyze')
  .addEdge('analyze', END)
  .compile();

Complete Example — Research Graph

import {
  AgentGraph, START, END,
  gmiNode, toolNode, humanNode,
} from '@framers/agentos/orchestration';
import { SqliteCheckpointStore } from '@framers/agentos/orchestration/checkpoint';
import { z } from 'zod';

const ResearchState = {
  input: z.object({ topic: z.string() }),
  scratch: z.object({
    sources: z.array(z.string()).default([]),
    confidence: z.number().default(0),
  }),
  artifacts: z.object({
    summary: z.string(),
    sources: z.array(z.string()),
  }),
};

const graph = new AgentGraph(ResearchState, {
  reducers: { 'scratch.sources': 'concat' },
  memoryConsistency: 'snapshot',
  checkpointPolicy: 'every_node',
})
  .addNode('plan', gmiNode(
    {
      instructions: 'Break this research topic into sub-questions.',
      executionMode: 'single_turn',
    },
    {
      memory: {
        consistency: 'snapshot',
        read: { types: ['semantic'], semanticQuery: '{input.topic}', maxTraces: 10 },
      },
      discovery: { enabled: true, kind: 'tool' },
      checkpoint: 'after',
    }
  ))
  .addNode('search', toolNode(
    'web_search',
    { timeout: 10_000 },
    {
      effectClass: 'external',
      guardrails: { output: ['pii-redaction'], onViolation: 'sanitize' },
    }
  ))
  .addNode('evaluate', gmiNode(
    {
      instructions: 'Evaluate source quality and assign a confidence score (0–1).',
      executionMode: 'single_turn',
    },
    {
      memory: {
        consistency: 'snapshot',
        write: { autoEncode: true, type: 'episodic', scope: 'session' },
      },
    }
  ))
  .addNode('summarize', gmiNode(
    {
      instructions: 'Write a final summary from gathered sources.',
      executionMode: 'single_turn',
    },
    {
      guardrails: {
        output: ['grounding-guard'],
        onViolation: 'reroute',
        rerouteTarget: 'search',
      },
    }
  ))
  .addNode('review', humanNode({ prompt: 'Does this summary look accurate?' }))

  .addEdge(START, 'plan')
  .addEdge('plan', 'search')
  .addEdge('search', 'evaluate')
  .addConditionalEdge('evaluate', (state) =>
    state.scratch.confidence > 0.8 ? 'summarize' : 'search'
  )
  .addPersonalityEdge('summarize', {
    trait: 'conscientiousness',
    threshold: 0.7,
    above: 'review',
    below: END,
  })
  .addEdge('review', END)

  .compile({
    checkpointStore: new SqliteCheckpointStore('./research-checkpoints.db'),
  });

// Run
const result = await graph.invoke({ topic: 'quantum computing' });

// Stream with progress
for await (const event of graph.stream({ topic: 'quantum computing' })) {
  if (event.type === 'node_start') console.log(`Starting: ${event.nodeId}`);
  if (event.type === 'node_end')   console.log(`Done: ${event.nodeId}`);
}

// Resume after interruption at human-review step
const result2 = await graph.resume(savedCheckpointId);

See Also