Connect agents with Redis, Kafka, NATS, and cloud message services.
Agent OS provides the Agent Message Bus (AMB) for decoupled agent communication. This guide covers using the built-in adapters for different message brokers.
| Adapter | Use Case | Install |
|---|---|---|
| Memory | Testing, development | Included |
| Redis | Low-latency, pub/sub | pip install amb-core[redis] |
| Kafka | High-throughput, durability | pip install amb-core[kafka] |
| RabbitMQ | Complex routing, enterprise | pip install amb-core[rabbitmq] |
| NATS | Cloud-native, lightweight | pip install amb-core[nats] |
| Azure Service Bus | Azure ecosystem | pip install amb-core[azure] |
| AWS SQS | AWS ecosystem | pip install amb-core[aws] |
pip install amb-core[redis]

docker run -d -p 6379:6379 redis:latest

from amb_core.adapters import RedisBroker
from amb_core import AgentMessageBus, Message
# Create broker
broker = RedisBroker(url="redis://localhost:6379/0")
# Create message bus
bus = AgentMessageBus(broker=broker)
# Connect
await bus.connect()
# Subscribe to messages
async def handle_task(msg: Message):
    """Handle one incoming task message and publish its result.

    The payload is passed to process_task(); the outcome is published to
    the "results" topic, reusing the incoming message's correlation_id.
    """
    print(f"Received task: {msg.payload}")

    # Process and respond
    outcome = await process_task(msg.payload)

    # Send response
    reply = Message(
        topic="results",
        payload=outcome,
        correlation_id=msg.correlation_id,
    )
    await bus.publish(reply)
await bus.subscribe("tasks", handle_task)
# Publish a message
await bus.publish(Message(
topic="tasks",
payload={"action": "analyze", "file": "data.txt"}
))

Best for: Low-latency messaging, pub/sub patterns, real-time updates
from amb_core.adapters import RedisBroker
broker = RedisBroker(
url="redis://localhost:6379/0"
)
# Features:
# - Pub/sub for real-time messaging
# - Streams for message persistence
# - Native request-response support

Pros:
- Very low latency (<1ms)
- Simple setup
- Built-in persistence with Redis Streams
Cons:
- Limited durability compared to Kafka
- Single-node by default
Best for: High-throughput, event sourcing, audit logs
from amb_core.adapters import KafkaBroker
broker = KafkaBroker(
bootstrap_servers="localhost:9092"
)
# Features:
# - High throughput (millions/sec)
# - Durable message storage
# - Consumer groups for load balancing

Pros:
- Highest throughput
- Strong durability guarantees
- Replay capability
Cons:
- More complex setup
- Higher latency than Redis
Best for: Cloud-native apps, microservices, edge computing
from amb_core.adapters import NATSBroker
broker = NATSBroker(
servers=["nats://localhost:4222"],
use_jetstream=True # Enable persistence
)
# Features:
# - Lightweight (single binary)
# - Native request-reply
# - JetStream for persistence

Pros:
- Very lightweight
- Easy to deploy
- Built-in request-reply
Cons:
- Smaller ecosystem than Kafka/RabbitMQ
Best for: Azure ecosystem, enterprise messaging
from amb_core.adapters import AzureServiceBusBroker
broker = AzureServiceBusBroker(
connection_string="Endpoint=sb://...",
topic_name="agent-messages"
)
# Features:
# - Dead-letter queues
# - Sessions for ordering
# - Azure AD integration

Pros:
- Managed service
- Enterprise features
- Azure integration
Cons:
- Azure lock-in
- Cost at scale
Best for: AWS ecosystem, serverless
from amb_core.adapters import AWSSQSBroker
broker = AWSSQSBroker(
region_name="us-east-1",
queue_name="agent-messages",
use_fifo=True # For ordering
)
# Features:
# - Serverless scaling
# - FIFO queues for ordering
# - Dead-letter queues

Pros:
- Serverless (no infrastructure)
- Auto-scaling
- AWS integration
Cons:
- Higher latency
- AWS lock-in
from amb_core import Message
# Agent A sends request
response = await bus.request(
Message(
topic="calculate",
payload={"operation": "sum", "values": [1, 2, 3]}
),
timeout=30.0
)
print(f"Result: {response.payload}") # Result: 6

