Status: Canonical
Last Updated: 2025-12-31
Supersedes: Implicit state management in AgentState, VariablesManager, VectorMemory
Current state management is fragmented across multiple systems with unclear ownership:
- AgentState (LangGraph): 855-line state class mixing coordination and ephemeral data
- VariablesManager: Standalone variable storage
- StateVariablesManager: Per-thread storage within AgentState
- VectorMemory: Agent memory with embeddings and history
- ExecutionContext: Orchestrator coordination context
This ambiguity leads to:
- Uncertain persistence boundaries (what survives restarts?)
- Mutation conflicts (who can modify what?)
- Memory leaks (ephemeral data never discarded)
- Coupling (agents tightly bound to specific state systems)
State is owned by exactly ONE of three owners:
┌─────────────┬──────────────┬─────────────┬─────────────────┐
│ Owner │ Scope │ Lifetime │ Examples │
├─────────────┼──────────────┼─────────────┼─────────────────┤
│ AGENT │ Request │ Ephemeral │ current_request │
│ │ │ │ temp_data │
│ │ │ │ _internal_cache │
├─────────────┼──────────────┼─────────────┼─────────────────┤
│ MEMORY │ Cross-request│ Persistent │ user_history │
│ │ │ │ embeddings │
│ │ │ │ learned_facts │
├─────────────┼──────────────┼─────────────┼─────────────────┤
│ ORCHESTRATOR│ Coordination │ Trace-scoped│ trace_id │
│ │ │ │ routing_context │
│ │ │ │ parent_context │
└─────────────┴──────────────┴─────────────┴─────────────────┘
class StateOwnership(str, Enum):
AGENT = "agent" # Owned by agent (ephemeral)
MEMORY = "memory" # Owned by memory system (persistent)
ORCHESTRATOR = "orchestrator" # Owned by orchestrator (coordination)
SHARED = "shared" # Shared ownership with explicit protocolDefinition: State that exists only during agent execution and is discarded on shutdown.
Characteristics:
- Lifetime: Request-scoped (startup → shutdown)
- Persistence: None (cleared on termination)
- Access: Read/write by owning agent only
- Storage: In-memory (
_agent_statedict in ManagedAgent)
Examples:
# Ephemeral state managed by agent
agent._agent_state = {
"_current_request": {...}, # Transient request data
"_temp_cache": {...}, # Temporary cache
"_internal_counters": {...}, # Execution counters
"processing_context": {...} # Per-request context
}Mutation Rules:
- ✅ Agent CAN: Read, write, delete own ephemeral state
- ❌ Agent CANNOT: Persist beyond shutdown
- ❌ Memory CANNOT: Read or write agent state
- ❌ Orchestrator CANNOT: Modify agent state (read-only for observability)
Storage Contract:
# Agent owns this - cleared on shutdown
async def shutdown(self):
self._agent_state.clear() # Discard ephemeral stateDefinition: State that persists across agent restarts and represents learned knowledge.
Characteristics:
- Lifetime: Cross-request (survives restarts)
- Persistence: Durable storage (filesystem, DB, vector store)
- Access: Read/write by memory system, read-only for agents
- Storage: VectorMemory, MemoryBackend implementations
Examples:
# Persistent state managed by memory system
memory.store({
"user_history": [...], # Conversation history
"embeddings": [...], # Vector embeddings
"learned_facts": [...], # Extracted knowledge
"preferences": {...} # User preferences
})Mutation Rules:
- ✅ Memory CAN: Read, write, persist, delete memory state
- ✅ Agent CAN: Read memory state (query)
- ❌ Agent CANNOT: Directly modify memory state (must call memory.update())
- ❌ Orchestrator CANNOT: Modify memory state (read-only for routing)
Storage Contract:
# Agent reads via memory system
history = await memory.query("user_history")
# Agent writes via memory system (not direct)
await memory.update("learned_facts", new_fact) # ✅ Correct
# memory_state["learned_facts"].append(new_fact) # ❌ Wrong - bypasses memoryDefinition: State that coordinates agent execution and maintains trace context.
Characteristics:
- Lifetime: Trace-scoped (parent trace → child traces)
- Persistence: Observability backends (OTEL, LangFuse)
- Access: Read-only for agents, write by orchestrator
- Storage: ExecutionContext (immutable), trace backends
Examples:
# Coordination state managed by orchestrator
context = ExecutionContext(
trace_id="req-123", # Unique trace ID
profile="default", # Execution profile
parent_context=parent_ctx, # Parent trace link
metadata={"routing": "rr"} # Coordination metadata
)Mutation Rules:
- ✅ Orchestrator CAN: Create, propagate, log trace context
- ✅ Agent CAN: Read context for tracing/logging
- ❌ Agent CANNOT: Modify trace_id or routing context
- ❌ Memory CANNOT: Access orchestrator state
Storage Contract:
# Agent reads trace context
trace_id = context.trace_id # ✅ Read-only access
# Agent logs with trace context
logger.info("Processing", extra={"trace_id": trace_id})
# Agent CANNOT modify
# context.trace_id = "new-id" # ❌ Frozen dataclass prevents thisAgentState (872 lines) currently mixes ownership. Split as follows:
# src/cuga/backend/cuga_graph/state/agent_state.py
# ORCHESTRATOR state (coordination)
trace_id: str # → ExecutionContext.trace_id
routing_metadata: dict # → ExecutionContext.metadata
# MEMORY state (persistent)
user_history: List[Message] # → VectorMemory.history
embeddings: List[float] # → VectorMemory.embeddings
# AGENT state (ephemeral)
current_request: dict # → agent._agent_state
temp_cache: dict # → agent._agent_stateMigration Path:
- Identify which fields persist across requests → MEMORY
- Identify coordination fields (trace_id, routing) → ORCHESTRATOR
- Remaining fields → AGENT (ephemeral)
- Refactor AgentState to delegate to correct owners
VariablesManager (standalone):
- Current: Global variable storage
- Ownership: AGENT (ephemeral if request-scoped) or MEMORY (if persisted)
- Migration: Clarify persistence semantics - if variables survive restarts → MEMORY, else → AGENT
StateVariablesManager (per-thread in AgentState):
- Current: Thread-local storage within LangGraph state
- Ownership: AGENT (per-thread ephemeral state)
- Migration: Clearly document as ephemeral per-thread storage
VectorMemory:
- Current: Agent memory with embeddings
- Ownership: MEMORY (persistent)
- No changes needed: Already correctly scoped as persistent
ExecutionContext:
- Current: Orchestrator coordination context
- Ownership: ORCHESTRATOR (coordination)
- No changes needed: Already correctly scoped as coordination
Agents implementing AgentLifecycleProtocol must declare ownership:
def owns_state(self, key: str) -> StateOwnership:
"""Determine who owns a specific state key."""
if key.startswith("_"):
return StateOwnership.AGENT
if key in {"history", "embeddings", "facts"}:
return StateOwnership.MEMORY
if key in {"trace_id", "routing"}:
return StateOwnership.ORCHESTRATOR
return StateOwnership.AGENTRaise StateViolationError when ownership is violated:
def set_state(self, key: str, value: Any):
owner = self.owns_state(key)
if owner == StateOwnership.MEMORY:
raise StateViolationError(
key, StateOwnership.MEMORY, StateOwnership.AGENT
)
self._agent_state[key] = valueAssert ownership boundaries in tests:
def test_state_ownership():
agent = MyAgent()
# Agent can modify own state
agent.set_state("_temp", 123)
# Agent CANNOT modify memory state directly
with pytest.raises(StateViolationError):
agent.set_state("user_history", [...])
# Agent CANNOT modify orchestrator state
with pytest.raises(StateViolationError):
agent.set_state("trace_id", "new-id")Contract: MUST NOT persist beyond shutdown.
async def shutdown(self):
# Ephemeral state discarded
self._agent_state.clear()Contract: MUST persist across agent restarts.
async def shutdown(self):
# Memory state persisted before shutdown
if self._memory_dirty:
await self.memory.flush()Contract: MUST emit to observability backends.
async def complete(self, context: ExecutionContext):
# Emit trace completion event
await self.emitter.emit(
"orchestration.complete",
trace_id=context.trace_id
)Audit existing agents for state usage:
# Find all state reads/writes
rg "(self\._state|agent_state\[|memory\.|context\.)" src/For each state field, ask:
- Does it survive agent restarts? → MEMORY
- Is it coordination metadata? → ORCHESTRATOR
- Is it request-scoped? → AGENT
Before (mixed ownership):
class MyAgent:
def __init__(self):
self.state = {
"user_history": [], # Should be MEMORY
"trace_id": None, # Should be ORCHESTRATOR
"temp_data": {} # Correctly AGENT
}After (explicit ownership):
class MyAgent(ManagedAgent):
def __init__(self, memory: VectorMemory):
super().__init__()
self.memory = memory # MEMORY state
# self.context via orchestrator # ORCHESTRATOR state
# self._agent_state inherited # AGENT state
def owns_state(self, key: str) -> StateOwnership:
if key == "user_history":
return StateOwnership.MEMORY
if key == "trace_id":
return StateOwnership.ORCHESTRATOR
return StateOwnership.AGENTAssert ownership boundaries:
def test_state_ownership():
agent = MyAgent(memory=mock_memory)
assert agent.owns_state("user_history") == StateOwnership.MEMORY
assert agent.owns_state("trace_id") == StateOwnership.ORCHESTRATOR
assert agent.owns_state("_temp") == StateOwnership.AGENTA: Use StateOwnership.SHARED and document explicit protocol:
def owns_state(self, key: str) -> StateOwnership:
if key == "working_memory":
return StateOwnership.SHARED # Both can modify
...Then document the shared protocol:
# SHARED state protocol for "working_memory":
# - Agent: Read/write during request
# - Memory: Persist on agent shutdown
# - Coordination: Memory.flush() called by agent.shutdown()A: Incrementally split fields by ownership:
- Add
owns_state()to classify fields - Delegate to correct systems (memory.update(), context.trace_id)
- Remove redundant storage from AgentState
- Update tests to assert ownership
A: Document as ephemeral per-thread storage:
# StateVariablesManager: AGENT ownership (ephemeral, per-thread)
# - Lifetime: Single request/thread
# - Persistence: None
# - Use for: Thread-local temp dataA: Read-only for observability, never modify:
# ✅ Orchestrator can read for logging
logger.info("Agent state", agent_counters=agent._agent_state.get("_counters"))
# ❌ Orchestrator CANNOT modify
# agent._agent_state["_counters"] = 0 # Violates ownership- AgentLifecycleProtocol:
src/cuga/agents/lifecycle.py - OrchestratorProtocol:
src/cuga/orchestrator/protocol.py - AgentState:
src/cuga/backend/cuga_graph/state/agent_state.py - VectorMemory:
src/cuga/backend/memory/ - ExecutionContext:
src/cuga/orchestrator/protocol.py
- 2025-12-31: Initial state ownership contract defining AGENT/MEMORY/ORCHESTRATOR boundaries