This guide covers the enterprise-grade features added to the Self-Correcting Agent Kernel. These features address the three critical gaps identified in the competitive assessment:
- Multi-Agent Orchestration - Full hierarchical swarms with conflict resolution
- Tool Ecosystem - OpenAPI auto-discovery with 60+ built-in tools
- Scalability & Reliability - Distributed execution with failover
Enable asynchronous message passing between agents:
```python
from src.agents.pubsub import InMemoryPubSub, PubSubMessage, MessagePriority

# Create pub-sub system
pubsub = InMemoryPubSub()

# Define message handler
async def handle_message(msg: PubSubMessage):
    print(f"Received from {msg.from_agent}: {msg.payload}")

# Subscribe to topic
await pubsub.subscribe("alerts", handle_message)

# Publish message
msg = PubSubMessage(
    topic="alerts",
    from_agent="agent-001",
    payload={"alert": "System degradation detected"},
    priority=MessagePriority.HIGH
)
await pubsub.publish("alerts", msg)
```

Features:
- In-memory (dev) and Redis (production) backends
- Message history for debugging
- Dead-letter queues for failed deliveries
- Priority-based routing
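To make the priority-based routing concrete, here is a minimal sketch using a plain-Python heap; it models the idea only and is not the kernel's actual backend implementation:

```python
import heapq
from dataclasses import dataclass, field
from itertools import count

# Lower number = higher priority; HIGH messages are delivered first
HIGH, NORMAL, LOW = 0, 1, 2

@dataclass(order=True)
class QueuedMessage:
    priority: int
    seq: int                        # tie-breaker keeps FIFO order within a priority
    payload: dict = field(compare=False)

class MiniPriorityBus:
    def __init__(self):
        self._heap = []
        self._seq = count()

    def publish(self, priority: int, payload: dict) -> None:
        heapq.heappush(self._heap, QueuedMessage(priority, next(self._seq), payload))

    def next_message(self) -> dict:
        return heapq.heappop(self._heap).payload

bus = MiniPriorityBus()
bus.publish(NORMAL, {"alert": "disk usage"})
bus.publish(HIGH, {"alert": "system degradation"})
bus.publish(LOW, {"alert": "routine report"})
print(bus.next_message())  # the HIGH-priority alert comes out first
```

The same priority-then-sequence ordering is a common way to keep delivery deterministic when several messages share a priority level.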
Coordinate multiple agents for collaborative tasks:
```python
from src.agents.pubsub import AgentSwarm

# Create swarm
swarm = AgentSwarm(
    "fraud-detection",
    pubsub,
    ["analyst-001", "analyst-002", "analyst-003"]
)

# Broadcast to all agents
await swarm.broadcast(
    "supervisor",
    "Investigate transaction T-12345",
    {"transaction_id": "T-12345", "risk_score": 0.87}
)

# Request consensus
result = await swarm.request_consensus(
    "analyst-001",
    "Block account due to fraud?",
    {"confidence": 0.92},
    required_votes=2
)

# Distribute work
tasks = [{"task": "analyze", "tx_id": f"T-{i}"} for i in range(10)]
assignments = await swarm.distribute_work(
    "supervisor",
    tasks,
    strategy="round_robin"
)
```

Resolve disagreements between agents using various voting mechanisms:
```python
from src.agents.conflict_resolution import (
    ConflictResolver, AgentVote, ConflictType, VoteType
)

resolver = ConflictResolver(
    default_vote_type=VoteType.MAJORITY,
    supervisor_agent_id="supervisor-001"
)

# Agents vote on decision
votes = [
    AgentVote(
        agent_id="expert-001",
        option="approve",
        confidence=0.95,
        reasoning="Deep analysis shows approval is safe"
    ),
    AgentVote(
        agent_id="novice-001",
        option="reject",
        confidence=0.6
    ),
    AgentVote(
        agent_id="novice-002",
        option="reject",
        confidence=0.5
    )
]

# Resolve using weighted voting (expertise matters)
resolution = await resolver.resolve_conflict(
    "conflict-001",
    ConflictType.DECISION,
    votes,
    VoteType.WEIGHTED
)

print(f"Decision: {resolution.winning_option}")
print(f"Consensus: {resolution.consensus_score:.1%}")
```

Voting Mechanisms:
- Majority: Simple 50%+1 majority
- Supermajority: 2/3 or 3/4 threshold
- Unanimous: All agents must agree
- Weighted: Votes weighted by confidence
- Ranked-Choice: Instant runoff voting
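To illustrate how weighted voting can diverge from simple majority, here is a plain-Python sketch (the vote values are invented for illustration, and the real `ConflictResolver` may weight votes differently):

```python
from collections import Counter

# (agent_id, option, confidence) — illustrative values only
votes = [
    ("expert-001", "approve", 0.95),
    ("novice-001", "reject", 0.5),
    ("novice-002", "reject", 0.4),
]

# Simple majority: one agent, one vote
majority = Counter(option for _, option, _ in votes).most_common(1)[0][0]

# Weighted: each vote counts by its confidence
weights = Counter()
for _, option, confidence in votes:
    weights[option] += confidence
weighted = weights.most_common(1)[0][0]

print(majority, weighted)  # reject approve
```

Here the two low-confidence rejections win a headcount (2 vs 1), but the single high-confidence expert wins on weight (0.95 vs 0.9), which is why weighted voting suits swarms with uneven expertise.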
60+ tools across 6 categories ready to use:
```python
from src.interfaces.openapi_tools import create_builtin_tools_library
from src.interfaces.tool_registry import ToolRegistry

registry = ToolRegistry()
tools = create_builtin_tools_library()

# Register tools
for tool in tools:
    registry.register_tool(tool, mock_executor)

# Execute a tool
result = await registry.execute_tool(
    "text_summarize",
    {"text": "Long document...", "max_length": 100}
)
```

Tool Categories:
- Text Processing (10 tools): search, summarize, translate, sentiment analysis
- Data Manipulation (10 tools): filter, sort, aggregate, pivot
- File Operations (10 tools): read, write, copy, delete
- Web Interaction (10 tools): search, fetch, scrape, download
- Mathematics (10 tools): calculate, convert units, statistics
- Time/Date (10 tools): format, parse, calculate age
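Conceptually, a tool registry is a name-to-executor mapping plus a parameter contract. The simplified model below shows the register/execute pattern; it is a sketch, not the real `ToolRegistry` API, and the truncating summarizer is a stand-in executor:

```python
import asyncio

class MiniRegistry:
    """Toy registry: maps tool names to async executor callables."""
    def __init__(self):
        self._tools = {}

    def register_tool(self, name, executor):
        self._tools[name] = executor

    async def execute_tool(self, name, params):
        if name not in self._tools:
            raise KeyError(f"unknown tool: {name}")
        return await self._tools[name](params)

async def text_summarize(params):
    # Stand-in executor: naively truncate to max_length characters
    return params["text"][: params["max_length"]]

mini = MiniRegistry()
mini.register_tool("text_summarize", text_summarize)
result = asyncio.run(mini.execute_tool("text_summarize",
                                       {"text": "Long document...", "max_length": 4}))
print(result)  # "Long"
```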
Automatically generate tools from OpenAPI specifications:
```python
from src.interfaces.openapi_tools import OpenAPIParser

parser = OpenAPIParser()

# Parse OpenAPI spec
spec_yaml = """
openapi: 3.0.0
info:
  title: My API
  version: 1.0.0
paths:
  /users:
    get:
      operationId: list_users
      summary: List all users
      responses:
        '200':
          description: Success
"""
spec = parser.parse_spec(spec_yaml, format="yaml")
tools = parser.extract_tools(spec, tool_prefix="myapi_")

# Register in registry
count = parser.register_tools_from_spec(
    spec_yaml,
    registry,
    format="yaml",
    tool_prefix="myapi_"
)
print(f"Registered {count} tools from OpenAPI spec")
```

Extend the kernel with community plugins:
```python
from src.interfaces.plugin_system import PluginManager
from pathlib import Path

manager = PluginManager(
    plugin_dirs=[Path("./plugins")],
    registry=registry,
    auto_activate=True
)

# Discover plugins
discovered = manager.discover_plugins()

# Install plugin
success = manager.install_plugin("weather_tools")

# List all plugins
plugins = manager.list_plugins()
for plugin in plugins:
    print(f"{plugin['name']}: {plugin['status']}")
```

Plugin Structure:
```
plugins/
└── weather/
    ├── plugin.json    # Metadata
    └── __init__.py    # Tool definitions and executors
```
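A `plugin.json` might look like the following; the field names here are assumptions for illustration, so check the `plugin_system` docstrings for the actual schema:

```json
{
  "name": "weather_tools",
  "version": "1.0.0",
  "description": "Weather lookup tools",
  "entry_point": "__init__.py"
}
```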
Scale horizontally across multiple machines:
```python
from src.kernel.distributed import DistributedEngine, ExecutionMode

# Initialize engine
engine = DistributedEngine(mode=ExecutionMode.LOCAL)
engine.initialize()

# Submit tasks
task_id = await engine.submit_task(
    "audit_task",
    {"agent_id": "agent-001", "prompt": "Analyze this"},
    max_attempts=3
)

# Wait for result
result = await engine.wait_for_task(task_id, timeout_seconds=30)

# Parallel map
numbers = list(range(100))
results = await engine.parallel_map(lambda x: x * x, numbers)

# Get cluster stats
stats = engine.get_cluster_stats()
print(f"Resources: {stats['resources']}")
```

Features:
- Task parallelism for independent operations
- Actor model for stateful workers
- Automatic retry and failover
- Worker pools with auto-scaling
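The engine's internals aren't shown here, but the `parallel_map` idea — fan independent tasks out to a bounded pool and gather results in input order — can be sketched with asyncio as a local stand-in for distributed execution:

```python
import asyncio

async def parallel_map(fn, items, max_concurrency=8):
    # The semaphore bounds in-flight tasks, like a fixed-size worker pool
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(item):
        async with sem:
            return fn(item)

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(run_one(i) for i in items))

results = asyncio.run(parallel_map(lambda x: x * x, range(10)))
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The real engine adds what this sketch omits: cross-machine scheduling, retries, and failover.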
Ensure high availability:
```python
from src.kernel.failover import HealthMonitor, FailoverManager

# Create health monitor
monitor = HealthMonitor(
    check_interval_seconds=30,
    unhealthy_threshold=3
)

# Register components
async def primary_check():
    return call_primary_agent()

monitor.register_component("primary", "agent", primary_check)
monitor.register_component("backup", "agent", backup_check)

# Start monitoring
await monitor.start_monitoring()

# Configure failover
failover = FailoverManager(monitor)
failover.register_backup("primary", "backup")

# Get active component (automatic failover)
active = await failover.get_active_component("primary")
```

Prevent cascading failures:
```python
from src.kernel.failover import CircuitBreaker

breaker = CircuitBreaker(
    "external-api",
    failure_threshold=5,
    timeout_seconds=60
)

async def api_call():
    return await external_service()

async def fallback():
    return use_cached_data()

# Execute with circuit breaker
result = await breaker.call(api_call, fallback=fallback)
```

Test system performance:
```python
from src.kernel.load_testing import LoadTester, LoadProfile

tester = LoadTester()

async def target_function():
    await process_request()

# Run load test
result = await tester.run_load_test(
    target_function,
    profile=LoadProfile.RAMP_UP,
    total_requests=1000,
    concurrent_requests=50,
    ramp_up_seconds=30
)

print(f"Throughput: {result.requests_per_second:.1f} req/s")
print(f"Latency p95: {result.latency_p95:.1f}ms")
print(f"Error rate: {result.error_rate:.1%}")
```

Load Profiles:
- Ramp-Up: Gradually increase load
- Spike: Sudden load increase
- Endurance: Sustained load over time
- Stress: Push beyond capacity
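As an illustration of what a ramp-up profile computes (not necessarily the `LoadTester`'s actual algorithm), concurrency can be scaled linearly from 1 worker up to the target over the ramp window, then held there:

```python
def ramp_up_concurrency(elapsed_s: float, ramp_up_s: float, target: int) -> int:
    """Linear ramp: start at 1 worker, reach `target` at ramp_up_s, then hold."""
    if elapsed_s >= ramp_up_s:
        return target
    return max(1, round(target * elapsed_s / ramp_up_s))

# With target=50 and a 30s ramp, load grows by roughly 1.7 workers per second
schedule = [ramp_up_concurrency(t, 30, 50) for t in (0, 15, 30, 45)]
print(schedule)  # [1, 25, 50, 50]
```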
See `examples/enterprise_integration_demo.py` for a complete example that demonstrates all features working together:

```bash
python3 examples/enterprise_integration_demo.py
```

The demo showcases:
- Multi-agent fraud detection with weighted voting
- Tool ecosystem with 60+ built-in tools
- Automatic failover when components fail
- Circuit breaker pattern with fallback
- Load testing achieving 68+ req/s
Based on load testing results:
| Metric | Value |
|---|---|
| Throughput | 68.1 req/s |
| Latency (p50) | 20.3ms |
| Latency (p95) | 20.3ms |
| Latency (p99) | 20.4ms |
| Error Rate | 0.0% |
| Consensus Resolution | <100ms |
| Tool Execution | <1ms |
| Failover Time | <500ms |
- Use weighted voting when agents have different expertise levels
- Set supervisor for escalation on deadlocks
- Monitor consensus scores to detect contentious decisions
- Use OpenAPI parser for existing APIs
- Create plugins for domain-specific tools
- Register tools lazily to reduce startup time
- Set health check thresholds based on SLOs
- Use circuit breakers for external dependencies
- Test failover scenarios with chaos engineering
- Use Ray for CPU-intensive tasks
- Monitor cluster resources with `get_cluster_stats()`
- Configure worker pools based on load patterns
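The kernel's `CircuitBreaker` usage is shown earlier; its core state machine (closed → open after repeated failures → half-open after a timeout) can be sketched like this simplified synchronous model:

```python
import time

class MiniBreaker:
    def __init__(self, failure_threshold=5, timeout_seconds=60):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def call(self, fn, fallback=None):
        # After the timeout, allow one trial call through (half-open)
        if self.state == "open":
            if time.monotonic() - self.opened_at >= self.timeout_seconds:
                self.state = "half-open"
            elif fallback is not None:
                return fallback()          # fail fast: don't hit the dependency
            else:
                raise RuntimeError("circuit open")
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold or self.state == "half-open":
                self.state = "open"        # trip: stop sending traffic downstream
                self.opened_at = time.monotonic()
            if fallback is not None:
                return fallback()
            raise
        self.failures = 0                  # success resets the breaker
        self.state = "closed"
        return result

breaker = MiniBreaker(failure_threshold=2, timeout_seconds=60)

def flaky():
    raise ConnectionError("service down")

breaker.call(flaky, fallback=lambda: "cached")   # failure 1: still closed
breaker.call(flaky, fallback=lambda: "cached")   # failure 2: trips open
print(breaker.state)  # "open"
```

While open, further calls are served from the fallback without touching the failing dependency, which is exactly what prevents cascading failures.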
Before:

```python
orchestrator = Orchestrator(agents)
```

After:

```python
orchestrator = Orchestrator(
    agents,
    message_broker=pubsub,
    enable_pubsub=True,
    enable_conflict_resolution=True
)
```

Before: Manual tool definition
After: OpenAPI auto-discovery

```python
parser = OpenAPIParser()
parser.register_tools_from_spec(openapi_yaml, registry)
```

Before: No failover

After:

```python
monitor = HealthMonitor()
monitor.register_component("primary", "agent", health_check)
failover = FailoverManager(monitor)
failover.register_backup("primary", "backup")
```

If you get `ModuleNotFoundError: No module named 'src'`:
```python
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
```

If Ray is not installed:

```bash
pip install 'ray[default]>=2.8.0'
```

Or check availability:

```python
from src.kernel.distributed import RAY_AVAILABLE
print(f"Ray available: {RAY_AVAILABLE}")
```

Ensure health checks fail multiple times:

```python
# Default unhealthy_threshold is 3
monitor = HealthMonitor(unhealthy_threshold=2)
```

For detailed API documentation, see:
- Multi-Agent: `src/agents/pubsub.py`, `src/agents/conflict_resolution.py`
- Tools: `src/interfaces/openapi_tools.py`, `src/interfaces/plugin_system.py`
- Distributed: `src/kernel/distributed.py`, `src/kernel/failover.py`, `src/kernel/load_testing.py`
All classes include comprehensive docstrings with examples.