Package: agent-os-kernel v1.3.1 · Python: ≥3.9 · License: MIT
Comprehensive reference for all public modules, classes, and functions in Agent OS.
- Core: Stateless Kernel
- Core: Base Agent
- Integrations: Governance Policy
- Integrations: Profiling
- Semantic Policy Engine
- Context Budget Scheduler
- Mute Agent Primitives (Face/Hands)
- Health Checks
- Circuit Breaker
- Metrics
- Trust Root & Supervisor Hierarchy
- Execution Sandbox
- Exception Hierarchy
- CLI Reference
- Extensions
Module: agent_os.stateless
MCP-compliant stateless execution kernel. Every request is self-contained: the kernel stores no session state in-process, which enables horizontal scaling behind a load balancer.
from agent_os.stateless import StatelessKernel, ExecutionContext
kernel = StatelessKernel()
ctx = ExecutionContext(agent_id="analyst-001", policies=["read_only"])
result = await kernel.execute("database_query", {"query": "SELECT 1"}, ctx)

| Method | Signature | Returns | Description |
|---|---|---|---|
| `__init__` | `(backend: StateBackend = None, policies: dict = None, enable_tracing: bool = False, circuit_breaker_config: CircuitBreakerConfig = None)` | - | Create a kernel with an optional backend, custom policies, OTel tracing, and circuit breaker config |
| `execute` | `async (action: str, params: dict, context: ExecutionContext) -> ExecutionResult` | `ExecutionResult` | Execute an action with full policy governance. Returns `signal="SIGKILL"` on policy violation, `signal="SIGTERM"` on execution error |
`ExecutionContext` holds all state needed for a request. Callers pass the `updated_context` from one result into the next request.
| Field | Type | Default | Description |
|---|---|---|---|
| `agent_id` | `str` | - | Unique identifier of the requesting agent |
| `policies` | `list[str]` | `[]` | Policy names to enforce (e.g. `["read_only", "no_pii"]`) |
| `history` | `list[dict]` | `[]` | Chronological list of previous actions |
| `state_ref` | `str \| None` | `None` | Key referencing externalized state in the backend |
| `metadata` | `dict` | `{}` | Arbitrary metadata passed through to the result |
`ExecutionResult` fields:

| Field | Type | Description |
|---|---|---|
| `success` | `bool` | `True` if the action completed without violation or error |
| `data` | `Any` | The action's return value (`None` on failure) |
| `error` | `str \| None` | Human-readable error message |
| `signal` | `str \| None` | `"SIGKILL"` (policy), `"SIGTERM"` (execution error), `"ESCALATE"`, `"DEFER"` |
| `updated_context` | `ExecutionContext \| None` | Context with updated history for the next request |
| `metadata` | `dict` | Request metadata including `request_id` and `timestamp` |
Internal representation auto-generated by execute(). The request_id is a truncated SHA-256 hash for log correlation.
| Field | Type | Description |
|---|---|---|
| `action` | `str` | Action name |
| `params` | `dict` | Action parameters |
| `context` | `ExecutionContext` | Full execution context |
| `request_id` | `str \| None` | Auto-generated correlation ID |
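The correlation ID described above can be sketched as follows. This is a minimal illustration, not the kernel's actual code: the source only says that `request_id` is a truncated SHA-256 hash, so the hashed fields (action, params, agent ID, a timestamp), the JSON encoding, the 16-character length, and the helper name `make_request_id` are all assumptions.

```python
import hashlib
import json


def make_request_id(action: str, params: dict, agent_id: str,
                    timestamp: str, length: int = 16) -> str:
    """Return a truncated SHA-256 hex digest usable as a log correlation ID.

    Hypothetical helper: the inputs and the default length are assumptions.
    """
    # Sort keys so that logically equal params always hash to the same value.
    payload = json.dumps(
        {"action": action, "params": params,
         "agent_id": agent_id, "timestamp": timestamp},
        sort_keys=True,
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return digest[:length]


request_id = make_request_id(
    "database_query", {"query": "SELECT 1"}, "analyst-001", "2024-01-01T00:00:00Z"
)
```

Truncating keeps log lines short while still making collisions unlikely for correlation purposes; a truncated hash should not be treated as a security token.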
from agent_os import stateless_execute
result = await stateless_execute(
    action="database_query",
    params={"query": "SELECT * FROM users"},
    agent_id="analyst-001",
    policies=["read_only"],
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| `action` | `str` | - | Action to execute |
| `params` | `dict` | - | Action parameters |
| `agent_id` | `str` | - | Requesting agent identifier |
| `policies` | `list[str] \| None` | `[]` | Policy names to enforce |
| `history` | `list[dict] \| None` | `[]` | Prior action history |
| `backend` | `StateBackend \| None` | `MemoryBackend()` | State backend |
Any object implementing `get`, `set`, and `delete` as async methods satisfies the `StateBackend` protocol via structural subtyping.

from typing import Any, Dict, Optional, Protocol

class StateBackend(Protocol):
    async def get(self, key: str) -> Optional[Dict[str, Any]]: ...
    async def set(self, key: str, value: Dict[str, Any], ttl: Optional[int] = None) -> None: ...
    async def delete(self, key: str) -> None: ...

`MemoryBackend` is an in-memory backend with lazy TTL expiry. Use it for development and testing only: state is lost on restart.
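To make "lazy TTL expiry" concrete, here is a minimal sketch of a dictionary-backed class that satisfies the three-method protocol above. It is not the library's `MemoryBackend`: the class name `DictBackend`, the injectable `clock` parameter, and the reading of `ttl` as seconds are assumptions made for illustration.

```python
import asyncio
import time
from typing import Any, Callable, Dict, Optional, Tuple


class DictBackend:
    """Hypothetical in-memory backend; not the library's MemoryBackend."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # key -> (value, expiry deadline, or None for "never expires")
        self._store: Dict[str, Tuple[Dict[str, Any], Optional[float]]] = {}

    async def get(self, key: str) -> Optional[Dict[str, Any]]:
        entry = self._store.get(key)
        if entry is None:
            return None
        value, deadline = entry
        # Lazy expiry: a stale entry is only removed when it is read.
        if deadline is not None and self._clock() >= deadline:
            del self._store[key]
            return None
        return value

    async def set(self, key: str, value: Dict[str, Any],
                  ttl: Optional[int] = None) -> None:
        deadline = None if ttl is None else self._clock() + ttl
        self._store[key] = (value, deadline)

    async def delete(self, key: str) -> None:
        self._store.pop(key, None)


async def demo() -> Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]]:
    now = [0.0]  # fake clock, so the example does not need to sleep
    backend = DictBackend(clock=lambda: now[0])
    await backend.set("session:1", {"step": 1}, ttl=10)
    fresh = await backend.get("session:1")
    now[0] = 11.0  # advance past the TTL
    expired = await backend.get("session:1")
    return fresh, expired


fresh, expired = asyncio.run(demo())
```

Because expiry happens on read, no background task is needed, but entries that are never read again stay in memory until the process exits.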
`RedisBackend` is the production backend, with connection pooling and configurable timeouts.
from agent_os.stateless import RedisBackend, RedisConfig
backend = RedisBackend(config=RedisConfig(
    host="redis.example.com",
    port=6379,
    pool_size=20,
    connect_timeout=5.0,
))
kernel = StatelessKernel(backend=backend)

| RedisConfig Field | Type | Default | Description |
|---|---|---|---|
| `host` | `str` | `"localhost"` | Redis hostname |
| `port` | `int` | `6379` | Redis port |
| `db` | `int` | `0` | Database number |
| `password` | `str \| None` | `None` | Authentication password |
| `pool_size` | `int` | `10` | Max connections |
| `connect_timeout` | `float` | `5.0` | Connection timeout (seconds) |
| `read_timeout` | `float` | `10.0` | Read timeout (seconds) |
| `retry_on_timeout` | `bool` | `True` | Retry on timeout |
Module: agent_os.base_agent
Reusable base classes for building agents with policy governance, audit logging, and tool integration.
from agent_os.base_agent import BaseAgent, AgentConfig
class MyAgent(BaseAgent):
    async def run(self, task: str) -> ExecutionResult:
        return await self._execute("process", {"task": task})

agent = MyAgent(AgentConfig(agent_id="my-agent", policies=["read_only"]))
result = await agent.run("hello")

| Method / Property | Signature | Returns | Description |
|---|---|---|---|
| `__init__` | `(config: AgentConfig, defer_timeout: float = 30.0)` | - | Initialize with config and optional defer timeout |
| `agent_id` | property | `str` | Agent's unique identifier |
| `policies` | property | `list[str]` | Copy of active policies |
| `run` | `async (*args, **kwargs)` | `ExecutionResult` | Abstract; implement the agent's main task |
| `get_audit_log` | `()` | `list[dict]` | Full audit log as dictionaries |
| `clear_audit_log` | `()` | `None` | Clear the audit log |
| `query_audit_log` | `(action, decision, since, limit, offset)` | `list[dict]` | Filter audit entries with optional criteria |
| `get_execution_stats` | `()` | `dict` | Execution time statistics (avg, min, max, p99) |
| `get_escalation_queue` | `()` | `list[EscalationRequest]` | Pending escalation requests |
| `set_defer_callback` | `(callback)` | `None` | Register async callback for DEFER policy decisions |
`ToolUsingAgent` extends `BaseAgent` with tool discovery and execution. Tools are executed through the kernel so that policies are enforced.
class AnalysisAgent(ToolUsingAgent):
    async def run(self, data: str) -> ExecutionResult:
        return await self._use_tool("json_parser", {"text": data})

agent = AnalysisAgent(
    AgentConfig(agent_id="analyzer"),
    tools=["json_parser", "csv_reader"],
)

| Method | Signature | Returns | Description |
|---|---|---|---|
| `__init__` | `(config: AgentConfig, tools: list[str] = None)` | - | Initialize with optional tool allowlist |
| `_use_tool` | `async (tool_name: str, params: dict)` | `ExecutionResult` | Execute a tool through the kernel |
| `list_allowed_tools` | `()` | `list[str] \| None` | Allowed tools, or `None` if all permitted |
`AgentConfig` fields:

| Field | Type | Default | Description |
|---|---|---|---|
| `agent_id` | `str` | - | 3 to 64 characters, alphanumeric with dashes |
| `policies` | `list[str]` | `[]` | Policy names to apply |
| `metadata` | `dict` | `{}` | Additional metadata |
| `state_backend` | `StateBackend \| None` | `None` | Custom state backend |
| `max_audit_log_size` | `int` | `10000` | Max audit log entries |
| `max_metadata_size_bytes` | `int` | `1048576` | Max metadata value size (1 MB) |
| Class Method | Signature | Description |
|---|---|---|
| `from_file` | `(path: str) -> AgentConfig` | Load from YAML or JSON file |
| `from_dict` | `(data: dict) -> AgentConfig` | Deserialize from dictionary |
| `to_dict` | `() -> dict` | Serialize to dictionary |
| Field | Type | Description |
|---|---|---|
| `timestamp` | `datetime` | When the entry was created (UTC) |
| `agent_id` | `str` | Agent that made the request |
| `request_id` | `str` | Correlation ID |
| `action` | `str` | Action attempted |
| `params` | `dict` | Action parameters |
| `decision` | `PolicyDecision` | `ALLOW`, `DENY`, `AUDIT`, `ESCALATE`, or `DEFER` |
| `result_success` | `bool \| None` | Whether execution succeeded |
| `error` | `str \| None` | Error message if failed |
| `execution_time_ms` | `float \| None` | Execution duration |
`PolicyDecision` values:

| Value | Description |
|---|---|
| `ALLOW` | Action permitted |
| `DENY` | Action blocked |
| `AUDIT` | Allow but log for review |
| `ESCALATE` | Route to human reviewer |
| `DEFER` | Async policy evaluation with callback |
`TypedResult` is a generic typed wrapper for execution results, with an optional transform function.

from agent_os.base_agent import TypedResult
typed = TypedResult.from_execution_result(result, transform=int)

Module: agent_os.integrations.base
`GovernancePolicy` is the central policy configuration for governed AI agents. It is validated on construction and serializable to YAML.
from agent_os.integrations.base import GovernancePolicy, PatternType
policy = GovernancePolicy(
    name="read_only_strict",
    max_tokens=2048,
    max_tool_calls=5,
    allowed_tools=["read_file", "web_search"],
    blocked_patterns=[
        "password",
        (r"rm\s+-rf", PatternType.REGEX),
        ("*.exe", PatternType.GLOB),
    ],
    require_human_approval=True,
    confidence_threshold=0.9,
)

| Field | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | `"default"` | Human-readable policy name |
| `max_tokens` | `int` | `4096` | Max tokens per request (must be > 0) |
| `max_tool_calls` | `int` | `10` | Max tool calls per request (0 disables) |
| `allowed_tools` | `list[str]` | `[]` | Tool allowlist (empty = all permitted) |
| `blocked_patterns` | `list[str \| tuple]` | `[]` | Blocked content patterns |
| `require_human_approval` | `bool` | `False` | Require human approval for tool calls |
| `timeout_seconds` | `int` | `300` | Max wall-clock time per request |
| `confidence_threshold` | `float` | `0.8` | Minimum confidence score (0.0 to 1.0) |
| `drift_threshold` | `float` | `0.15` | Max semantic drift before alert (0.0 to 1.0) |
| `log_all_calls` | `bool` | `True` | Log every tool call |
| `checkpoint_frequency` | `int` | `5` | Checkpoint every N calls |
| `max_concurrent` | `int` | `10` | Max concurrent agent executions |
| `backpressure_threshold` | `int` | `8` | Concurrency level to start throttling |
| `version` | `str` | `"1.0.0"` | Semantic version for auditable evolution |
| Method | Signature | Returns | Description |
|---|---|---|---|
| `validate` | `()` | `None` | Validate all fields (called in `__post_init__`) |
| `detect_conflicts` | `()` | `list[str]` | Detect contradictory policy settings |
| `matches_pattern` | `(text: str)` | `list[str]` | Return all blocked patterns matching text |
| `is_stricter_than` | `(other: GovernancePolicy)` | `bool` | Check if this policy is stricter |
| `to_yaml` | `()` | `str` | Serialize to YAML |
| `from_yaml` | `(yaml_str: str)` | `GovernancePolicy` | Deserialize from YAML |
`PatternType` values:

| Value | Description |
|---|---|
| `SUBSTRING` | Plain substring match |
| `REGEX` | Regular expression (compiled, case-insensitive) |
| `GLOB` | Glob/wildcard pattern |
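The three pattern types above can be sketched with the standard library alone. This is an illustration of the matching semantics, not the implementation behind `matches_pattern`: the local `PatternType` enum is a stand-in with made-up values, and treating a bare string as a substring pattern, matching substrings case-sensitively, and testing globs against whitespace-separated tokens are assumptions.

```python
import fnmatch
import re
from enum import Enum
from typing import List, Tuple, Union


class PatternType(Enum):
    """Local stand-in for the library enum; the values are assumptions."""
    SUBSTRING = "substring"
    REGEX = "regex"
    GLOB = "glob"


Pattern = Union[str, Tuple[str, PatternType]]


def matching_patterns(text: str, patterns: List[Pattern]) -> List[str]:
    """Return the pattern strings that match `text`, in input order."""
    hits = []
    for entry in patterns:
        # A bare string is treated as a substring pattern (assumption).
        if isinstance(entry, str):
            pattern, kind = entry, PatternType.SUBSTRING
        else:
            pattern, kind = entry
        if kind is PatternType.SUBSTRING:
            matched = pattern in text
        elif kind is PatternType.REGEX:
            # Case-insensitive, as the table above states for REGEX.
            matched = re.search(pattern, text, re.IGNORECASE) is not None
        else:
            # Glob: test each whitespace-separated token (assumption).
            matched = any(fnmatch.fnmatchcase(tok, pattern) for tok in text.split())
        if matched:
            hits.append(pattern)
    return hits


blocked = ["password", (r"rm\s+-rf", PatternType.REGEX), ("*.exe", PatternType.GLOB)]
hits = matching_patterns("please RM  -rf /tmp and run setup.exe", blocked)
```

Here `hits` contains the regex and the glob pattern but not `"password"`, since that substring does not occur in the text.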
| Value | Description |
|---|---|
| `POLICY_CHECK` | A policy check was performed |
| `POLICY_VIOLATION` | A policy was violated |
| `TOOL_CALL_BLOCKED` | A tool call was blocked |
| `CHECKPOINT_CREATED` | A governance checkpoint was created |
| `DRIFT_DETECTED` | Semantic drift was detected |
Module: agent_os.integrations.profiling
`profile_governance` is a decorator that measures execution time and, optionally, memory usage.
from agent_os.integrations.profiling import profile_governance, get_report
@profile_governance
def check_policy(self, action):
    ...

@profile_governance(track_memory=True)
def expensive_check(self, data):
    ...

report = get_report()
print(report.format_report())

| Function | Signature | Description |
|---|---|---|
| `profile_governance` | `(func=None, *, track_memory=False)` | Decorator for method profiling |
| `get_report` | `() -> ProfilingReport` | Retrieve the global profiling report |
| `reset_report` | `()` | Reset all profiling data |
`ProfileGovernanceContext` is a context manager for scoped profiling.

from agent_os.integrations.profiling import ProfileGovernanceContext

with ProfileGovernanceContext(track_memory=True) as report:
    ...  # run profiled code
print(report.format_report())

Module: agent_os.semantic_policy
Intent-based policy enforcement using weighted keyword signals. Zero-dependency heuristic classifier (<1 ms).
from agent_os.semantic_policy import SemanticPolicyEngine, IntentCategory
engine = SemanticPolicyEngine()
# Classify intent
result = engine.classify("database_query", {"query": "DROP TABLE users"})
result.category # IntentCategory.DESTRUCTIVE_DATA
result.confidence # 0.9
# Classify and enforce; raises PolicyDenied if blocked
engine.check("database_query", {"query": "SELECT 1"})

| Method | Signature | Returns | Description |
|---|---|---|---|
| `__init__` | `(deny: list[IntentCategory] = None, confidence_threshold: float = 0.5, custom_signals: dict = None)` | - | Create engine with deny categories, threshold, and optional custom signals |
| `classify` | `(action: str, params: dict)` | `IntentClassification` | Classify semantic intent of action and params |
| `check` | `(action: str, params: dict, *, deny: list = None, policy_name: str = "")` | `IntentClassification` | Classify and enforce; raises `PolicyDenied` if intent is in deny set |
`IntentCategory` values:

| Value | Description |
|---|---|
| `DESTRUCTIVE_DATA` | DROP, DELETE, TRUNCATE, wipe |
| `DATA_EXFILTRATION` | Bulk export, dump, copy-to-external |
| `PRIVILEGE_ESCALATION` | GRANT, sudo, chmod, admin |
| `SYSTEM_MODIFICATION` | rm, shutdown, reboot, kill |
| `CODE_EXECUTION` | exec, eval, subprocess, shell |
| `NETWORK_ACCESS` | fetch, curl, HTTP, connect |
| `DATA_READ` | SELECT, get, read, list |
| `DATA_WRITE` | INSERT, UPDATE, create, write |
| `BENIGN` | No risk signals detected |
`IntentClassification` fields:

| Field | Type | Description |
|---|---|---|
| `category` | `IntentCategory` | Classified intent category |
| `confidence` | `float` | Confidence score (0.0 to 1.0) |
| `matched_signals` | `tuple` | Signal keywords that matched |
| `explanation` | `str` | Human-readable explanation |
| `is_dangerous` | `bool` (property) | `True` if dangerous category with confidence ≥ 0.5 |
`PolicyDenied` is raised when an action is denied by semantic policy. Attributes: `classification`, `policy_name`.
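A weighted keyword classifier of the kind this module describes can be sketched in a few lines. This is a simplified illustration, not the engine's code: the signal table, the weights, the "strongest match wins" scoring rule, and the `IntentDenied` exception (a stand-in for `PolicyDenied`) are all assumptions.

```python
import re
from typing import Dict, Tuple

# Hypothetical signal table: category -> {keyword: weight}. Weights are illustrative.
SIGNALS: Dict[str, Dict[str, float]] = {
    "DESTRUCTIVE_DATA": {"drop": 0.9, "truncate": 0.9, "delete": 0.7, "wipe": 0.8},
    "PRIVILEGE_ESCALATION": {"grant": 0.8, "sudo": 0.9, "chmod": 0.6},
    "DATA_READ": {"select": 0.3, "read": 0.3, "list": 0.2},
}

Classification = Tuple[str, float, Tuple[str, ...]]


class IntentDenied(Exception):
    """Hypothetical stand-in for the library's PolicyDenied."""


def classify(action: str, params: dict) -> Classification:
    """Return (category, confidence, matched_signals) for an action and its params."""
    text = f"{action} {' '.join(str(v) for v in params.values())}".lower()
    tokens = set(re.findall(r"[a-z_]+", text))
    best: Classification = ("BENIGN", 0.0, ())
    for category, keywords in SIGNALS.items():
        matched = tuple(sorted(k for k in keywords if k in tokens))
        if not matched:
            continue
        # Confidence is the strongest matching weight (one simple scoring choice).
        confidence = max(keywords[k] for k in matched)
        if confidence > best[1]:
            best = (category, confidence, matched)
    return best


def check(action: str, params: dict, deny=("DESTRUCTIVE_DATA",),
          confidence_threshold: float = 0.5) -> Classification:
    """Classify, then raise if the category is denied with enough confidence."""
    result = classify(action, params)
    category, confidence, _ = result
    if category in deny and confidence >= confidence_threshold:
        raise IntentDenied(f"{category} (confidence {confidence})")
    return result


drop_result = classify("database_query", {"query": "DROP TABLE users"})
```

Keyword heuristics are fast and dependency-free, but they only see surface tokens, so an obfuscated or paraphrased request can slip past them.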
Module: agent_os.context_budget
Token budget as a kernel primitive. Enforces the "90% lookup, 10% reasoning" split and emits signals when agents exceed budgets.
from agent_os.context_budget import ContextScheduler, ContextPriority
scheduler = ContextScheduler(total_budget=16000, lookup_ratio=0.90)
# Allocate a window
window = scheduler.allocate("agent-001", "summarize report", ContextPriority.HIGH)
# Record usage
scheduler.record_usage("agent-001", lookup_tokens=500, reasoning_tokens=50)
# Release when done
scheduler.release("agent-001")

| Method / Property | Signature | Returns | Description |
|---|---|---|---|
| `__init__` | `(total_budget: int = 8000, lookup_ratio: float = 0.90, warn_threshold: float = 0.85)` | - | Create scheduler with global token pool |
| `allocate` | `(agent_id, task, priority=NORMAL, max_tokens=None)` | `ContextWindow` | Allocate a context window |
| `record_usage` | `(agent_id, lookup_tokens=0, reasoning_tokens=0)` | `UsageRecord` | Record token usage; emits SIGWARN/SIGSTOP |
| `release` | `(agent_id)` | `UsageRecord \| None` | Release allocation and move to history |
| `get_usage` | `(agent_id)` | `UsageRecord \| None` | Get current usage for an agent |
| `get_health_report` | `()` | `dict` | Summary of scheduler state |
| `on_signal` | `(signal: AgentSignal, handler: Callable)` | `None` | Register a signal handler |
| `active_agents` | property | `list[str]` | Currently active agent IDs |
| `active_count` | property | `int` | Number of active agents |
| `available_tokens` | property | `int` | Remaining tokens in the pool |
| `utilization` | property | `float` | Global pool utilization (0.0 to 1.0) |
`ContextWindow` fields:

| Field | Type | Description |
|---|---|---|
| `agent_id` | `str` | Agent this window belongs to |
| `task` | `str` | Task description |
| `lookup_budget` | `int` | Tokens for retrieval/facts |
| `reasoning_budget` | `int` | Tokens for LLM reasoning |
| `total` | `int` | `lookup_budget + reasoning_budget` |
`ContextPriority` values:

| Value | Level | Min Tokens |
|---|---|---|
| `CRITICAL` | 3 | 4000 |
| `HIGH` | 2 | 2000 |
| `NORMAL` | 1 | 1000 |
| `LOW` | 0 | 500 |
`AgentSignal` values:

| Value | Description |
|---|---|
| `SIGSTOP` | Budget exceeded; halt the agent |
| `SIGWARN` | Budget nearing limit |
| `SIGRESUME` | Budget replenished |
Raised when an agent exceeds its context budget. Attributes: agent_id, budget, used.
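The arithmetic behind the lookup/reasoning split and the warning threshold can be sketched as below. The function names `split_budget` and `signal_for` are hypothetical, and the exact comparison rules (strictly greater than the budget for SIGSTOP, at or above `warn_threshold` for SIGWARN) are assumptions; the scheduler may round or compare differently.

```python
from typing import Optional, Tuple


def split_budget(total: int, lookup_ratio: float = 0.90) -> Tuple[int, int]:
    """Split a token allocation into (lookup_budget, reasoning_budget)."""
    lookup = round(total * lookup_ratio)
    # Give the remainder to reasoning so the two parts always sum to `total`.
    return lookup, total - lookup


def signal_for(used: int, budget: int, warn_threshold: float = 0.85) -> Optional[str]:
    """Return the signal name to emit for the given usage, or None."""
    if used > budget:
        return "SIGSTOP"  # budget exceeded
    if used >= budget * warn_threshold:
        return "SIGWARN"  # nearing the limit
    return None


lookup_budget, reasoning_budget = split_budget(16000, 0.90)
```

With a 16,000-token allocation and a 0.90 ratio, this yields 14,400 lookup tokens and 1,600 reasoning tokens.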
Module: agent_os.mute
Separates reasoning from execution with kernel-enforced trust boundaries, the agent equivalent of Unix privilege separation.
`@face_agent` marks a function as a Face (reasoning) agent. It can call LLMs and produce plans but cannot execute side effects.
from agent_os.mute import face_agent, ExecutionPlan, ActionStep
@face_agent(capabilities=["db.read", "file.write"])
async def planner(task: str) -> ExecutionPlan:
    return ExecutionPlan(steps=[
        ActionStep(action="db.read", params={"query": "SELECT 1"})
    ])

`@mute_agent` marks a function as a Mute (Hands/execution) agent. It can execute actions but cannot call LLMs.
from agent_os.mute import mute_agent, ActionStep
@mute_agent(capabilities=["db.read", "file.write"])
async def executor(step: ActionStep) -> dict:
    return {"rows": [1]}

`pipe` executes a Face→Hands pipeline with kernel-level validation.
from agent_os.mute import pipe
result = await pipe(planner, executor, "get me the row count")
result.success # True
result.step_results # [StepResult(...)]
result.audit_log     # Full audit trail

| Parameter | Type | Default | Description |
|---|---|---|---|
| `face_fn` | `Callable` | - | `@face_agent` function |
| `mute_fn` | `Callable` | - | `@mute_agent` function |
| `task` | `Any` | - | Input to the face agent |
| `face_args` | `dict \| None` | `None` | Extra kwargs for face agent |
| `halt_on_deny` | `bool` | `True` | Stop if any step is denied |
| `halt_on_error` | `bool` | `False` | Stop if any step fails |
| Type | Fields | Description |
|---|---|---|
| `ActionStep` | `action`, `params`, `description`, `depends_on` | Single atomic action in a plan |
| `ExecutionPlan` | `steps`, `metadata`, `plan_id` | Structured output from a Face agent |
| `StepResult` | `step_index`, `action`, `status`, `data`, `error`, `duration_ms` | Result of executing a single step |
| `PipelineResult` | `plan`, `step_results`, `success`, `total_duration_ms`, `denied_steps`, `audit_log` | Full pipeline result |
| `ActionStatus` | Enum: `PENDING`, `APPROVED`, `DENIED`, `EXECUTED`, `FAILED` | Status of an action step |
Raised when an agent attempts an action outside its capabilities. Attributes: agent_role, action, allowed.
Module: agent_os.health (re-exported from agent_os.integrations.health)
`HealthChecker` is a thread-safe health checker for Kubernetes readiness and liveness probes.
from agent_os.health import HealthChecker, HealthStatus, ComponentHealth
checker = HealthChecker(version="1.3.1")
checker.register_check("database", lambda: ComponentHealth(
    name="database",
    status=HealthStatus.HEALTHY,
    message="connected",
))
report = checker.check_health()  # Full health report
report = checker.check_ready()   # Readiness probe
report = checker.check_live()    # Liveness probe (lightweight)
print(report.to_dict())
print(report.is_healthy())  # True
print(report.is_ready())    # True (not UNHEALTHY)

| Method | Signature | Returns | Description |
|---|---|---|---|
| `__init__` | `(version: str = "1.0.0")` | - | Create checker |
| `register_check` | `(name: str, check_fn: Callable)` | `None` | Register a named health check (thread-safe) |
| `check_health` | `()` | `HealthReport` | Run all checks and return full report |
| `check_ready` | `()` | `HealthReport` | Readiness probe (same as full check) |
| `check_live` | `()` | `HealthReport` | Liveness probe (lightweight, process only) |
`HealthStatus` values: `HEALTHY`, `DEGRADED`, `UNHEALTHY`.
`HealthReport` fields:

| Field | Type | Description |
|---|---|---|
| `status` | `HealthStatus` | Aggregate status |
| `components` | `dict[str, ComponentHealth]` | Per-component results |
| `timestamp` | `str` | ISO 8601 timestamp |
| `version` | `str` | Application version |
| `uptime_seconds` | `float` | Time since checker creation |
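One plausible way to derive the aggregate `status` from per-component results is "worst status wins". The sketch below illustrates that rule with a local stand-in enum; the source does not say how the library aggregates, so the rule itself, the enum values, and the helper names `aggregate` and `is_ready` are assumptions.

```python
from enum import Enum
from typing import Dict


class HealthStatus(Enum):
    """Local stand-in for the library enum; the values are assumptions."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


# Higher number means worse status.
_SEVERITY = {
    HealthStatus.HEALTHY: 0,
    HealthStatus.DEGRADED: 1,
    HealthStatus.UNHEALTHY: 2,
}


def aggregate(components: Dict[str, HealthStatus]) -> HealthStatus:
    """Return the worst component status; an empty mapping counts as healthy."""
    if not components:
        return HealthStatus.HEALTHY
    return max(components.values(), key=_SEVERITY.__getitem__)


def is_ready(status: HealthStatus) -> bool:
    """Ready means anything other than UNHEALTHY."""
    return status is not HealthStatus.UNHEALTHY


overall = aggregate({"database": HealthStatus.HEALTHY, "cache": HealthStatus.DEGRADED})
```

Under this rule a degraded cache keeps the service ready for traffic, while a single unhealthy component takes it out of rotation.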
Module: agent_os.circuit_breaker
Prevents cascading failures by tracking backend errors and short-circuiting calls when a threshold is exceeded.
from agent_os.circuit_breaker import CircuitBreaker, CircuitBreakerConfig
cb = CircuitBreaker(CircuitBreakerConfig(failure_threshold=3, reset_timeout_seconds=15.0))
result = await cb.call(backend.get, "key")

| Method | Signature | Returns | Description |
|---|---|---|---|
| `__init__` | `(config: CircuitBreakerConfig = None)` | - | Create with optional config |
| `call` | `async (func, *args, **kwargs)` | `Any` | Execute through circuit breaker; raises `CircuitBreakerOpen` if open |
| `get_state` | `()` | `CircuitState` | Current state (`CLOSED`, `OPEN`, `HALF_OPEN`) |
| `record_success` | `()` | `None` | Record a success and reset failure count |
| `record_failure` | `()` | `None` | Record a failure; opens circuit if threshold reached |
| `reset` | `()` | `None` | Manually reset to `CLOSED` |
`CircuitBreakerConfig` fields:

| Field | Type | Default | Description |
|---|---|---|---|
| `failure_threshold` | `int` | `5` | Consecutive failures before opening |
| `reset_timeout_seconds` | `float` | `30.0` | Seconds before OPEN → HALF_OPEN |
| `half_open_max_calls` | `int` | `1` | Max calls in HALF_OPEN before deciding |
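The CLOSED → OPEN → HALF_OPEN cycle configured above can be illustrated with a small synchronous state machine. This is a sketch of the general circuit breaker pattern, not the library's `CircuitBreaker`: the class name `TinyBreaker` and the injectable `clock` are hypothetical, and `half_open_max_calls` is left out for brevity.

```python
import time
from typing import Callable, Optional


class TinyBreaker:
    """Hypothetical state machine; not the library's CircuitBreaker."""

    def __init__(self, failure_threshold: int = 5,
                 reset_timeout_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout_seconds = reset_timeout_seconds
        self._clock = clock
        self._failures = 0
        # Time the circuit last opened, or None while it is closed.
        self._opened_at: Optional[float] = None

    def get_state(self) -> str:
        if self._opened_at is None:
            return "CLOSED"
        if self._clock() - self._opened_at >= self.reset_timeout_seconds:
            return "HALF_OPEN"  # timeout elapsed: allow a trial call
        return "OPEN"

    def record_failure(self) -> None:
        if self.get_state() == "HALF_OPEN":
            self._opened_at = self._clock()  # trial call failed: reopen
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()

    def record_success(self) -> None:
        # Any success resets the failure count and closes the circuit.
        self._failures = 0
        self._opened_at = None


now = [0.0]  # fake clock, so the example does not need to sleep
cb = TinyBreaker(failure_threshold=3, reset_timeout_seconds=15.0, clock=lambda: now[0])
states = [cb.get_state()]
for _ in range(3):
    cb.record_failure()
states.append(cb.get_state())  # after three consecutive failures
now[0] = 15.0
states.append(cb.get_state())  # after the reset timeout
cb.record_success()
states.append(cb.get_state())  # trial call succeeded
```

The half-open state exists so that a single trial call, rather than full traffic, probes whether the backend has recovered.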
Module: agent_os.metrics
Thread-safe singleton that records policy enforcement statistics across all governance adapters.
from agent_os.metrics import metrics
metrics.record_check("langchain", latency_ms=1.2, approved=True)
metrics.record_violation("crewai")
metrics.record_blocked("crewai")
snap = metrics.snapshot()
# {"total_checks": 1, "violations": 1, "approvals": 1, "blocked": 1, ...}

| Method | Signature | Returns | Description |
|---|---|---|---|
| `record_check` | `(adapter: str, latency_ms: float, approved: bool)` | `None` | Record a policy check |
| `record_violation` | `(adapter: str)` | `None` | Record a standalone violation |
| `record_blocked` | `(adapter: str)` | `None` | Record a blocked tool call |
| `snapshot` | `()` | `dict` | JSON-serializable metrics snapshot |
| `reset` | `()` | `None` | Reset all counters (useful for tests) |
Module: agent_os.trust_root
Deterministic (non-LLM) policy authority at the top of the supervisor chain. Cannot be overridden by any agent.
from agent_os.trust_root import TrustRoot, TrustDecision
from agent_os.integrations.base import GovernancePolicy
root = TrustRoot(policies=[GovernancePolicy(allowed_tools=["read_file"])])
decision = root.validate_action({"tool": "delete_file", "arguments": {}})
decision.allowed  # False

| Method | Signature | Returns | Description |
|---|---|---|---|
| `__init__` | `(policies: list[GovernancePolicy], max_escalation_depth: int = 3)` | - | Create with at least one policy |
| `validate_action` | `(action: dict)` | `TrustDecision` | Deterministic policy check |
| `validate_supervisor` | `(supervisor_config: dict)` | `bool` | Verify supervisor meets trust requirements |
| `is_deterministic` | `()` | `bool` | Always returns `True` |
Module: agent_os.supervisor
Layered supervision with a deterministic trust root. Level 0 must be deterministic (non-LLM).
from agent_os.supervisor import SupervisorHierarchy
hierarchy = SupervisorHierarchy(trust_root=root)
hierarchy.register_supervisor("trust-root", level=0, is_agent=False)
hierarchy.register_supervisor("safety-agent", level=1, is_agent=True)
violations = hierarchy.validate_hierarchy()  # [] if valid

| Method | Signature | Returns | Description |
|---|---|---|---|
| `register_supervisor` | `(name, level, is_agent=True)` | `None` | Register a supervisor at a hierarchy level |
| `validate_hierarchy` | `()` | `list[str]` | Return list of violations (empty = valid) |
| `get_authority_chain` | `(action: dict)` | `list[str]` | Ordered supervisor chain for an action |
| `escalate` | `(action: dict, from_level: int)` | `TrustDecision` | Escalate action up the hierarchy |
Module: agent_os.sandbox
Prevents agents from bypassing the kernel via direct stdlib calls using import hooks and AST-based static analysis.
| Field | Type | Default | Description |
|---|---|---|---|
| `blocked_modules` | `list[str]` | `["subprocess", "os", "shutil", "socket", "ctypes"]` | Modules to block |
| `blocked_builtins` | `list[str]` | `["exec", "eval", "compile", "__import__"]` | Builtins to block |
| `allowed_paths` | `list[str]` | `[]` | Allowed file system paths |
| `max_memory_mb` | `int \| None` | `None` | Memory limit |
| `max_cpu_seconds` | `int \| None` | `None` | CPU time limit |
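The AST-based half of this approach can be sketched with the standard `ast` module. The block lists below copy the defaults from the table above; the function name `find_violations` and the message format are hypothetical, and this is an illustration of the technique rather than the sandbox's own analyzer.

```python
import ast
from typing import List

BLOCKED_MODULES = {"subprocess", "os", "shutil", "socket", "ctypes"}
BLOCKED_BUILTINS = {"exec", "eval", "compile", "__import__"}


def find_violations(source: str) -> List[str]:
    """Return human-readable violations found by walking the parsed AST."""
    violations = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [node.module or ""]
        else:
            names = []
        for name in names:
            root = name.split(".")[0]  # "os.path" is blocked because "os" is
            if root in BLOCKED_MODULES:
                violations.append(
                    f"line {node.lineno}: import of blocked module '{root}'"
                )
        # Direct calls by name, such as eval(...) or exec(...).
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in BLOCKED_BUILTINS:
                violations.append(
                    f"line {node.lineno}: call to blocked builtin '{node.func.id}'"
                )
    return violations


sample = "import os.path\nfrom subprocess import run\nprint(eval('1 + 1'))\nimport json\n"
violations = find_violations(sample)
```

Static analysis only catches what is visible in the source text. Dynamic access such as `getattr(builtins, "eval")` would pass this check, which is one reason to pair it with runtime import hooks.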
Module: agent_os.exceptions
All exceptions carry error_code, details dict, and timestamp for structured error handling.
AgentOSError
├── PolicyError
│   ├── PolicyViolationError (POLICY_VIOLATION)
│   ├── PolicyDeniedError (POLICY_DENIED)
│   └── PolicyTimeoutError (POLICY_TIMEOUT)
├── BudgetError
│   ├── BudgetExceededError (BUDGET_EXCEEDED)
│   └── BudgetWarningError (BUDGET_WARNING)
├── IdentityError
│   ├── IdentityVerificationError (IDENTITY_VERIFICATION_FAILED)
│   └── CredentialExpiredError (CREDENTIAL_EXPIRED)
├── IntegrationError
│   ├── AdapterNotFoundError (ADAPTER_NOT_FOUND)
│   └── AdapterTimeoutError (ADAPTER_TIMEOUT)
├── ConfigurationError
│   ├── InvalidPolicyError (INVALID_POLICY)
│   └── MissingConfigError (MISSING_CONFIG)
├── RateLimitError (RATE_LIMIT_EXCEEDED)
├── SecurityError (SECURITY_VIOLATION)
└── SerializationError (SERIALIZATION_ERROR)
All exceptions support to_dict() for JSON serialization:
from agent_os.exceptions import PolicyViolationError
try:
    raise PolicyViolationError("Blocked action", details={"tool": "rm"})
except PolicyViolationError as e:
    print(e.to_dict())
    # {"error": "POLICY_VIOLATION", "message": "Blocked action", "details": {"tool": "rm"}, ...}

agentos init [--template strict|permissive|audit] [--force]
agentos secure [--verify]
agentos audit [--format text|json]
agentos status [--format FORMAT]
agentos check [files...] [--staged] [--ci] [--format text|json]
agentos review <file> [--cmvk] [--models MODEL1,MODEL2]
agentos validate [files...] [--strict]
agentos install-hooks [--force] [--append]
agentos serve [--port PORT]
agentos metrics
| Variable | Default | Description |
|---|---|---|
| `AGENTOS_CONFIG` | `.agents/` | Path to config directory |
| `AGENTOS_LOG_LEVEL` | `WARNING` | `DEBUG`, `INFO`, `WARNING`, `ERROR` |
| `AGENTOS_BACKEND` | `memory` | `memory` or `redis` |
| `AGENTOS_REDIS_URL` | `redis://localhost:6379` | Redis connection URL |
See agentos <command> --help for detailed usage of each command.
Endpoint: POST /api/copilot
curl -X POST https://your-deploy.vercel.app/api/copilot \
  -H "Content-Type: application/json" \
  -d '{"messages":[{"role":"user","content":"@agentos help"}]}'

npx agentos-mcp-server

Tools: create_agent, list_templates, check_compliance, run_tests, security_audit, debug_agent
See extensions/mcp-server/README.md for full tool documentation.