Skip to content

Commit bb3fecf

Browse files
committed
feat: added v2 of a2a protocol implementation-
1 parent ff513ad commit bb3fecf

File tree

10 files changed

+1709
-3
lines changed

10 files changed

+1709
-3
lines changed

agentic_rag/README.md

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,217 @@ Query a specific collection:
454454
python local_rag_agent.py --query "How to implement a queue?" --collection "Repository Collection"
455455
```
456456

457+
## 3. Agent2Agent (A2A) Protocol Integration
458+
459+
The agentic_rag system now includes full support for the Agent2Agent (A2A) protocol, enabling seamless communication and collaboration with other AI agents. This integration transforms the system into an interoperable agent that can participate in multi-agent workflows and ecosystems.
460+
461+
### 3.1 Why A2A Protocol Implementation?
462+
463+
**Enhanced Interoperability**: The A2A protocol enables the agentic_rag system to communicate with other AI agents using a standardized protocol, breaking down silos between different AI systems and frameworks.
464+
465+
**Scalable Multi-Agent Workflows**: By implementing A2A, the system can participate in complex multi-agent workflows where different agents handle specialized tasks (document processing, analysis, synthesis) and collaborate to solve complex problems.
466+
467+
**Industry Standard Compliance**: A2A is an open standard developed by Google, ensuring compatibility with other A2A-compliant agents and future-proofing the system.
468+
469+
**Enterprise-Grade Security**: A2A includes built-in security mechanisms including authentication, authorization, and secure communication protocols.
470+
471+
**Agent Discovery**: The protocol enables automatic discovery of other agents and their capabilities, allowing for dynamic agent composition and task delegation.
472+
473+
### 3.2 A2A Implementation Architecture
474+
475+
The A2A implementation consists of several key components:
476+
477+
#### 3.2.1 Core A2A Infrastructure
478+
479+
- **A2A Models** (`a2a_models.py`): Pydantic models for JSON-RPC 2.0 communication
480+
- **A2A Handler** (`a2a_handler.py`): Main request handler and method router
481+
- **Task Manager** (`task_manager.py`): Long-running task execution and status tracking
482+
- **Agent Registry** (`agent_registry.py`): Agent discovery and capability management
483+
- **Agent Card** (`agent_card.py`): Capability advertisement and metadata
484+
485+
#### 3.2.2 Supported A2A Methods
486+
487+
The system supports the following A2A protocol methods:
488+
489+
- `document.query`: Query documents using RAG with intelligent routing
490+
- `document.upload`: Process and store documents in vector database
491+
- `task.create`: Create long-running tasks for complex operations
492+
- `task.status`: Check status of running tasks
493+
- `task.cancel`: Cancel running tasks
494+
- `agent.discover`: Discover other agents and their capabilities
495+
- `agent.card`: Get agent capability information
496+
- `health.check`: System health and status check
497+
498+
### 3.3 A2A Endpoints
499+
500+
The system exposes the following A2A endpoints:
501+
502+
- `POST /a2a`: Main A2A protocol endpoint for agent communication
503+
- `GET /agent_card`: Get the agent's capability card
504+
- `GET /a2a/health`: A2A health check endpoint
505+
506+
### 3.4 Usage Examples
507+
508+
#### 3.4.1 Basic A2A Communication
509+
510+
```bash
511+
# Query documents via A2A protocol
512+
curl -X POST http://localhost:8000/a2a \
513+
-H "Content-Type: application/json" \
514+
-d '{
515+
"jsonrpc": "2.0",
516+
"method": "document.query",
517+
"params": {
518+
"query": "What is machine learning?",
519+
"collection": "PDF",
520+
"use_cot": true
521+
},
522+
"id": "1"
523+
}'
524+
```
525+
526+
#### 3.4.2 Task Management
527+
528+
```bash
529+
# Create a long-running task
530+
curl -X POST http://localhost:8000/a2a \
531+
-H "Content-Type: application/json" \
532+
-d '{
533+
"jsonrpc": "2.0",
534+
"method": "task.create",
535+
"params": {
536+
"task_type": "document_processing",
537+
"params": {
538+
"document": "large_document.pdf",
539+
"chunk_count": 100
540+
}
541+
},
542+
"id": "2"
543+
}'
544+
545+
# Check task status
546+
curl -X POST http://localhost:8000/a2a \
547+
-H "Content-Type: application/json" \
548+
-d '{
549+
"jsonrpc": "2.0",
550+
"method": "task.status",
551+
"params": {
552+
"task_id": "task-id-from-previous-response"
553+
},
554+
"id": "3"
555+
}'
556+
```
557+
558+
#### 3.4.3 Agent Discovery
559+
560+
```bash
561+
# Discover agents with specific capabilities
562+
curl -X POST http://localhost:8000/a2a \
563+
-H "Content-Type: application/json" \
564+
-d '{
565+
"jsonrpc": "2.0",
566+
"method": "agent.discover",
567+
"params": {
568+
"capability": "document.query"
569+
},
570+
"id": "4"
571+
}'
572+
573+
# Get agent card
574+
curl -X GET http://localhost:8000/agent_card
575+
```
576+
577+
### 3.5 Testing A2A Implementation
578+
579+
The A2A implementation includes comprehensive tests covering all functionality:
580+
581+
#### 3.5.1 Running Tests
582+
583+
```bash
584+
# Run all A2A tests
585+
python run_a2a_tests.py
586+
587+
# Run specific test categories
588+
python -m pytest test_a2a.py::TestA2AModels -v
589+
python -m pytest test_a2a.py::TestA2AHandler -v
590+
python -m pytest test_a2a.py::TestTaskManager -v
591+
python -m pytest test_a2a.py::TestAgentRegistry -v
592+
```
593+
594+
#### 3.5.2 Test Coverage
595+
596+
The test suite includes:
597+
598+
- **Unit Tests**: Individual component testing
599+
- **Integration Tests**: End-to-end workflow testing
600+
- **Async Tests**: Asynchronous operation testing
601+
- **Error Handling**: Error condition testing
602+
- **Model Validation**: Data model testing
603+
604+
### 3.6 A2A Agent Card
605+
606+
The system publishes its capabilities through an agent card:
607+
608+
```json
609+
{
610+
"agent_id": "agentic_rag_v1",
611+
"name": "Agentic RAG System",
612+
"version": "1.0.0",
613+
"description": "Intelligent RAG system with multi-agent reasoning",
614+
"capabilities": [
615+
{
616+
"name": "document.query",
617+
"description": "Query documents using RAG with context retrieval",
618+
"input_schema": { ... },
619+
"output_schema": { ... }
620+
}
621+
],
622+
"endpoints": {
623+
"base_url": "http://localhost:8000",
624+
"authentication": { ... }
625+
}
626+
}
627+
```
628+
629+
### 3.7 Benefits of A2A Integration
630+
631+
1. **Interoperability**: Seamless communication with other A2A-compliant agents
632+
2. **Scalability**: Support for complex multi-agent workflows
633+
3. **Standardization**: Industry-standard communication protocol
634+
4. **Discovery**: Automatic agent and capability discovery
635+
5. **Security**: Built-in authentication and authorization
636+
6. **Future-Proofing**: Compatibility with evolving agent ecosystems
637+
638+
### 3.8 Configuration
639+
640+
No additional configuration is required for A2A functionality. The system automatically:
641+
642+
- Initializes A2A handlers on startup
643+
- Registers available capabilities
644+
- Starts task management services
645+
- Enables agent discovery
646+
647+
### 3.9 Troubleshooting
648+
649+
#### Common Issues
650+
651+
1. **Async Test Failures**: Ensure `pytest-asyncio` is installed
652+
2. **Import Errors**: Verify all A2A modules are in the Python path
653+
3. **Task Timeouts**: Check task manager configuration for long-running tasks
654+
655+
#### Debug Commands
656+
657+
```bash
658+
# Check A2A health
659+
curl -X GET http://localhost:8000/a2a/health
660+
661+
# View agent capabilities
662+
curl -X GET http://localhost:8000/agent_card
663+
664+
# Test basic functionality
665+
python -c "from a2a_models import A2ARequest; print('A2A models working')"
666+
```
667+
457668
## Contributing
458669
459670
This project is open source. Please submit your contributions by forking this repository and submitting a pull request! Oracle appreciates any contributions that are made by the open source community.

agentic_rag/a2a_demo.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
#!/usr/bin/env python3
2+
"""
3+
A2A Protocol Demo Script
4+
5+
This script demonstrates the A2A protocol functionality by making
6+
sample requests to the agentic_rag system.
7+
"""
8+
9+
import asyncio
10+
import json
11+
import requests
12+
from typing import Dict, Any
13+
14+
15+
class A2AClient:
16+
"""Simple A2A client for testing"""
17+
18+
def __init__(self, base_url: str = "http://localhost:8000"):
19+
self.base_url = base_url
20+
self.session = requests.Session()
21+
22+
def make_request(self, method: str, params: Dict[str, Any], request_id: str = "1") -> Dict[str, Any]:
23+
"""Make an A2A request"""
24+
payload = {
25+
"jsonrpc": "2.0",
26+
"method": method,
27+
"params": params,
28+
"id": request_id
29+
}
30+
31+
try:
32+
response = self.session.post(
33+
f"{self.base_url}/a2a",
34+
json=payload,
35+
headers={"Content-Type": "application/json"}
36+
)
37+
response.raise_for_status()
38+
return response.json()
39+
except requests.exceptions.RequestException as e:
40+
return {
41+
"jsonrpc": "2.0",
42+
"error": {
43+
"code": -32603,
44+
"message": f"Request failed: {str(e)}"
45+
},
46+
"id": request_id
47+
}
48+
49+
def get_agent_card(self) -> Dict[str, Any]:
50+
"""Get the agent card"""
51+
try:
52+
response = self.session.get(f"{self.base_url}/agent_card")
53+
response.raise_for_status()
54+
return response.json()
55+
except requests.exceptions.RequestException as e:
56+
return {"error": f"Failed to get agent card: {str(e)}"}
57+
58+
def health_check(self) -> Dict[str, Any]:
59+
"""Check system health"""
60+
try:
61+
response = self.session.get(f"{self.base_url}/a2a/health")
62+
response.raise_for_status()
63+
return response.json()
64+
except requests.exceptions.RequestException as e:
65+
return {"error": f"Health check failed: {str(e)}"}
66+
67+
68+
def print_response(title: str, response: Dict[str, Any]):
69+
"""Print formatted response"""
70+
print(f"\n{'='*60}")
71+
print(f" {title}")
72+
print(f"{'='*60}")
73+
print(json.dumps(response, indent=2))
74+
75+
76+
def main():
77+
"""Run A2A demo"""
78+
print("🤖 A2A Protocol Demo")
79+
print("=" * 60)
80+
print("This demo shows the A2A protocol functionality")
81+
print("Make sure the agentic_rag server is running on localhost:8000")
82+
print("=" * 60)
83+
84+
client = A2AClient()
85+
86+
# Test 1: Health Check
87+
print("\n1. Testing Health Check...")
88+
health_response = client.health_check()
89+
print_response("Health Check Response", health_response)
90+
91+
# Test 2: Get Agent Card
92+
print("\n2. Getting Agent Card...")
93+
card_response = client.get_agent_card()
94+
print_response("Agent Card", card_response)
95+
96+
# Test 3: Document Query
97+
print("\n3. Testing Document Query...")
98+
query_response = client.make_request(
99+
"document.query",
100+
{
101+
"query": "What is artificial intelligence?",
102+
"collection": "General",
103+
"use_cot": False,
104+
"max_results": 3
105+
},
106+
"query-1"
107+
)
108+
print_response("Document Query Response", query_response)
109+
110+
# Test 4: Task Creation
111+
print("\n4. Testing Task Creation...")
112+
task_response = client.make_request(
113+
"task.create",
114+
{
115+
"task_type": "document_processing",
116+
"params": {
117+
"document": "demo_document.pdf",
118+
"chunk_count": 10
119+
}
120+
},
121+
"task-1"
122+
)
123+
print_response("Task Creation Response", task_response)
124+
125+
# Test 5: Task Status (if task was created)
126+
if "result" in task_response and "task_id" in task_response["result"]:
127+
task_id = task_response["result"]["task_id"]
128+
print(f"\n5. Checking Task Status for {task_id}...")
129+
130+
# Wait a moment for task to start
131+
import time
132+
time.sleep(2)
133+
134+
status_response = client.make_request(
135+
"task.status",
136+
{"task_id": task_id},
137+
"status-1"
138+
)
139+
print_response("Task Status Response", status_response)
140+
141+
# Test 6: Agent Discovery
142+
print("\n6. Testing Agent Discovery...")
143+
discover_response = client.make_request(
144+
"agent.discover",
145+
{"capability": "document.query"},
146+
"discover-1"
147+
)
148+
print_response("Agent Discovery Response", discover_response)
149+
150+
# Test 7: Error Handling
151+
print("\n7. Testing Error Handling...")
152+
error_response = client.make_request(
153+
"unknown.method",
154+
{"param": "value"},
155+
"error-1"
156+
)
157+
print_response("Error Response", error_response)
158+
159+
print("\n" + "="*60)
160+
print("🎉 A2A Demo completed!")
161+
print("="*60)
162+
163+
164+
if __name__ == "__main__":
165+
main()

0 commit comments

Comments
 (0)