Isolate AI agents at runtime using privilege rings, saga transactions, and kill switches.
See also: Deployment Guide | Agent Runtime README
- Introduction
- Quick Start: Ring-Based Access Control
- The 4-Ring Model
- Capability Guards
- Saga Orchestration
- Session Isolation
- Emergency Controls
- Production Deployment
AI agents that can read files, call APIs, and execute code need strict boundaries. Without sandboxing, a misbehaving agent can:
- Exfiltrate data — read secrets and send them to external endpoints.
- Corrupt state — write to databases or files it should never touch.
- Consume resources — spin up infinite loops that exhaust CPU and memory.
- Cascade failures — a failed step in a multi-agent workflow leaves the system in a broken half-finished state.
The Agent Runtime (`pip install agent-runtime`) solves this with five layers of defense:
┌─────────────────────────────────────────────────┐
│ Execution Ring Model │
│ Ring 0 (Root) → Ring 3 (Sandbox) │
├─────────────────────────────────────────────────┤
│ Capability Guards │
│ Per-agent tool allow/deny lists │
├─────────────────────────────────────────────────┤
│ Saga Orchestration │
│ Multi-step transactions with auto-rollback │
├─────────────────────────────────────────────────┤
│ Session Isolation │
│ VFS namespacing, vector clocks, intent locks │
├─────────────────────────────────────────────────┤
│ Emergency Controls │
│ Kill switch, rate limiting, breach detection │
└─────────────────────────────────────────────────┘
- Python ≥ 3.11
- `pip install agent-runtime` (v2.0.2+)
- For capability guards: `pip install agent-os-kernel`
Get sandboxing running in under 20 lines:
from hypervisor import Hypervisor, ExecutionRing
from hypervisor.rings.classifier import ActionClassifier
from hypervisor.rings.enforcer import RingEnforcer
# 1. Create the runtime
hv = Hypervisor()
# 2. Classify an action — the classifier maps actions to rings
classifier = ActionClassifier()
result = classifier.classify_action_id("file.read")
print(result.ring) # ExecutionRing.RING_3_SANDBOX
print(result.risk_weight) # 0.1
result = classifier.classify_action_id("deploy.k8s")
print(result.ring) # ExecutionRing.RING_1_PRIVILEGED
print(result.risk_weight) # 0.9
# 3. Enforce the ring — block agents that lack privilege
enforcer = RingEnforcer()
agent_ring = ExecutionRing.from_eff_score(eff_score=0.72)
print(agent_ring) # ExecutionRing.RING_2_STANDARD
# Agent in Ring 2 tries a Ring 1 action → blocked
# Agent in Ring 2 tries a Ring 3 action → allowed

That's it. The classifier decides which ring an action belongs to, and the enforcer checks whether the agent's effective score grants sufficient privilege.
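The enforcement rule itself reduces to a ring comparison: a lower ring number means more privilege. A minimal sketch of that check (hypothetical names, not the `RingEnforcer` API):

```python
from enum import IntEnum

class Ring(IntEnum):
    # Lower value = more privilege, following the hardware-ring convention
    ROOT = 0
    PRIVILEGED = 1
    STANDARD = 2
    SANDBOX = 3

def is_allowed(agent_ring: Ring, action_ring: Ring) -> bool:
    # An agent may perform actions at its own ring or any lower-privilege ring
    return agent_ring <= action_ring

# A Ring 2 agent: Ring 1 actions blocked, Ring 3 actions allowed
assert not is_allowed(Ring.STANDARD, Ring.PRIVILEGED)
assert is_allowed(Ring.STANDARD, Ring.SANDBOX)
```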
The runtime uses a hardware-inspired 4-ring privilege model. Each ring defines what an agent can do, how many calls it can make, and what level of trust is required.
┌───────────────────────┐
│ Ring 0 — Root │ eff_score: N/A (SRE Witness required)
│ Runtime config, │ Penalty/slashing operations
│ penalty ops │ Rate: unlimited
├───────────────────────┤
│ Ring 1 — Privileged │ eff_score ≥ 0.95 + consensus
│ Non-reversible ops │ Write, deploy, delete
│ (deploy, delete) │ Rate: 1000 calls/min
├───────────────────────┤
│ Ring 2 — Standard │ eff_score ≥ 0.60
│ Reversible actions │ Read + limited write
│ (write files, APIs) │ Rate: 100 calls/min
├───────────────────────┤
│ Ring 3 — Sandbox │ Default for unknown agents
│ Read-only, research │ No network, no writes
│ (safe exploration) │ Rate: 10 calls/min
└───────────────────────┘
The ExecutionRing enum maps directly from an agent's effective score
(eff_score), which combines trust, reputation, and behavioral signals:
from hypervisor.models import ExecutionRing
# Ring assignment is automatic based on eff_score
ring = ExecutionRing.from_eff_score(eff_score=0.98, has_consensus=True)
assert ring == ExecutionRing.RING_1_PRIVILEGED
ring = ExecutionRing.from_eff_score(eff_score=0.75)
assert ring == ExecutionRing.RING_2_STANDARD
ring = ExecutionRing.from_eff_score(eff_score=0.40)
assert ring == ExecutionRing.RING_3_SANDBOX

Note: Ring 0 is never assigned by score alone — it requires an SRE Witness attestation and is reserved for runtime-level configuration.
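Under the hood, the mapping is just threshold checks against the table above. An illustrative re-implementation (not the library's actual code):

```python
def ring_from_eff_score(eff_score: float, has_consensus: bool = False) -> str:
    """Map an effective score to a ring name per the thresholds above.
    Ring 0 is deliberately unreachable by score alone."""
    if eff_score >= 0.95 and has_consensus:
        return "RING_1_PRIVILEGED"
    if eff_score >= 0.60:
        return "RING_2_STANDARD"
    return "RING_3_SANDBOX"

assert ring_from_eff_score(0.98, has_consensus=True) == "RING_1_PRIVILEGED"
assert ring_from_eff_score(0.98) == "RING_2_STANDARD"  # high score, no consensus
assert ring_from_eff_score(0.75) == "RING_2_STANDARD"
assert ring_from_eff_score(0.40) == "RING_3_SANDBOX"
```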
Every action is classified by risk weight and reversibility to determine which ring it requires:
from hypervisor.rings.classifier import ActionClassifier, ClassificationResult
from hypervisor.models import ExecutionRing, ReversibilityLevel
classifier = ActionClassifier()
# Read operations → Ring 3 (low risk, fully reversible)
result = classifier.classify_action_id("file.read")
assert result.ring == ExecutionRing.RING_3_SANDBOX
assert result.reversibility == ReversibilityLevel.REVERSIBLE
# Write operations → Ring 2 (medium risk, reversible with effort)
result = classifier.classify_action_id("file.write")
assert result.ring == ExecutionRing.RING_2_STANDARD
# Deployments → Ring 1 (high risk, non-reversible)
result = classifier.classify_action_id("deploy.k8s")
assert result.ring == ExecutionRing.RING_1_PRIVILEGED
assert result.reversibility == ReversibilityLevel.NON_REVERSIBLE
# Override classification for custom actions
classifier.set_override("my_custom.action", ring=ExecutionRing.RING_2_STANDARD, risk_weight=0.5)

Sometimes an agent needs temporary access to a higher ring. The RingElevationManager handles time-bounded privilege escalation:
from hypervisor.rings.elevation import (
RingElevationManager,
RingElevation,
ElevationDenialReason,
)
manager = RingElevationManager()
# Request elevation from Ring 2 → Ring 1
elevation = manager.request_elevation(
agent_did="did:example:agent-42",
session_id="session-001",
current_ring=ExecutionRing.RING_2_STANDARD,
target_ring=ExecutionRing.RING_1_PRIVILEGED,
ttl_seconds=300, # 5-minute window (max: 3600s)
reason="Deploying approved release v2.1.0",
attestation="signed-approval-token-from-sre",
)
if elevation.is_active:
    # Agent now has Ring 1 privileges for 5 minutes
    effective = manager.get_effective_ring(
        agent_did="did:example:agent-42",
        session_id="session-001",
        base_ring=ExecutionRing.RING_2_STANDARD,
    )
    assert effective == ExecutionRing.RING_1_PRIVILEGED

# Revoke early if needed
manager.revoke_elevation(elevation.elevation_id)

Community Edition: Elevation requests are always denied. The denial reason is ElevationDenialReason.COMMUNITY_EDITION. Upgrade to Enterprise for dynamic ring escalation.
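Time-bounded elevation can be modeled as an expiry timestamp per grant. A toy version of the idea (hypothetical, not the `RingElevationManager` internals):

```python
import time

class ToyElevationTable:
    """Stores active elevations as (target_ring, expires_at) per agent."""
    def __init__(self):
        self._grants: dict[str, tuple[int, float]] = {}

    def grant(self, agent_did: str, target_ring: int, ttl_seconds: float) -> None:
        self._grants[agent_did] = (target_ring, time.time() + ttl_seconds)

    def effective_ring(self, agent_did: str, base_ring: int) -> int:
        grant = self._grants.get(agent_did)
        if grant is not None:
            target_ring, expires_at = grant
            if time.time() < expires_at:
                # Lower ring number = more privilege
                return min(base_ring, target_ring)
            del self._grants[agent_did]  # lazily expire stale grants
        return base_ring

table = ToyElevationTable()
table.grant("did:example:agent-42", target_ring=1, ttl_seconds=300)
assert table.effective_ring("did:example:agent-42", base_ring=2) == 1
assert table.effective_ring("did:example:other", base_ring=2) == 2
```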
The RingBreachDetector monitors for agents attempting actions above their
ring level:
from hypervisor.rings.breach_detector import (
RingBreachDetector,
BreachEvent,
BreachSeverity,
)
detector = RingBreachDetector()
# The detector fires events when an agent in Ring 3 attempts a Ring 1 action
# Severity depends on the gap between agent ring and action ring:
# 1-ring gap → WARNING
# 2-ring gap → HIGH
#   3-ring gap → CRITICAL (Ring 3 agent trying Ring 0 action)

While rings control privilege levels, Capability Guards control which specific tools an agent can call. This is a second, orthogonal layer of defense.
The CapabilityGuardMiddleware (from agent-os) enforces per-agent tool
allow/deny lists:
from agent_os.integrations.maf_adapter import (
CapabilityGuardMiddleware,
GovernancePolicyMiddleware,
create_governance_middleware,
)
# Option 1: Explicit allow list (whitelist) — only these tools are permitted
guard = CapabilityGuardMiddleware(
allowed_tools=["web_search", "file_read", "calculator"],
)
# Option 2: Deny list (blacklist) — everything except these tools
guard = CapabilityGuardMiddleware(
denied_tools=["execute_code", "delete_file", "send_email"],
)
# Option 3: Factory function for full governance stack
middleware = create_governance_middleware(
policy_directory="policies/",
allowed_tools=["web_search", "file_read"],
denied_tools=["execute_code", "delete_file"],
enable_rogue_detection=True,
)

Combine rings with capability guards for defense-in-depth:
from hypervisor.models import ExecutionRing
from agent_os.integrations.maf_adapter import CapabilityGuardMiddleware
# Define tool sets per ring
RING_TOOL_POLICIES = {
    ExecutionRing.RING_3_SANDBOX: CapabilityGuardMiddleware(
        allowed_tools=["web_search", "file_read"],
    ),
    ExecutionRing.RING_2_STANDARD: CapabilityGuardMiddleware(
        allowed_tools=["web_search", "file_read", "file_write", "api_call"],
        denied_tools=["delete_file", "execute_code"],
    ),
    ExecutionRing.RING_1_PRIVILEGED: CapabilityGuardMiddleware(
        denied_tools=["drop_database"],  # everything else allowed
    ),
    ExecutionRing.RING_0_ROOT: CapabilityGuardMiddleware(
        # No restrictions — full access
    ),
}

def get_guard_for_agent(eff_score: float) -> CapabilityGuardMiddleware:
    """Return the capability guard matching an agent's privilege ring."""
    ring = ExecutionRing.from_eff_score(eff_score)
    return RING_TOOL_POLICIES[ring]

from agent_os.integrations.maf_adapter import (
create_governance_middleware,
AuditTrailMiddleware,
RogueDetectionMiddleware,
)
# Full governance middleware stack: policy + capability guard + audit + rogue detection
middleware = create_governance_middleware(
policy_directory="policies/",
allowed_tools=["web_search", "file_read"],
denied_tools=["execute_code"],
enable_rogue_detection=True,
)
# Attach to your agent framework — the middleware intercepts every tool call
# and blocks anything not in the allow list (or in the deny list)

Multi-step agent workflows are dangerous: if step 3 of 5 fails, you're left with a half-finished state. The Saga Orchestrator wraps multi-step workflows in transactions with automatic compensation (rollback).
Step 1: Create PR ──→ Compensate: Close PR
Step 2: Run tests ──→ Compensate: Cancel test run
Step 3: Deploy to staging ──→ Compensate: Rollback deployment
Step 4: Notify team ──→ Compensate: Send failure notice
If Step 3 fails:
→ Compensate Step 2 (cancel tests)
→ Compensate Step 1 (close PR)
→ Saga state: COMPENSATING → FAILED
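The rollback behavior sketched above is the classic saga pattern: run forward, and on failure undo completed steps in reverse. A generic illustration (simplified, not the orchestrator's API):

```python
def run_saga(steps):
    """steps: list of (execute, undo) callable pairs. Returns the final state.
    On any failure, previously committed steps are compensated in reverse."""
    committed = []
    for execute, undo in steps:
        try:
            execute()
            committed.append(undo)
        except Exception:
            for compensate in reversed(committed):
                compensate()
            return "FAILED"
    return "COMPLETED"

log = []
steps = [
    (lambda: log.append("create PR"), lambda: log.append("close PR")),
    (lambda: log.append("run tests"), lambda: log.append("cancel tests")),
    # Step 3 fails, triggering reverse-order compensation
    (lambda: (_ for _ in ()).throw(RuntimeError("deploy failed")),
     lambda: log.append("rollback deploy")),
]
assert run_saga(steps) == "FAILED"
assert log == ["create PR", "run tests", "cancel tests", "close PR"]
```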
from hypervisor.saga.orchestrator import SagaOrchestrator
from hypervisor.saga.state_machine import SagaState, StepState
orchestrator = SagaOrchestrator()
# Create a new saga for this session
saga = orchestrator.create_saga(session_id="session-deploy-42")
# Add steps with execute and undo APIs
orchestrator.add_step(
saga_id=saga.saga_id,
action_id="pr.create",
agent_did="did:example:dev-agent",
execute_api="/api/pr/create",
undo_api="/api/pr/close", # compensation action
timeout_seconds=60,
max_retries=2,
)
orchestrator.add_step(
saga_id=saga.saga_id,
action_id="tests.run",
agent_did="did:example:ci-agent",
execute_api="/api/tests/run",
undo_api="/api/tests/cancel",
timeout_seconds=300,
max_retries=1,
)
orchestrator.add_step(
saga_id=saga.saga_id,
action_id="deploy.staging",
agent_did="did:example:deploy-agent",
execute_api="/api/deploy/staging",
undo_api="/api/deploy/rollback",
timeout_seconds=600,
)

Each step transitions through a well-defined state machine:
StepState flow:
PENDING → EXECUTING → COMMITTED
↘ FAILED → COMPENSATING → COMPENSATED
↘ COMPENSATION_FAILED
The saga itself tracks the aggregate state:
from hypervisor.saga.state_machine import SagaStep, SagaState, StepState, STEP_TRANSITIONS
# Valid step transitions are enforced — invalid transitions raise errors
step = SagaStep(step_id="s1", action_id="pr.create", ...)
step.transition(StepState.EXECUTING) # PENDING → EXECUTING ✓
step.transition(StepState.COMMITTED) # EXECUTING → COMMITTED ✓
# step.transition(StepState.PENDING)  # COMMITTED → PENDING ✗ (raises error)

Saga-level states:

| State | Meaning |
|---|---|
| `RUNNING` | Steps are being executed sequentially |
| `COMPENSATING` | A step failed; compensation is running in reverse |
| `COMPLETED` | All steps committed successfully |
| `FAILED` | All compensation finished (or some compensation failed) |
| `ESCALATED` | Compensation itself failed; human intervention required |
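The step state machine can be captured as a transition table. A compact sketch consistent with the flow diagram above (illustrative only):

```python
from enum import Enum

class Step(Enum):
    PENDING = "PENDING"
    EXECUTING = "EXECUTING"
    COMMITTED = "COMMITTED"
    FAILED = "FAILED"
    COMPENSATING = "COMPENSATING"
    COMPENSATED = "COMPENSATED"
    COMPENSATION_FAILED = "COMPENSATION_FAILED"

# Legal transitions, mirroring the StepState flow diagram
LEGAL = {
    Step.PENDING: {Step.EXECUTING},
    Step.EXECUTING: {Step.COMMITTED, Step.FAILED},
    Step.FAILED: {Step.COMPENSATING},
    Step.COMPENSATING: {Step.COMPENSATED, Step.COMPENSATION_FAILED},
}

def transition(current: Step, target: Step) -> Step:
    if target not in LEGAL.get(current, set()):
        raise ValueError(f"invalid transition: {current.value} -> {target.value}")
    return target

state = transition(Step.PENDING, Step.EXECUTING)
state = transition(state, Step.COMMITTED)
# transition(state, Step.PENDING) would raise ValueError
```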
For complex workflows, define sagas declaratively:
from hypervisor.saga.dsl import SagaDSLParser, SagaDefinition
saga_yaml = """
saga:
  id: deploy-pipeline
  steps:
    - id: create-pr
      action_id: pr.create
      agent: did:example:dev-agent
      execute_api: /api/pr/create
      undo_api: /api/pr/close
      timeout: 60
      retries: 2
    - id: run-tests
      action_id: tests.run
      agent: did:example:ci-agent
      execute_api: /api/tests/run
      undo_api: /api/tests/cancel
      timeout: 300
      depends_on: [create-pr]
    - id: deploy-staging
      action_id: deploy.staging
      agent: did:example:deploy-agent
      execute_api: /api/deploy/staging
      undo_api: /api/deploy/rollback
      timeout: 600
      depends_on: [run-tests]
      checkpoint_goal: "Staging deployment matches PR diff"
"""
parser = SagaDSLParser()
definition: SagaDefinition = parser.parse(saga_yaml)

Checkpoints verify that each step actually achieved its goal, not just that it returned HTTP 200:
from hypervisor.saga.checkpoint import CheckpointManager, SemanticCheckpoint
checkpoint_mgr = CheckpointManager()
# After a deploy step, verify the deployment actually happened
checkpoint = SemanticCheckpoint(
step_id="deploy-staging",
goal="Staging deployment matches PR diff",
)
# The checkpoint manager evaluates whether the goal was met

For parallel step execution (e.g., deploy to multiple regions simultaneously):
from hypervisor.saga.fan_out import FanOutOrchestrator, FanOutPolicy
fan_out = FanOutOrchestrator()
# Execute the same action across multiple agents in parallel
# with configurable failure policies (fail-fast, best-effort, quorum)

When multiple agents collaborate in a shared session, each agent gets an isolated view of the workspace. No agent can read or modify another agent's files without explicit sharing.
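The isolation model can be pictured as storage keyed by (agent, path), so two agents writing the same path never collide. A toy sketch (hypothetical, not the `SessionVFS` implementation):

```python
class ToyVFS:
    """Per-agent file namespacing: every entry is keyed by (agent_did, path)."""
    def __init__(self):
        self._files: dict[tuple[str, str], str] = {}

    def write(self, path: str, agent_did: str, value: str) -> None:
        self._files[(agent_did, path)] = value

    def read(self, path: str, agent_did: str) -> str:
        key = (agent_did, path)
        if key not in self._files:
            # Either the file doesn't exist or it belongs to another agent
            raise PermissionError(f"{agent_did} cannot read {path}")
        return self._files[key]

vfs = ToyVFS()
vfs.write("/workspace/plan.md", "did:agent-a", "# My Plan")
vfs.write("/workspace/plan.md", "did:agent-b", "# Different Plan")
assert vfs.read("/workspace/plan.md", "did:agent-a") == "# My Plan"
assert vfs.read("/workspace/plan.md", "did:agent-b") == "# Different Plan"
```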
The SessionVFS provides per-agent isolated file views within a shared session:
from hypervisor.session.sso import SessionVFS, VFSPermissionError
vfs = SessionVFS()
# Agent A writes a file — only Agent A can see it
vfs.write(path="/workspace/plan.md", agent_did="did:agent-a", value="# My Plan")
# Agent A reads its own file — works fine
content = vfs.read(path="/workspace/plan.md", agent_did="did:agent-a")
assert content == "# My Plan"
# Agent B tries to read Agent A's file — blocked
try:
    vfs.read(path="/workspace/plan.md", agent_did="did:agent-b")
except VFSPermissionError:
    print("Access denied: Agent B cannot read Agent A's namespace")
# Agent B writes to the same path — it gets its own copy
vfs.write(path="/workspace/plan.md", agent_did="did:agent-b", value="# Different Plan")
# Each agent sees its own version
assert vfs.read("/workspace/plan.md", "did:agent-a") == "# My Plan"
assert vfs.read("/workspace/plan.md", "did:agent-b") == "# Different Plan"
# Delete is also scoped
vfs.delete(path="/workspace/plan.md", agent_did="did:agent-a")

Choose the right isolation level based on your consistency requirements:
from hypervisor.session.isolation import IsolationLevel
# Snapshot — each agent sees a consistent snapshot (cheapest)
level = IsolationLevel.SNAPSHOT
assert not level.requires_vector_clocks
assert not level.requires_intent_locks
assert level.allows_concurrent_writes
assert level.coordination_cost == "low"
# Read Committed — agents see committed writes from others
level = IsolationLevel.READ_COMMITTED
assert level.requires_vector_clocks
assert not level.requires_intent_locks
# Serializable — strongest consistency (most expensive)
level = IsolationLevel.SERIALIZABLE
assert level.requires_vector_clocks
assert level.requires_intent_locks
assert not level.allows_concurrent_writes
assert level.coordination_cost == "high"

When agents produce concurrent writes, vector clocks establish a causal order:
from hypervisor.session.vector_clock import VectorClockManager, CausalViolationError
clock_mgr = VectorClockManager()
# Each agent gets its own logical clock
clock_a = clock_mgr.create_clock("did:agent-a")
clock_b = clock_mgr.create_clock("did:agent-b")
# Agent A performs an action
clock_mgr.increment("did:agent-a")
# Check causal ordering — did A's action happen before B's?
happened_before = clock_mgr.happens_before(clock_a, clock_b)

Prevent conflicting concurrent operations with intent locks:
from hypervisor.session.intent_locks import IntentLockManager, LockIntent, DeadlockError
lock_mgr = IntentLockManager()
# Agent A acquires a write lock on the session
lock_mgr.acquire_lock(
session_id="session-001",
agent_did="did:agent-a",
intent=LockIntent.WRITE,
)
# Agent B tries an exclusive lock — blocked until A releases
try:
    lock_mgr.acquire_lock(
        session_id="session-001",
        agent_did="did:agent-b",
        intent=LockIntent.EXCLUSIVE,
    )
except DeadlockError:
    print("Deadlock detected — aborting Agent B's operation")
# Release when done
lock_mgr.release_lock(session_id="session-001", agent_did="did:agent-a")

Bring it all together with a SharedSessionObject:
from hypervisor.session import SharedSessionObject
from hypervisor.models import SessionConfig, ConsistencyMode
config = SessionConfig(
consistency_mode=ConsistencyMode.SERIALIZABLE,
max_participants=5,
max_duration_seconds=3600, # 1 hour
min_eff_score=0.60, # minimum trust to join
)
session = SharedSessionObject(
session_id="session-deploy-42",
config=config,
)
# Session provides:
# session.vfs — SessionVFS (isolated file views)
# session.vector_clocks — VectorClockManager (causal ordering)
# session.intent_locks — IntentLockManager (concurrency control)

When an agent goes rogue, you need to stop it immediately — not after the next polling interval.
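At its core, a kill switch pairs immediate termination with cleanup of the agent's in-flight work. A toy model using per-agent compensation hooks (hypothetical names, not the `KillSwitch` internals):

```python
from collections import defaultdict

class ToyKillSwitch:
    """Registers cleanup hooks per agent and runs them (LIFO) on kill."""
    def __init__(self):
        self._hooks: dict[str, list] = defaultdict(list)
        self.killed: dict[str, str] = {}

    def on_kill(self, agent_did: str, hook) -> None:
        self._hooks[agent_did].append(hook)

    def kill(self, agent_did: str, reason: str) -> int:
        """Mark the agent killed, run its hooks in reverse order, return count."""
        self.killed[agent_did] = reason
        hooks = self._hooks.pop(agent_did, [])
        for hook in reversed(hooks):  # compensate saga-style, newest first
            hook()
        return len(hooks)

ks = ToyKillSwitch()
log = []
ks.on_kill("did:agent-x", lambda: log.append("close PR"))
ks.on_kill("did:agent-x", lambda: log.append("cancel tests"))
assert ks.kill("did:agent-x", "BEHAVIORAL_DRIFT") == 2
assert log == ["cancel tests", "close PR"]
```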
The KillSwitch terminates an agent and triggers saga compensation for any
in-flight work:
from hypervisor.security.kill_switch import KillSwitch, KillReason, KillResult
kill_switch = KillSwitch()
# Immediate termination — all in-flight saga steps are compensated
result: KillResult = kill_switch.kill(
agent_did="did:example:rogue-agent",
session_id="session-001",
reason=KillReason.BEHAVIORAL_DRIFT,
details="Agent started accessing files outside its namespace",
)
print(f"Kill ID: {result.kill_id}")
print(f"Compensation triggered: {result.compensation_triggered}")
print(f"Handoffs succeeded: {result.handoff_success_count}")
print(f"Timestamp: {result.timestamp}")

Available kill reasons:

| Reason | When to use |
|---|---|
| `BEHAVIORAL_DRIFT` | Agent deviates from expected behavior patterns |
| `RATE_LIMIT` | Agent exceeded its rate limit repeatedly |
| `RING_BREACH` | Agent attempted actions above its ring level |
| `MANUAL` | Human operator triggered the kill |
| `QUARANTINE_TIMEOUT` | Agent was quarantined and didn't recover |
| `SESSION_TIMEOUT` | Session exceeded its max_duration_seconds |
Before killing, you can register substitute agents to take over in-flight work:
from hypervisor.security.kill_switch import KillSwitch, HandoffStatus
kill_switch = KillSwitch()
# Register a substitute agent that can take over work
kill_switch.register_substitute(
session_id="session-001",
agent_did="did:example:backup-agent",
)
# Now when the primary agent is killed, its saga steps are handed off
result = kill_switch.kill(
agent_did="did:example:primary-agent",
session_id="session-001",
reason=KillReason.MANUAL,
details="Planned maintenance rotation",
)
# Check handoff results
for handoff in result.handoffs:
    print(f"Step {handoff.step_id}: {handoff.status}")
    # HandoffStatus: PENDING, HANDED_OFF, FAILED, COMPENSATED
# Review kill history
history = kill_switch.get_kill_history(agent_did="did:example:primary-agent")

Prevent resource exhaustion with per-agent rate limits:
from hypervisor.security.rate_limiter import AgentRateLimiter, RateLimitExceeded
# Ring 3 agents: 10 calls per minute
sandbox_limiter = AgentRateLimiter(
window_seconds=60.0,
max_calls=10,
)
# Ring 2 agents: 100 calls per minute
standard_limiter = AgentRateLimiter(
window_seconds=60.0,
max_calls=100,
)
# Check before each action
status = sandbox_limiter.check_rate_limit(agent_did="did:example:new-agent")
if not status.allowed:
    print(f"Rate limited — retry after {status.retry_after_seconds}s")
# Reset limits (e.g., after an agent is promoted)
sandbox_limiter.reset(agent_did="did:example:new-agent")

Quarantine isolates an agent without killing it — useful for investigation:
from hypervisor.liability.quarantine import QuarantineManager, QuarantineReason
quarantine = QuarantineManager()
# Quarantine a suspect agent — it can't take new actions but existing
# saga steps are preserved for forensic analysis

Wire breach detection into your kill switch for automated response:
from hypervisor.rings.breach_detector import RingBreachDetector, BreachSeverity
from hypervisor.security.kill_switch import KillSwitch, KillReason
from hypervisor.security.rate_limiter import AgentRateLimiter
detector = RingBreachDetector()
kill_switch = KillSwitch()
limiter = AgentRateLimiter(window_seconds=60.0, max_calls=100)
async def on_agent_action(agent_did: str, session_id: str, action_id: str):
    """Example enforcement pipeline for every agent action."""
    # Layer 1: Rate limit check
    status = limiter.check_rate_limit(agent_did)
    if not status.allowed:
        kill_switch.kill(agent_did, session_id, KillReason.RATE_LIMIT)
        return
    # Layer 2: Ring enforcement (breach detection)
    #   If a breach is CRITICAL severity → kill immediately
    #   If WARNING → log and allow (the agent might be testing boundaries)
    # Layer 3: Capability guard check (handled by middleware)
    # Layer 4: Saga step execution (handled by orchestrator)

The runtime includes a FastAPI server for HTTP-based enforcement:
# Install with API extras
pip install "agent-runtime[api]"
# Start the server
hypervisor serve --host 0.0.0.0 --port 8000

A minimal Dockerfile:

FROM python:3.11-slim
WORKDIR /app
RUN pip install "agent-runtime[full,api]"
EXPOSE 8000
CMD ["hypervisor", "serve", "--host", "0.0.0.0", "--port", "8000"]

Build and run:
docker build -t agent-runtime:latest .
docker run -p 8000:8000 agent-runtime:latest

Deploy the runtime to Kubernetes alongside your agent pods:
# runtime-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-runtime
  labels:
    app: agent-runtime
spec:
  replicas: 2
  selector:
    matchLabels:
      app: agent-runtime
  template:
    metadata:
      labels:
        app: agent-runtime
    spec:
      containers:
        - name: runtime
          image: agent-runtime:latest
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
          env:
            - name: HYPERVISOR_LOG_LEVEL
              value: "INFO"
---
apiVersion: v1
kind: Service
metadata:
  name: agent-runtime
spec:
  selector:
    app: agent-runtime
  ports:
    - port: 8000
      targetPort: 8000

For fine-grained per-pod enforcement, run the runtime as a sidecar:
# agent-pod-with-sidecar.yaml
apiVersion: v1
kind: Pod
metadata:
  name: agent-worker
spec:
  containers:
    # Your agent container
    - name: agent
      image: my-agent:latest
      env:
        - name: HYPERVISOR_URL
          value: "http://localhost:8000"
    # Runtime sidecar — enforces sandboxing for this pod
    - name: runtime-sidecar
      image: agent-runtime:latest
      ports:
        - containerPort: 8000
      resources:
        requests:
          memory: "128Mi"
          cpu: "100m"
        limits:
          memory: "256Mi"
          cpu: "250m"

Create a values file for parameterized deployments:
# values.yaml
replicaCount: 2
image:
  repository: agent-runtime
  tag: "latest"
  pullPolicy: IfNotPresent
service:
  type: ClusterIP
  port: 8000
resources:
  requests:
    memory: "256Mi"
    cpu: "250m"
  limits:
    memory: "512Mi"
    cpu: "500m"
runtime:
  logLevel: INFO
  rateLimiting:
    ring3MaxCalls: 10
    ring2MaxCalls: 100
    ring1MaxCalls: 1000
    windowSeconds: 60
  session:
    maxDurationSeconds: 3600
    maxParticipants: 10
    defaultIsolation: snapshot
  saga:
    defaultTimeoutSeconds: 300
    maxRetries: 2

Monitor your runtime with the built-in event bus:
from hypervisor.observability.event_bus import HypervisorEventBus, EventType
from hypervisor.observability.causal_trace import CausalTraceId
event_bus = HypervisorEventBus()
# Subscribe to security events
@event_bus.subscribe(EventType.RING_BREACH)
async def on_breach(event):
    print(f"BREACH: {event.agent_did} attempted {event.action_id}")

@event_bus.subscribe(EventType.KILL_SWITCH)
async def on_kill(event):
    print(f"KILLED: {event.agent_did} — reason: {event.reason}")
# Trace causality across distributed saga steps
trace_id = CausalTraceId.generate()

| Layer | Component | What It Does |
|---|---|---|
| Privilege | `ExecutionRing` | 4-tier access model based on trust score |
| Privilege | `ActionClassifier` | Maps actions to rings by risk/reversibility |
| Privilege | `RingElevationManager` | Temporary privilege escalation with TTL |
| Detection | `RingBreachDetector` | Alerts on ring boundary violations |
| Tools | `CapabilityGuardMiddleware` | Per-agent tool allow/deny lists |
| Transactions | `SagaOrchestrator` | Multi-step workflows with auto-rollback |
| Isolation | `SessionVFS` | Per-agent virtual file system namespacing |
| Isolation | `IntentLockManager` | Concurrency control with intent locks |
| Isolation | `VectorClockManager` | Causal ordering of concurrent operations |
| Emergency | `KillSwitch` | Immediate agent termination |
| Emergency | `AgentRateLimiter` | Per-agent call rate enforcement |
| Emergency | `QuarantineManager` | Agent isolation for investigation |
| Observability | `HypervisorEventBus` | Real-time event streaming |
- Audit trails: Explore `CommitmentEngine` and `DeltaEngine` for hash-chained, tamper-evident logging.
- Liability: See `LiabilityMatrix`, `CausalAttributor`, and `SlashingEngine` for agent accountability.
- Deployment: Read the Azure Container Apps guide for cloud-native deployment patterns.