Skip to content

Agent Networks

Manoj Desai edited this page May 3, 2025 · 1 revision

Agent Networks

Agent networks enable you to connect multiple specialized agents into a cohesive ecosystem. This approach allows each agent to focus on its strengths while collaborating to solve complex problems.

Creating Agent Networks

The basic structure of an agent network:

from python_a2a import AgentNetwork, HTTPClient

# Build the network and register each specialist under a role name
network = AgentNetwork(name="Travel Assistant Network")

role_endpoints = {
    "weather": "http://localhost:5001",
    "hotels": "http://localhost:5002",
    "flights": "http://localhost:5003",
    "activities": "http://localhost:5004",
    "recommendations": "http://localhost:5005",
}
for role, endpoint in role_endpoints.items():
    network.add(role, endpoint)

# One client gives access to every agent registered in the network
client = HTTPClient(network)

# Address an individual agent by the role name it was registered under
weather_response = client.send_message_to("weather", "What's the weather in Paris?")
print(f"Weather: {weather_response.content}")

hotels_response = client.send_message_to("hotels", "Find hotels in Paris")
print(f"Hotels: {hotels_response.content}")

Automatic Message Routing

Implement intelligent routing to direct messages to the most appropriate agent:

from python_a2a import AgentNetwork, HTTPClient
from python_a2a.client.router import KeywordRouter

# Create an agent network of topic specialists
network = AgentNetwork(name="Knowledge Network")

topic_endpoints = {
    "weather": "http://localhost:5001",
    "sports": "http://localhost:5002",
    "history": "http://localhost:5003",
    "science": "http://localhost:5004",
    "general": "http://localhost:5005",  # Fallback agent
}
for topic, endpoint in topic_endpoints.items():
    network.add(topic, endpoint)

# Create a keyword router; "general" is passed as the router's default agent
router = KeywordRouter(network=network, default_agent="general")

# Define routing rules based on keywords.
# Every sample query below must contain at least one keyword of the agent it
# is meant for. "cup" and "championship" are listed so that the World Cup
# question matches the sports rule instead of falling through to the default.
# NOTE(review): assumes keyword matching is case-insensitive — confirm.
keyword_rules = {
    "weather": ["weather", "temperature", "forecast", "rain", "snow", "climate"],
    "sports": ["sports", "game", "team", "player", "score", "tournament", "cup", "championship"],
    "history": ["history", "historical", "ancient", "century", "king", "queen", "war"],
    "science": ["science", "scientific", "physics", "chemistry", "biology", "experiment"],
}
for target, keywords in keyword_rules.items():
    router.add_rule(agent=target, keywords=keywords)

# Create a client with the router
client = HTTPClient(router)

# Messages will be automatically routed to the appropriate agent
weather_query = "What's the weather forecast for Tokyo tomorrow?"
sports_query = "Who won the World Cup in 2018?"
history_query = "Tell me about ancient Egyptian pyramids."
science_query = "Explain quantum physics in simple terms."
general_query = "What's the meaning of life?"  # matches no rule

print(f"Weather query: {client.send_message(weather_query).content}")
print(f"Sports query: {client.send_message(sports_query).content}")
print(f"History query: {client.send_message(history_query).content}")
print(f"Science query: {client.send_message(science_query).content}")
print(f"General query: {client.send_message(general_query).content}")

LLM-Powered Routing

Use an LLM to intelligently route messages:

from python_a2a import AgentNetwork, HTTPClient
from python_a2a.client.router import SmartRouter
from python_a2a.client.llm.openai import OpenAILLMClient
import os

# Build the network; each agent carries a description passed along at registration
network = AgentNetwork(name="Specialized Network")

# (role, endpoint, description) for every agent in the network
agent_specs = [
    (
        "weather",
        "http://localhost:5001",
        "Provides weather forecasts and climate information for any location",
    ),
    (
        "travel",
        "http://localhost:5002",
        "Helps with travel planning, hotel bookings, and finding attractions",
    ),
    (
        "finance",
        "http://localhost:5003",
        "Provides financial advice, stock information, and budget planning",
    ),
    (
        "health",
        "http://localhost:5004",
        "Offers health tips, medical information, and fitness guidance",
    ),
]
for role, endpoint, summary in agent_specs:
    network.add(role, endpoint, description=summary)

