Deep dive into Agent OS kernel architecture for AAIF reviewers.
- Design Philosophy
- Execution Model
- Policy Engine
- Signal Dispatch
- Virtual File System
- Stateless Architecture
- Integration Points
Traditional agent frameworks (LangChain, CrewAI) are librariesβthey provide tools, not governance. When an agent misbehaves, the framework can only suggest; it cannot enforce.
Agent OS inverts this: the kernel owns execution. Every agent action passes through kernel space, where policies are enforced deterministically before execution reaches user space.
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Traditional Framework β
β β
β Agent β Action β Maybe Validation? β Execution β
β (library call) (uncontrolled) β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Agent OS β
β β
β Agent β Syscall β Kernel Space β Policy Check β SIGKILL? β
β β β β
β β βββββββββββββββββββββ β
β β β PASS β
β β βΌ β
β ββββββ Execution (governed) β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
- No Bypass: All agent I/O goes through syscalls
- Fail Closed: Unknown actions are denied
- Audit Everything: Flight Recorder logs all decisions
- Crash Isolation: User space crash β kernel crash
1. Agent code makes syscall (SYS_EXEC, SYS_WRITE, etc.)
β
βΌ
2. Kernel receives ExecutionRequest
βββββββββββββββββββββββββββββββββββββββββββ
β ExecutionRequest β
β agent_id: "carbon-auditor" β
β action: "database_query" β
β params: {"query": "SELECT..."} β
β context: β
β policies: ["read_only", "no_pii"] β
β history: [...] β
βββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
3. Policy Engine evaluates each policy
βββββββββββββββββββββββββββββββββββββββββββ
β For each policy in context.policies: β
β - Check blocked_actions β
β - Check blocked_patterns β
β - Check constraints β
β β
β If ANY check fails β SIGKILL β
βββββββββββββββββββββββββββββββββββββββββββ
β
ββββββ΄βββββ
β DENIED βββββΆ ExecutionResult(signal=SIGKILL)
βββββββββββ
β
ββββββ΄βββββ
β ALLOWED βββββΆ Execute action
βββββββββββ
β
βΌ
4. Update context with history entry
β
βΌ
5. Return ExecutionResult
βββββββββββββββββββββββββββββββββββββββββββ
β ExecutionResult β
β success: True β
β data: {"rows": [...]} β
β trace_id: "abc123" β
β updated_context: ExecutionContext β
βββββββββββββββββββββββββββββββββββββββββββ
Every request is independent. The kernel maintains no session state.
from agent_os import StatelessKernel, ExecutionContext
kernel = StatelessKernel()
# Each call is independent
result1 = await kernel.execute(
action="database_query",
params={"query": "SELECT * FROM users"},
context=ExecutionContext(
agent_id="agent-1",
policies=["read_only"],
history=[] # Empty history
)
)
# Pass updated context back for next call
result2 = await kernel.execute(
action="api_call",
params={"url": "https://api.example.com"},
context=result1.updated_context # Contains history
)Why Stateless?
- Horizontal scaling (any kernel instance handles any request)
- MCP compatibility (June 2026 stateless requirement)
- Easier testing (no hidden state)
- Cloud-native (works with Lambda, Cloud Run)
Policies are declarative rules that define allowed/blocked actions:
DEFAULT_POLICIES = {
"read_only": {
"blocked_actions": [
"file_write", "database_write", "send_email",
"api_call_write", "delete"
],
"blocked_patterns": []
},
"no_pii": {
"blocked_actions": [],
"blocked_patterns": [
r"\bssn\b", r"\bsocial.security\b",
r"\bcredit.card\b", r"\bpassword\b"
]
},
"strict": {
"blocked_actions": ["shell_exec", "eval", "raw_sql"],
"blocked_patterns": [
r"\bdrop\s+table\b", r"\bdelete\s+from\b",
r";\s*--", r"'\s*or\s*'1'\s*=\s*'1"
]
},
"audit": {
"blocked_actions": [],
"blocked_patterns": [],
"audit_all": True # Log everything, block nothing
}
}def _check_policies(self, request: ExecutionRequest) -> Optional[str]:
"""
Check all policies against the request.
Returns error message if blocked, None if allowed.
"""
for policy_name in request.context.policies:
policy = self.policies.get(policy_name)
if not policy:
continue
# Check blocked actions
if request.action in policy.get("blocked_actions", []):
return f"Action '{request.action}' blocked by '{policy_name}'"
# Check blocked patterns in params
params_str = json.dumps(request.params).lower()
for pattern in policy.get("blocked_patterns", []):
if re.search(pattern, params_str, re.IGNORECASE):
return f"Pattern '{pattern}' blocked by '{policy_name}'"
return None # All checks passedkernel = StatelessKernel(policies={
"banking": {
"blocked_actions": ["transfer_funds", "create_account"],
"blocked_patterns": [r"\$\d{6,}"], # Block amounts > $99,999
"max_calls_per_minute": 10
}
})| Signal | Value | Description | Maskable | Recovery |
|---|---|---|---|---|
SIGSTOP |
1 | Pause execution | Yes | SIGCONT |
SIGCONT |
2 | Resume execution | Yes | - |
SIGINT |
3 | Graceful interrupt | Yes | Restart |
SIGKILL |
4 | Immediate termination | No | New agent |
SIGTERM |
5 | Request shutdown | Yes | Cleanup |
SIGPOLICY |
8 | Policy violation | No | None |
SIGTRUST |
9 | Trust violation | No | None |
class SignalDispatcher:
"""Dispatch signals to agents."""
def __init__(self):
self._handlers: Dict[str, Dict[Signal, Callable]] = {}
self._history: List[Dict] = []
def send(self, agent_id: str, signal: Signal, reason: str = ""):
"""Send signal to agent."""
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"agent_id": agent_id,
"signal": signal.name,
"reason": reason
}
self._history.append(entry)
# Non-maskable signals
if signal in (Signal.SIGKILL, Signal.SIGPOLICY, Signal.SIGTRUST):
self._terminate(agent_id, signal, reason)
return
# Maskable signals - call handler if registered
handlers = self._handlers.get(agent_id, {})
handler = handlers.get(signal)
if handler:
handler(signal, reason)When a policy check fails:
# In StatelessKernel.execute()
error = self._check_policies(request)
if error:
# Record metrics
self.metrics.record_violation(
agent_id=request.context.agent_id,
action=request.action,
policy=error.split("'")[1]
)
self.metrics.record_blocked(request.context.agent_id, request.action)
# Emit signal (for Flight Recorder)
self.signals.send(
request.context.agent_id,
Signal.SIGKILL,
reason=error
)
return ExecutionResult(
success=False,
error=error,
signal="SIGKILL"
)/agent/{agent_id}/
βββ mem/
β βββ working/ # Ephemeral (cleared on restart)
β βββ episodic/ # Experience logs
β βββ semantic/ # Long-term knowledge
βββ state/
β βββ checkpoints/ # SIGUSR1 snapshots
βββ policy/ # Read-only from user space
β βββ active.json # Current policies
β βββ history.json # Policy changes
βββ ipc/ # Inter-process pipes
β βββ inbox/
β βββ outbox/
βββ audit/ # Flight Recorder logs
βββ events.jsonl
class AgentVFS:
"""Virtual File System for agent memory and state."""
def __init__(self, agent_id: str, backend: StorageBackend = None):
self.agent_id = agent_id
self.backend = backend or MemoryBackend()
self._readonly_paths = {"/policy", "/audit"}
def write(self, path: str, data: Any) -> bool:
"""Write to VFS path."""
full_path = f"/agent/{self.agent_id}{path}"
# Check read-only
for ro_path in self._readonly_paths:
if path.startswith(ro_path):
raise PermissionError(f"Path {path} is read-only")
self.backend.set(full_path, data)
return True
def read(self, path: str) -> Any:
"""Read from VFS path."""
full_path = f"/agent/{self.agent_id}{path}"
return self.backend.get(full_path)VFS paths are exposed as MCP resources:
VFS Path MCP Resource URI
/agent/foo/mem/working/data β vfs://foo/mem/working/data
/agent/foo/mem/episodic/* β vfs://foo/mem/episodic/*
All state lives in pluggable backends:
class StateBackend(Protocol):
"""Protocol for external state storage."""
async def get(self, key: str) -> Optional[Dict[str, Any]]: ...
async def set(self, key: str, value: Dict[str, Any], ttl: int = None): ...
async def delete(self, key: str): ...
# Implementations
class MemoryBackend(StateBackend):
"""In-memory (for testing, single-instance)."""
class RedisBackend(StateBackend):
"""Redis (for production, distributed)."""
class DynamoDBBackend(StateBackend):
"""DynamoDB (for serverless)."""Every execution carries its own context:
@dataclass
class ExecutionContext:
"""All state needed for a single execution."""
agent_id: str
policies: List[str] = field(default_factory=list)
history: List[Dict] = field(default_factory=list)
state_ref: Optional[str] = None # External state key
metadata: Dict[str, Any] = field(default_factory=dict) Load Balancer
β
ββββββββββββββββββΌβββββββββββββββββ
β β β
βΌ βΌ βΌ
βββββββββββ βββββββββββ βββββββββββ
β Kernel β β Kernel β β Kernel β
β Pod #1 β β Pod #2 β β Pod #3 β
ββββββ¬βββββ ββββββ¬βββββ ββββββ¬βββββ
β β β
ββββββββββββββββββΌβββββββββββββββββ
β
ββββββ΄βββββ
β Redis β (shared state)
β Cluster β
βββββββββββ
Any kernel instance can handle any request because state is external.
# MCP tool calls go through kernel
@mcp_tool
async def kernel_execute(agent_id: str, action: str, params: dict, context: dict):
kernel = StatelessKernel()
result = await kernel.execute(
action=action,
params=params,
context=ExecutionContext(**context)
)
return result.to_dict()# Parse AGENTS.md and convert to policies
parser = AgentsParser()
config = parser.parse_directory(".agents/")
policies = parser.to_kernel_policies(config)
# Use with kernel
kernel = StatelessKernel()
context = ExecutionContext(
agent_id=config.name,
policies=list(policies["rules"])
)# Kernel automatically emits metrics and traces
kernel = StatelessKernel(
metrics=KernelMetrics(),
tracer=KernelTracer("agent-os")
)
# Start metrics server
server = MetricsServer(port=9090, metrics=kernel.metrics)
server.start()
# Prometheus scrapes localhost:9090/metrics
# Grafana displays dashboards| Component | Location |
|---|---|
| Stateless Kernel | src/agent_os/stateless.py |
| Policy Engine | src/agent_os/stateless.py (in StatelessKernel) |
| Signal Dispatch | packages/control-plane/src/agent_control_plane/signals.py |
| VFS | packages/control-plane/src/agent_control_plane/vfs.py |
| MCP Server | packages/mcp-kernel-server/src/mcp_kernel_server/server.py |
| AGENTS.md Parser | src/agent_os/agents_compat.py |
| Metrics | packages/observability/src/agent_os_observability/metrics.py |
| Tracer | packages/observability/src/agent_os_observability/tracer.py |
| CLI | src/agent_os/cli.py |