diff --git a/.env.example b/.env.example new file mode 100644 index 00000000..0895cdb2 --- /dev/null +++ b/.env.example @@ -0,0 +1 @@ +GEMINI_API_KEY="your_actual_google_gemini_api_key_here" diff --git a/README.md b/README.md index 8b73dd37..71740c6e 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,58 @@ pip install a2a-sdk You can also find more Python samples [here](https://github.com/google-a2a/a2a-samples/tree/main/samples/python) and JavaScript samples [here](https://github.com/google-a2a/a2a-samples/tree/main/samples/js). +### Blog Post Generation Agent Example + +This example demonstrates an agent capable of generating blog posts using the Gemini API. + +#### Prerequisites + +1. **Install Dependencies**: + Ensure you have all necessary dependencies installed. If you've followed the main installation for `a2a-sdk`, you might also need `python-dotenv` and `google-generativeai`: + ```bash + pip install python-dotenv google-generativeai + ``` + (Note: `google-generativeai` should already be installed if previous steps were followed for this agent, but `python-dotenv` is likely new for this example). + +2. **Set up Gemini API Key**: + - Create a `.env` file in the root of this repository by copying the `.env.example` file: + ```bash + cp .env.example .env + ``` + - Edit the `.env` file and replace `"your_actual_google_gemini_api_key_here"` with your actual Gemini API key. + ``` + GEMINI_API_KEY="your_actual_api_key_here" + ``` + +#### Running the Example + +1. Navigate to the `examples` directory (if you are not already there): + ```bash + cd examples + ``` +2. Run the script: + ```bash + python run_blog_generator.py + ``` + The script will: + - Generate a blog topic based on predefined keywords. + - Generate an outline for the topic. + - Write content for each section of the outline. + - Assemble the full blog post. + - Print the final blog post to the console and save it to `generated_blog_post.md` in the `examples` directory (where the script is run). + +#### How it Works + +The `run_blog_generator.py` script uses the `ExampleAgent` located in `src/a2a/example_agent/agent.py`. This agent has been configured with capabilities to: +- `generate_blog_topic`: Creates a topic. +- `generate_blog_outline`: Structures the blog post. +- `write_blog_section`: Writes content for each section using the Gemini API. +- `assemble_blog_post`: Compiles the sections into a final blog post. + +The agent reads the `GEMINI_API_KEY` from the environment variables (loaded from the `.env` file located in the project root or the `examples/` directory). + +*Note: The `ExampleAgent` currently uses placeholder classes for some core A2A SDK components (`Agent`, `AgentCapability`, etc.) as they were not found directly within the SDK during development of this example. These placeholders would ideally be replaced by actual SDK components.* + ## License This project is licensed under the terms of the [Apache 2.0 License](https://raw.githubusercontent.com/google-a2a/a2a-python/refs/heads/main/LICENSE). diff --git a/examples/run_blog_generator.py b/examples/run_blog_generator.py new file mode 100644 index 00000000..d88b1950 --- /dev/null +++ b/examples/run_blog_generator.py @@ -0,0 +1,164 @@ +import asyncio +import os +# It's good practice to handle potential ImportError for dotenv +try: + from dotenv import load_dotenv +except ImportError: + print("python-dotenv library not found. Please install it by running: pip install python-dotenv") + print("This script relies on a .env file to load your GEMINI_API_KEY.") + exit(1) + + +# Assuming ExampleAgent and TaskParameters are accessible via a2a. +# This might require ensuring src is in PYTHONPATH or the package is installed. +# For direct script execution, you might need to adjust sys.path or set PYTHONPATH. +import sys +# Add src to Python path if running script directly from repo root +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../src'))) + +from a2a.example_agent.agent import ExampleAgent +from a2a.example_agent.agent import TaskParameters # Using the placeholder from agent.py + +# MockTaskCompleter to capture results for the script +class ScriptTaskCompleter: + def __init__(self): + self.output = None + self.error = None + self.has_failed = False + + def complete_task(self, output: any): + self.output = output + self.error = None + self.has_failed = False + # print(f"Task completed successfully.") # Optional: for verbose logging + + def fail_task(self, error_message: str): + self.output = None + self.error = error_message + self.has_failed = True + # print(f"Task failed: {error_message}") # Optional: for verbose logging + +async def main(): + # Attempt to load .env file from the project root (one level up from examples/) + dotenv_path = os.path.join(os.path.dirname(__file__), '..', '.env') + if os.path.exists(dotenv_path): + load_dotenv(dotenv_path) + print(f"Loaded .env from {dotenv_path}") + else: + # Fallback to trying to load .env from the current directory (examples/) + if load_dotenv(): + print(f"Loaded .env from current directory") + else: + print("No .env file found in project root or current directory.") + + + api_key = os.getenv("GEMINI_API_KEY") + if not api_key: + print("Error: GEMINI_API_KEY environment variable not set or .env file not found/loaded.") + print("Please create a .env file in the project root with your API key:") + print("Example .env content: GEMINI_API_KEY='your_actual_api_key_here'") + return + + try: + agent = ExampleAgent() # Initializes with API key from env + except ValueError as e: + print(f"Error initializing agent: {e}") + return + + completer = ScriptTaskCompleter() + + # 1. Generate Blog Topic + print("\n--- 1. Generating Blog Topic ---") + topic_keywords = ["artificial intelligence in healthcare", "future trends", "patient outcomes"] + print(f"Keywords: {', '.join(topic_keywords)}") + topic_params = TaskParameters(parameters={ + "capability_name": "generate_blog_topic", + "keywords": topic_keywords + }) + await agent.execute_task(completer, topic_params) + + if completer.has_failed: + print(f"Could not generate topic: {completer.error}") + return + blog_topic = completer.output + print(f"Generated Topic: {blog_topic}") + + # 2. Generate Blog Outline + print("\n--- 2. Generating Blog Outline ---") + print(f"Using topic: {blog_topic}") + outline_params = TaskParameters(parameters={ + "capability_name": "generate_blog_outline", + "topic": blog_topic + }) + await agent.execute_task(completer, outline_params) + + if completer.has_failed: + print(f"Could not generate outline: {completer.error}") + return + blog_outline = completer.output + if not blog_outline: # Check if outline is empty or None + print(f"Generated outline was empty. Stopping.") + return + + print(f"Generated Outline:") + for i, section_title in enumerate(blog_outline): + print(f" {i+1}. {section_title}") + + # 3. Write Blog Sections + print("\n--- 3. Writing Blog Sections ---") + written_sections = [] + for i, section_title in enumerate(blog_outline): + # Check if section_title is valid + if not section_title or not isinstance(section_title, str) or not section_title.strip(): + print(f" Skipping invalid section title at index {i}: '{section_title}'") + written_sections.append(f"Skipped section due to invalid title: '{section_title}'") + continue + + print(f" Writing section {i+1}: '{section_title}'...") + section_params = TaskParameters(parameters={ + "capability_name": "write_blog_section", + "section_prompt": section_title + # Using default model 'gemini-1.5-flash-latest' + }) + await agent.execute_task(completer, section_params) + + if completer.has_failed: + error_msg = completer.error if completer.error else "Unknown error" + print(f" Could not write section '{section_title}': {error_msg}") + written_sections.append(f"Content for '{section_title}' could not be generated: {error_msg}") + continue # Continue to next section for now + + section_content = completer.output if completer.output else "" + written_sections.append(section_content) + print(f" Section content (first 80 chars): {section_content[:80].replace('\n', ' ')}...") + + + # 4. Assemble Blog Post + print("\n--- 4. Assembling Blog Post ---") + assembly_params = TaskParameters(parameters={ + "capability_name": "assemble_blog_post", + "title": blog_topic, # Using the generated topic as title + "sections": written_sections + }) + await agent.execute_task(completer, assembly_params) + + if completer.has_failed: + print(f"Could not assemble blog post: {completer.error}") + return + + final_blog_post = completer.output + print("\n--- Generated Blog Post ---") + print(final_blog_post) + + # Save to file + output_filename = "generated_blog_post.md" + with open(output_filename, "w", encoding="utf-8") as f: + f.write(final_blog_post) + print(f"\nBlog post saved to: {output_filename}") + + print("\n\n--- Example Script Finished ---") + print("To run this script again: python examples/run_blog_generator.py") + print("Ensure you have a .env file in the project root (../.env) or in the examples/ directory (./.env) with your GEMINI_API_KEY.") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/a2a/__init__.py b/src/a2a/__init__.py index 86893a97..4a7f712f 100644 --- a/src/a2a/__init__.py +++ b/src/a2a/__init__.py @@ -1 +1,5 @@ """The A2A Python SDK.""" + +from .example_agent import ExampleAgent + +__all__ = ["ExampleAgent"] diff --git a/src/a2a/example_agent/__init__.py b/src/a2a/example_agent/__init__.py new file mode 100644 index 00000000..71e34aa2 --- /dev/null +++ b/src/a2a/example_agent/__init__.py @@ -0,0 +1,3 @@ +from .agent import ExampleAgent + +__all__ = ["ExampleAgent"] diff --git a/src/a2a/example_agent/agent.py b/src/a2a/example_agent/agent.py new file mode 100644 index 00000000..b825b681 --- /dev/null +++ b/src/a2a/example_agent/agent.py @@ -0,0 +1,217 @@ +import os +import google.generativeai as genai + +# Placeholder classes - replace with actual imports from a2a.sdk when available +class Agent: + def __init__(self, capabilities): + self.capabilities = capabilities + +class AgentCapability: + def __init__(self, name, description, parameters): + self.name = name + self.description = description + self.parameters = parameters + +class TaskCompleter: + def complete_task(self, output): + raise NotImplementedError + + def fail_task(self, error_message): + raise NotImplementedError + +class TaskParameters: + def __init__(self, parameters): + self._parameters = parameters + + def get(self, key: str, default: any = None) -> any: + return self._parameters.get(key, default) + +# End of placeholder classes + +from a2a.types import AgentCapabilities # Assuming AgentCapabilities is available +# from a2a.server.task_queue import TaskParameters # Assuming TaskParameters is available + + +class ExampleAgent(Agent): + """An example agent that echoes back the input.""" + + def __init__(self): + self.api_key = os.getenv("GEMINI_API_KEY") + if not self.api_key: + raise ValueError("GEMINI_API_KEY environment variable not set.") + genai.configure(api_key=self.api_key) + + super().__init__( + capabilities=AgentCapabilities( + initial_prompt="You are a helpful AI assistant that can write blog posts.", + capabilities=[ + AgentCapability( + name="generate_blog_topic", + description="Generates a blog post topic based on provided keywords.", + parameters={ + "type": "object", + "properties": { + "keywords": { + "type": "array", + "items": {"type": "string"}, + "description": "Keywords to base the topic on.", + }, + }, + "required": ["keywords"], + }, + ), + AgentCapability( + name="generate_blog_outline", + description="Generates a blog post outline for a given topic.", + parameters={ + "type": "object", + "properties": { + "topic": { + "type": "string", + "description": "The topic of the blog post.", + }, + }, + "required": ["topic"], + }, + ), + AgentCapability( + name="write_blog_section", + description="Writes content for a specific blog section using a Gemini model.", + parameters={ + "type": "object", + "properties": { + "section_prompt": { + "type": "string", + "description": "The prompt or title for the blog section.", + }, + "model": { + "type": "string", + "description": "The Gemini model to use (e.g., 'gemini-1.5-flash-latest'). Defaults to 'gemini-1.5-flash-latest'.", + "default": "gemini-1.5-flash-latest", + } + }, + "required": ["section_prompt"], + }, + ), + AgentCapability( + name="assemble_blog_post", + description="Assembles the title and sections into a formatted blog post.", + parameters={ + "type": "object", + "properties": { + "title": { + "type": "string", + "description": "The title of the blog post.", + }, + "sections": { + "type": "array", + "items": {"type": "string"}, + "description": "The written content of each blog section.", + }, + }, + "required": ["title", "sections"], + }, + ), + AgentCapability( + name="echo", # Keep the echo capability for basic testing + description="Echoes back the input.", + parameters={ + "type": "object", + "properties": { + "message": { + "type": "string", + "description": "The message to echo.", + }, + }, + "required": ["message"], + }, + ) + ], + ) + ) + self.model_text = genai.GenerativeModel('gemini-1.5-flash-latest') # Default model for text + + async def _call_gemini_api(self, prompt: str, model_name: str) -> str: + try: + model_to_use = genai.GenerativeModel(model_name) + response = await model_to_use.generate_content_async(prompt) + return response.text + except Exception as e: + # Log the exception e + print(f"Error calling Gemini API: {e}") + # Consider how to propagate this error or handle it, + # for now, returning an error message in the content. + return f"Error generating content: {str(e)}" + + async def generate_blog_topic(self, keywords: list[str]) -> str: + prompt = f"Generate a compelling blog post topic based on the following keywords: {', '.join(keywords)}. Provide only the topic text." + return await self._call_gemini_api(prompt, self.model_text.model_name) + + async def generate_blog_outline(self, topic: str) -> list[str]: + prompt = f"Generate a blog post outline (list of main sections) for the topic: '{topic}'. Return the outline as a numbered list. Each section on a new line." + response_text = await self._call_gemini_api(prompt, self.model_text.model_name) + # Simple parsing for numbered list. Robust parsing might be needed. + return [line.strip() for line in response_text.splitlines() if line.strip()] + + + async def write_blog_section(self, section_prompt: str, model: str) -> str: + prompt = f"Write a detailed blog post section for the following prompt/title: '{section_prompt}'." + return await self._call_gemini_api(prompt, model) + + def assemble_blog_post(self, title: str, sections: list[str]) -> str: + post = f"# {title}\n\n" + for i, section_content in enumerate(sections): + # Assuming sections might already have their own subheadings or are just paragraphs + post += f"{section_content}\n\n" + return post.strip() + + async def execute_task( + self, + task_completer: TaskCompleter, + parameters: TaskParameters, + # TODO: Add type hint for function_to_call + # function_to_call: FunctionToCallBase, # Still a TODO + ): + # capability_name = function_to_call.name + capability_name = parameters.get("capability_name") # Expect capability_name in parameters for now + + try: + if capability_name == "echo": + message = parameters.get("message", "No message provided.") + task_completer.complete_task(output=message) + elif capability_name == "generate_blog_topic": + keywords = parameters.get("keywords", []) + if not keywords: + task_completer.fail_task(error_message="Keywords are required for generate_blog_topic.") + return + output = await self.generate_blog_topic(keywords=keywords) + task_completer.complete_task(output=output) + elif capability_name == "generate_blog_outline": + topic = parameters.get("topic") + if not topic: + task_completer.fail_task(error_message="A topic is required for generate_blog_outline.") + return + output = await self.generate_blog_outline(topic=topic) + task_completer.complete_task(output=output) + elif capability_name == "write_blog_section": + section_prompt = parameters.get("section_prompt") + model = parameters.get("model", self.model_text.model_name) + if not section_prompt: + task_completer.fail_task(error_message="A section_prompt is required for write_blog_section.") + return + output = await self.write_blog_section(section_prompt=section_prompt, model=model) + task_completer.complete_task(output=output) + elif capability_name == "assemble_blog_post": + title = parameters.get("title") + sections = parameters.get("sections", []) + if not title or not sections: + task_completer.fail_task(error_message="Title and sections are required for assemble_blog_post.") + return + output = self.assemble_blog_post(title=title, sections=sections) + task_completer.complete_task(output=output) + else: + task_completer.fail_task(error_message=f"Unknown capability: {capability_name}") + except Exception as e: + # Log e + print(f"Error during task execution for {capability_name}: {e}") + task_completer.fail_task(error_message=f"Error executing {capability_name}: {str(e)}") diff --git a/tests/server/agent_execution/test_example_agent.py b/tests/server/agent_execution/test_example_agent.py new file mode 100644 index 00000000..aa5212a1 --- /dev/null +++ b/tests/server/agent_execution/test_example_agent.py @@ -0,0 +1,163 @@ +import pytest +from unittest.mock import patch, AsyncMock +import google.generativeai as genai + +# Keep existing imports for ExampleAgent, MockTaskCompleter, TaskParameters (adjust path if necessary) +from a2a.example_agent.agent import ExampleAgent, TaskCompleter, TaskParameters # Ensure this path is correct + + +class MockTaskCompleter(TaskCompleter): # Already correctly defined from previous content + def __init__(self): + self.completed_task_output = None + self.failed_task_error_message = None + + def complete_task(self, output: any): + self.completed_task_output = output + + def fail_task(self, error_message: str): + self.failed_task_error_message = error_message + + +@pytest.fixture +def agent(monkeypatch): + # Mock the environment variable for GEMINI_API_KEY + monkeypatch.setenv("GEMINI_API_KEY", "test_api_key") + return ExampleAgent() + +@pytest.mark.asyncio +@patch('google.generativeai.GenerativeModel.generate_content_async') +async def test_generate_blog_topic(mock_generate_content, agent): + task_completer = MockTaskCompleter() + # Create an AsyncMock for the return value of generate_content_async + # This AsyncMock needs a 'text' attribute. + async_mock_response = AsyncMock() + async_mock_response.text = "AI in Education" + mock_generate_content.return_value = async_mock_response + + parameters = TaskParameters(parameters={"capability_name": "generate_blog_topic", "keywords": ["AI", "education"]}) + + await agent.execute_task(task_completer, parameters) + + assert task_completer.completed_task_output == "AI in Education" + mock_generate_content.assert_called_once() + assert "Generate a compelling blog post topic" in mock_generate_content.call_args[0][0] + assert "AI" in mock_generate_content.call_args[0][0] + assert "education" in mock_generate_content.call_args[0][0] + +@pytest.mark.asyncio +@patch('google.generativeai.GenerativeModel.generate_content_async') +async def test_generate_blog_outline(mock_generate_content, agent): + task_completer = MockTaskCompleter() + async_mock_response = AsyncMock() + async_mock_response.text = "1. Introduction\n2. Main Point\n3. Conclusion" + mock_generate_content.return_value = async_mock_response + + parameters = TaskParameters(parameters={"capability_name": "generate_blog_outline", "topic": "AI in Education"}) + + await agent.execute_task(task_completer, parameters) + + expected_outline = ["1. Introduction", "2. Main Point", "3. Conclusion"] + assert task_completer.completed_task_output == expected_outline + mock_generate_content.assert_called_once() + assert "Generate a blog post outline" in mock_generate_content.call_args[0][0] + assert "AI in Education" in mock_generate_content.call_args[0][0] + +@pytest.mark.asyncio +@patch('google.generativeai.GenerativeModel.generate_content_async') +async def test_write_blog_section(mock_generate_content, agent): + task_completer = MockTaskCompleter() + async_mock_response = AsyncMock() + async_mock_response.text = "This is a detailed section about AI." + mock_generate_content.return_value = async_mock_response + + parameters = TaskParameters(parameters={ + "capability_name": "write_blog_section", + "section_prompt": "Introduction to AI", + "model": "gemini-1.5-flash-latest" + }) + + await agent.execute_task(task_completer, parameters) + + assert task_completer.completed_task_output == "This is a detailed section about AI." + mock_generate_content.assert_called_once() + assert "Write a detailed blog post section" in mock_generate_content.call_args[0][0] + assert "Introduction to AI" in mock_generate_content.call_args[0][0] + + +@pytest.mark.asyncio +async def test_assemble_blog_post(agent): # No API call, so no mock needed here + task_completer = MockTaskCompleter() + parameters = TaskParameters(parameters={ + "capability_name": "assemble_blog_post", + "title": "My AI Blog", + "sections": ["Section 1 content.", "Section 2 content."] + }) + + await agent.execute_task(task_completer, parameters) + + expected_post = "# My AI Blog\n\nSection 1 content.\n\nSection 2 content." + assert task_completer.completed_task_output == expected_post + +@pytest.mark.asyncio +async def test_execute_task_missing_parameters(agent): + task_completer = MockTaskCompleter() + + # Test generate_blog_topic with missing keywords + parameters_topic = TaskParameters(parameters={"capability_name": "generate_blog_topic"}) + await agent.execute_task(task_completer, parameters_topic) + assert "Keywords are required" in task_completer.failed_task_error_message + task_completer.failed_task_error_message = None # Reset for next check + + # Test generate_blog_outline with missing topic + parameters_outline = TaskParameters(parameters={"capability_name": "generate_blog_outline"}) + await agent.execute_task(task_completer, parameters_outline) + assert "A topic is required" in task_completer.failed_task_error_message + task_completer.failed_task_error_message = None + + # Test write_blog_section with missing section_prompt + parameters_section = TaskParameters(parameters={"capability_name": "write_blog_section"}) + await agent.execute_task(task_completer, parameters_section) + assert "A section_prompt is required" in task_completer.failed_task_error_message + task_completer.failed_task_error_message = None + + # Test assemble_blog_post with missing title + parameters_assemble_title = TaskParameters(parameters={"capability_name": "assemble_blog_post", "sections": ["test"]}) + await agent.execute_task(task_completer, parameters_assemble_title) + assert "Title and sections are required" in task_completer.failed_task_error_message + task_completer.failed_task_error_message = None + + # Test assemble_blog_post with missing sections + parameters_assemble_sections = TaskParameters(parameters={"capability_name": "assemble_blog_post", "title": "test"}) + await agent.execute_task(task_completer, parameters_assemble_sections) + assert "Title and sections are required" in task_completer.failed_task_error_message + +@pytest.mark.asyncio +async def test_agent_initialization_no_api_key(monkeypatch): + monkeypatch.delenv("GEMINI_API_KEY", raising=False) + with pytest.raises(ValueError) as excinfo: + ExampleAgent() + assert "GEMINI_API_KEY environment variable not set" in str(excinfo.value) + +@pytest.mark.asyncio +async def test_example_agent_echo(agent): # Make sure 'agent' fixture is used + task_completer = MockTaskCompleter() + parameters = TaskParameters(parameters={"capability_name": "echo", "message": "Hello, world!"}) + await agent.execute_task(task_completer, parameters) + assert task_completer.completed_task_output == "Hello, world!" + assert task_completer.failed_task_error_message is None + +@pytest.mark.asyncio +async def test_example_agent_echo_no_message(agent): + task_completer = MockTaskCompleter() + parameters = TaskParameters(parameters={"capability_name": "echo"}) # "message" is optional + await agent.execute_task(task_completer, parameters) + assert task_completer.completed_task_output == "No message provided." + assert task_completer.failed_task_error_message is None + +@pytest.mark.asyncio +async def test_example_agent_unknown_capability(agent): + task_completer = MockTaskCompleter() + parameters = TaskParameters(parameters={"capability_name": "non_existent_capability", "message": "test"}) + await agent.execute_task(task_completer, parameters) + assert task_completer.failed_task_error_message == "Unknown capability: non_existent_capability" + assert task_completer.completed_task_output is None