# LLM client used by the router; the API key is read from the environment
llm_client = OpenAILLMClient(
    api_key=os.environ.get("OPENAI_API_KEY"),
    model="gpt-3.5-turbo"
)

# LLM-powered router over the network, and a client that sends through it
router = SmartRouter(network=network, llm_client=llm_client)
client = HTTPClient(router)

# One sample question per specialist
queries = [
    "What's the weather like in New York today?",
    "I'm planning a trip to Paris next month",
    "How should I diversify my investment portfolio?",
    "What exercises are good for lower back pain?"
]

for query in queries:
    print(f"\nQuery: {query}")
    reply = client.send_message(query)
    print(f"Response: {reply.content}")

Agent Discovery and Registration

Enable dynamic discovery of agents:

from python_a2a.discovery import AgentRegistry, enable_discovery, DiscoveryClient
from python_a2a import A2AServer, agent, run_server, AgentNetwork, HTTPClient
import threading
import time

# Central registry server for the agents in this example
registry = AgentRegistry(name="Central Registry")


def run_registry():
    """Serve the registry on port 7000 (used as a thread target below)."""
    run_server(registry, port=7000)


# Serve the registry from a daemon thread so the rest of the script can run
registry_thread = threading.Thread(target=run_registry)
registry_thread.daemon = True
registry_thread.start()

# Create specialized agents
@agent(
    name="Weather Agent",
    description="Provides weather information"
)
class WeatherAgent(A2AServer):
    """Demo agent that answers every task with the same fixed forecast."""

    def handle_task(self, task):
        """Return a canned forecast that echoes ``task.input``."""
        forecast = f"Weather forecast for {task.input}: Sunny, 75°F"
        return {"output": forecast}

@agent(
    name="Travel Agent",
    description="Helps with travel planning"
)
class TravelAgent(A2AServer):
    """Demo agent that answers every task with the same fixed recommendation."""

    def handle_task(self, task):
        """Return a canned recommendation that echoes ``task.input``."""
        suggestion = f"Travel recommendations for {task.input}: Visit the museum and local restaurants"
        return {"output": suggestion}

# Address of the registry started above; both agents are pointed at it
registry_url = "http://localhost:7000"

# Instantiate each agent and enable discovery for it against the registry
weather_agent = WeatherAgent()
weather_discovery = enable_discovery(weather_agent, registry_url=registry_url)

travel_agent = TravelAgent()
travel_discovery = enable_discovery(travel_agent, registry_url=registry_url)


def run_agent(agent, port):
    """Serve ``agent`` on ``port``; used as a thread target below.

    Note that the ``agent`` parameter shadows the imported ``agent``
    decorator inside this function.
    """
    run_server(agent, port=port)


# Serve each agent from its own daemon thread
for server, server_port in ((weather_agent, 5001), (travel_agent, 5002)):
    threading.Thread(target=run_agent, args=(server, server_port), daemon=True).start()

# Fixed pause to give the agents time to register before querying
time.sleep(2)

# Client for querying the registry
discovery = DiscoveryClient(registry_url)

# List everything the registry currently knows about
all_agents = discovery.list_agents()
print(f"Found {len(all_agents)} registered agents:")
for info in all_agents:
    print(f"- {info.name}: {info.description} at {info.endpoint}")

# Assemble a network from whatever was discovered
network = AgentNetwork(name="Discovered Network")
for info in all_agents:
    # Derive the role from the display name, e.g. "Weather Agent" -> "weather"
    role = info.name.lower().replace(" agent", "")
    network.add(role, info.endpoint)

# Client over the dynamically built network
client = HTTPClient(network)

# Talk to the discovered agents by their derived role names
weather_response = client.send_message_to("weather", "New York")
print(f"Weather response: {weather_response.content}")

travel_response = client.send_message_to("travel", "Paris")
print(f"Travel response: {travel_response.content}")

Hierarchical Agent Networks

Create networks with primary and secondary agents:

from python_a2a import AgentNetwork, HTTPClient, A2AServer, agent, run_server
import threading