# Agent A subscribes to events
await bus.subscribe("events.user.*", handle_user_event)
# Agent B publishes events
await bus.publish(Message(
topic="events.user.created",
payload={"user_id": "123", "email": "user@example.com"}
))

# Multiple workers subscribe to same queue
# Each message delivered to only one worker
async def worker(msg: Message):
    """Process one work item and publish its outcome to "results".

    The published message carries the incoming message's id as its
    correlation_id.
    """
    outcome = await process_work(msg.payload)
    reply = Message(
        topic="results",
        payload=outcome,
        correlation_id=msg.id,
    )
    await bus.publish(reply)
# Start multiple workers
for i in range(4):
await bus.subscribe("work-queue", worker, consumer_group=f"workers")# Publish all events to Kafka for durability
kafka_broker = KafkaBroker(bootstrap_servers="localhost:9092")
bus = AgentMessageBus(broker=kafka_broker)
# All agent actions become events
await bus.publish(Message(
topic="agent.events",
payload={
"event_type": "document_analyzed",
"agent_id": "analyzer-001",
"document_id": "doc-123",
"result": analysis_result,
"timestamp": datetime.now(timezone.utc).isoformat()
}
))
# Events can be replayed for debugging/audit

Use different brokers for different purposes:
# Message is imported alongside AgentMessageBus because the agent defined
# below builds Message objects for both buses.
from amb_core import AgentMessageBus, Message
from amb_core.adapters import RedisBroker, KafkaBroker

# Fast path for real-time messages
redis_bus = AgentMessageBus(
    broker=RedisBroker(url="redis://localhost:6379")
)

# Durable path for events/audit
kafka_bus = AgentMessageBus(
    broker=KafkaBroker(bootstrap_servers="localhost:9092")
)
@kernel.register
async def my_agent(task: str):
    """Process a task, then report the result on both buses.

    The result is published to "responses" on redis_bus, and a
    "task_completed" event carrying the same result is published to
    "events" on kafka_bus.
    """
    # Process task
    outcome = await process(task)

    # Fast response via Redis
    response = Message(topic="responses", payload=outcome)
    await redis_bus.publish(response)

    # Durable event via Kafka
    event = Message(
        topic="events",
        payload={"action": "task_completed", "result": outcome},
    )
    await kafka_bus.publish(event)

# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:7
ports:
- "6379:6379"
kafka:
image: confluentinc/cp-kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
nats:
image: nats:latest
ports:
- "4222:4222"
command: ["--js"] # Enable JetStream
agent:
build: .
depends_on:
- redis
- kafka
- nats
environment:
REDIS_URL: redis://redis:6379
KAFKA_SERVERS: kafka:9092
NATS_URL: nats://nats:4222

import os
broker = RedisBroker(
    url=os.environ.get("REDIS_URL", "redis://localhost:6379")
)

import asyncio


async def with_reconnect(bus: AgentMessageBus):
    """Connect the bus, retrying every 5 seconds until it succeeds.

    Only ConnectionError triggers a retry; any other exception raised by
    bus.connect() propagates to the caller. There is no retry limit.
    """
    while True:
        try:
            await bus.connect()
        except ConnectionError:
            print("Connection failed, retrying in 5s...")
            # asyncio.sleep suspends this coroutine rather than blocking
            # the event loop while waiting to retry.
            await asyncio.sleep(5)
        else:
            # Connected: stop retrying.
            break

# Configure DLQ for failed messages
broker = RedisBroker(
url="redis://localhost:6379",
dead_letter_queue="dlq:agent-messages"
)

from amb_core.observability import metrics
# Track message processing lag
@metrics.track("message_processing")
async def handle_message(msg: Message):
lag = time.time() - msg.timestamp
metrics.gauge("message_lag_seconds", lag)
await process(msg)

| Tutorial | Description |
|---|---|
| Creating Custom Tools | Build safe tools for agents |
| Multi-Agent Systems | Coordinate agent teams |
| Observability | Monitor your message bus |
Ready to build custom tools?