# Create specialized agents
@agent(
    name="Primary Assistant",
    description="Main assistant that coordinates specialized agents"
)
class PrimaryAgent(A2AServer):
    """Front-line agent that forwards tasks to secondary agents by keyword."""

    def __init__(self):
        super().__init__()
        # Network of secondary agents this assistant can delegate to
        self.network = AgentNetwork(name="Secondary Network")
        for role, endpoint in (
            ("weather", "http://localhost:5011"),
            ("travel", "http://localhost:5012"),
            ("finance", "http://localhost:5013"),
        ):
            self.network.add(role, endpoint)

        # Client used to reach the secondary agents
        self.client = HTTPClient(self.network)

    def handle_task(self, task):
        """Coordinate with secondary agents to process a task.

        The lower-cased ``task.input`` is checked for weather, travel and
        finance keywords, in that order. The first match forwards the
        original input to that specialist and wraps its reply. With no
        match the task is answered directly.
        """
        lowered = task.input.lower()

        def mentions(*keywords):
            # Substring match against the lower-cased input
            return any(keyword in lowered for keyword in keywords)

        if mentions("weather", "temperature", "forecast"):
            reply = self.client.send_message_to("weather", task.input)
            return {"output": f"I consulted our weather specialist: {reply.content}"}

        if mentions("travel", "trip", "vacation", "hotel"):
            reply = self.client.send_message_to("travel", task.input)
            return {"output": f"I consulted our travel specialist: {reply.content}"}

        if mentions("money", "finance", "invest", "budget"):
            reply = self.client.send_message_to("finance", task.input)
            return {"output": f"I consulted our finance specialist: {reply.content}"}

        # No specialist keyword found: answer directly
        return {"output": f"I'm handling this directly: {task.input}"}

# Create secondary agents
@agent(
    name="Weather Specialist",
    description="Provides detailed weather information"
)
class WeatherAgent(A2AServer):
    """Secondary demo agent that always returns the same forecast."""

    def handle_task(self, task):
        """Return a canned forecast that echoes ``task.input``."""
        forecast = f"Weather forecast for {task.input}: Sunny, 75°F"
        return {"output": forecast}

@agent(
    name="Travel Specialist",
    description="Expert in travel planning"
)
class TravelAgent(A2AServer):
    """Secondary demo agent that always returns the same travel advice."""

    def handle_task(self, task):
        """Return a canned recommendation that echoes ``task.input``."""
        suggestion = f"Travel recommendations for {task.input}: Visit popular attractions and try local cuisine"
        return {"output": suggestion}

@agent(
    name="Finance Specialist",
    description="Provides financial advice"
)
class FinanceAgent(A2AServer):
    """Secondary demo agent that always returns the same financial advice."""

    def handle_task(self, task):
        """Return canned advice that echoes ``task.input``."""
        advice = f"Financial advice for {task.input}: Diversify your investments and maintain an emergency fund"
        return {"output": advice}

# Helpers for running agents in background threads
def run_agent(agent, port):
    """Serve ``agent`` on ``port``; used as a thread target.

    Note that the ``agent`` parameter shadows the imported ``agent``
    decorator inside this function.
    """
    run_server(agent, port=port)


def _start_in_background(server, port):
    """Start ``run_agent(server, port)`` on a daemon thread."""
    worker = threading.Thread(target=run_agent, args=(server, port), daemon=True)
    worker.start()


# Secondary agents: each is created and then started, one after another
weather_agent = WeatherAgent()
_start_in_background(weather_agent, 5011)

travel_agent = TravelAgent()
_start_in_background(travel_agent, 5012)

finance_agent = FinanceAgent()
_start_in_background(finance_agent, 5013)

# Primary agent is created and started after the secondaries
primary_agent = PrimaryAgent()
_start_in_background(primary_agent, 5000)

# Fixed pause to give all agents time to start
import time
time.sleep(2)

# Talk to the primary agent only; it delegates to the specialists itself
client = HTTPClient("http://localhost:5000")

# One query per specialist plus one that matches no specialist keyword
queries = [
    "What's the weather like in Tokyo?",
    "I'm planning a trip to Paris",
    "How should I invest my savings?",
    "Tell me a joke"  # General query handled by primary agent
]

for question in queries:
    print(f"\nQuery: {question}")
    reply = client.send_message(question)
    print(f"Response: {reply.content}")

Agent Network with Load Balancing

Implement load balancing for high-availability networks:

from python_a2a import AgentNetwork, HTTPClient
from python_a2a.client.router import LoadBalancingRouter
import random
import time

# Network holding several interchangeable instances per agent type
network = AgentNetwork(name="Scalable Network")

# instance name -> (endpoint, agent type)
replicas = {
    "weather1": ("http://localhost:5001", "weather"),
    "weather2": ("http://localhost:5002", "weather"),
    "weather3": ("http://localhost:5003", "weather"),
    "travel1": ("http://localhost:5011", "travel"),
    "travel2": ("http://localhost:5012", "travel"),
}
for instance, (endpoint, kind) in replicas.items():
    network.add(instance, endpoint, agent_type=kind)

# Load balancing router over the network, and a client that sends through it
router = LoadBalancingRouter(network=network)
client = HTTPClient(router)

# Ten requests, alternating agent type: even -> weather, odd -> travel
for i in range(10):
    if i % 2:
        agent_type = "travel"
        query = f"Travel query {i}: Recommend destinations"
    else:
        agent_type = "weather"
        query = f"Weather query {i}: What's the forecast?"

    print(f"\nSending to {agent_type}: {query}")
    reply = client.send_message_to(agent_type, query)
    print(f"Response: {reply.content}")

    # Short pause between requests
    time.sleep(0.5)

Fallback and Retry Mechanism

Implement fallback logic for resilient networks:

from python_a2a import AgentNetwork, HTTPClient
from python_a2a.client.router import ResilientRouter
from python_a2a.exceptions import A2AConnectionError

# Network with a primary and a fallback agent for each agent type
network = AgentNetwork(name="Resilient Network")

# (name, endpoint, extra keyword arguments); only fallbacks pass is_fallback
agent_specs = [
    ("primary_weather", "http://localhost:5001", {"agent_type": "weather"}),
    ("primary_travel", "http://localhost:5011", {"agent_type": "travel"}),
    ("fallback_weather", "http://localhost:5002", {"agent_type": "weather", "is_fallback": True}),
    ("fallback_travel", "http://localhost:5012", {"agent_type": "travel", "is_fallback": True}),
]
for name, endpoint, options in agent_specs:
    network.add(name, endpoint, **options)

# Router configured with up to 3 retries and a 0.5 retry delay
retry_settings = {"max_retries": 3, "retry_delay": 0.5}
router = ResilientRouter(network=network, **retry_settings)

# Client that sends through the resilient router
client = HTTPClient(router)

# Simulate a scenario where some endpoints fail
def mock_send_message(endpoint, message):
    """Stand-in for a real send, with randomly failing primary endpoints.

    Args:
        endpoint: Identifier of the target agent. Only substring checks for
            "primary" and "weather" are performed on it.
            NOTE(review): the agents above are registered with URLs such as
            "http://localhost:5001"; confirm the client passes a value that
            actually contains the agent name.
        message: The outgoing message; ignored by the mock.

    Returns:
        A dict with a single "content" key holding a canned reply.
        NOTE(review): the loop below reads ``response.content``; confirm the
        client wraps this dict before returning it.

    Raises:
        A2AConnectionError: when ``endpoint`` contains "primary" and the
            random draw is below 0.7.
    """
    # This example's import block does not import `random`, so import it
    # here to keep the snippet self-contained.
    import random

    # Simulate connection errors for primary endpoints
    if "primary" in endpoint and random.random() < 0.7:
        raise A2AConnectionError(f"Connection failed to {endpoint}")

    # Simulate successful response from available endpoints
    if "weather" in endpoint:
        return {"content": f"Weather forecast from {endpoint}: Sunny, 75°F"}
    return {"content": f"Travel recommendations from {endpoint}: Visit popular attractions"}

# Replace the client's `_send_message` attribute with the mock for this simulation
client._send_message = mock_send_message

# Five requests, alternating agent type: even -> weather, odd -> travel
for i in range(5):
    agent_type = "travel" if i % 2 else "weather"
    query = f"Query {i} for {agent_type}"

    print(f"\nSending to {agent_type}: {query}")
    try:
        reply = client.send_message_to(agent_type, query)
        print(f"Success: {reply.content}")
    except Exception as exc:
        # Any exception raised while sending or printing is reported here
        print(f"All {agent_type} agents failed: {exc}")

Cross-Network Communication

Enable communication between different agent networks:

from python_a2a import AgentNetwork, HTTPClient, A2AServer, agent, run_server
import threading
import time

# Two independent networks, each with its own set of agents
network_a = AgentNetwork(name="Information Network")
for role, endpoint in (
    ("weather", "http://localhost:5001"),
    ("news", "http://localhost:5002"),
):
    network_a.add(role, endpoint)

network_b = AgentNetwork(name="Planning Network")
for role, endpoint in (
    ("travel", "http://localhost:5011"),
    ("events", "http://localhost:5012"),
):
    network_b.add(role, endpoint)

# Create a network gateway agent
@agent(
    name="Network Gateway",
    description="Connects multiple agent networks"
)
class NetworkGateway(A2AServer):
    """Gateway agent that forwards tasks to one or both agent networks."""

    def __init__(self):
        super().__init__()
        # One client per network, keyed by the name used for explicit routing
        info_client = HTTPClient(network_a)
        planning_client = HTTPClient(network_b)
        self.networks = {
            "info": info_client,
            "planning": planning_client
        }

    def handle_task(self, task):
        """Route requests across networks.

        A dict input with both "network" and "query" keys is routed
        explicitly to the named network. Any other input is matched on
        keywords (information first, then planning). If nothing matches,
        both networks are queried and their replies combined.
        """
        payload = task.input

        is_explicit = isinstance(payload, dict) and "network" in payload and "query" in payload
        if is_explicit:
            target = payload["network"]
            query = payload["query"]

            if target not in self.networks:
                return {"output": f"Network '{target}' not found. Available networks: {list(self.networks.keys())}"}

            reply = self.networks[target].send_message(query)
            return {"output": reply.content}

        # Implicit routing: keyword match on the lower-cased text of the input
        lowered = str(payload).lower()

        def mentions(*keywords):
            return any(keyword in lowered for keyword in keywords)

        if mentions("weather", "news", "information"):
            reply = self.networks["info"].send_message(payload)
            return {"output": f"Information Network: {reply.content}"}

        if mentions("travel", "event", "planning"):
            reply = self.networks["planning"].send_message(payload)
            return {"output": f"Planning Network: {reply.content}"}

        # No keyword matched: ask both networks and combine the answers
        info_reply = self.networks["info"].send_message(payload)
        planning_reply = self.networks["planning"].send_message(payload)

        combined = f"Combined response:\n\nInformation: {info_reply.content}\n\nPlanning: {planning_reply.content}"
        return {"output": combined}

# Run the gateway agent in a daemon thread.
# The port is passed by keyword, matching every other run_server call on this
# page. Passed positionally, 5000 would land on run_server's second
# positional parameter (NOTE(review): that parameter is `host` in
# python_a2a's signature — confirm).
gateway_agent = NetworkGateway()
threading.Thread(
    target=run_server,
    args=(gateway_agent,),
    kwargs={"port": 5000},
    daemon=True,
).start()

# Fixed pause to give the gateway time to start
time.sleep(1)

# Create a client to the gateway
client = HTTPClient("http://localhost:5000")

# Send queries that require cross-network collaboration
queries = [
    "What's the weather in Paris?",  # Information network
    "Plan a trip to Tokyo",  # Planning network
    {"network": "info", "query": "Get the latest news"},  # Explicit routing
    {"network": "planning", "query": "Find events this weekend"},  # Explicit routing
    "What's happening in New York this week?"  # Combined response
]

for query in queries:
    print(f"\nQuery: {query}")
    response = client.send_message(query)
    print(f"Response: {response.content}")

Best Practices for Agent Networks

  1. Specialized Agents

    • Design agents with focused capabilities
    • Keep clear boundaries between agent responsibilities
    • Define clear interfaces and response formats
  2. Intelligent Routing

    • Implement content-based routing
    • Use LLMs for complex routing decisions
    • Add fallback mechanisms for reliability
  3. Network Management

    • Monitor agent health and availability
    • Implement load balancing for high-traffic networks
    • Create redundancy for critical agents
  4. Security

    • Implement proper authentication between agents
    • Control access to sensitive agents
    • Validate messages between trusted agents
  5. Discoverability

    • Use agent registries for dynamic discovery
    • Maintain up-to-date agent metadata
    • Enable self-organization of networks

By implementing these agent network patterns, you can create scalable, resilient, and intelligent agent ecosystems that effectively collaborate to solve complex problems.

Clone this wiki locally