|
| 1 | +""" |
| 2 | +Sample tests for AgentEx ACP agent. |
| 3 | +
|
| 4 | +This test suite demonstrates how to test the main AgentEx API functions: |
| 5 | +- Non-streaming message sending |
| 6 | +- Streaming message sending |
| 7 | +- Task creation via RPC |
| 8 | +
|
| 9 | +To run these tests: |
| 10 | +1. Make sure the agent is running (via docker-compose or `agentex agents run`) |
| 11 | +2. Set the AGENTEX_API_BASE_URL environment variable if not using default |
| 12 | +3. Run: pytest test_agent.py -v |
| 13 | +
|
| 14 | +Configuration: |
| 15 | +- AGENTEX_API_BASE_URL: Base URL for the AgentEx server (default: http://localhost:5003) |
| 16 | +- AGENT_NAME: Name of the agent to test (default: hello-acp) |
| 17 | +""" |
| 18 | + |
| 19 | +import os |
| 20 | +from agentex.types import TextContentParam, TextDelta, TextContent |
| 21 | +from agentex.types.agent_rpc_params import ParamsSendMessageRequest |
| 22 | +from agentex.types.task_message_update import StreamTaskMessageDelta, StreamTaskMessageFull |
| 23 | +import pytest |
| 24 | +from agentex import Agentex |
| 25 | + |
| 26 | + |
| 27 | +# Configuration from environment variables |
| 28 | +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") |
| 29 | +AGENT_NAME = os.environ.get("AGENT_NAME", "s000-hello-acp") |
| 30 | + |
| 31 | + |
| 32 | +@pytest.fixture |
| 33 | +def client(): |
| 34 | + """Create an AgentEx client instance for testing.""" |
| 35 | + client = Agentex(base_url=AGENTEX_API_BASE_URL) |
| 36 | + yield client |
| 37 | + # Clean up: close the client connection |
| 38 | + client.close() |
| 39 | + |
| 40 | + |
| 41 | +@pytest.fixture |
| 42 | +def agent_name(): |
| 43 | + """Return the agent name for testing.""" |
| 44 | + return AGENT_NAME |
| 45 | + |
| 46 | + |
| 47 | +class TestNonStreamingMessages: |
| 48 | + """Test non-streaming message sending.""" |
| 49 | + |
| 50 | + def test_send_simple_message(self, client: Agentex, agent_name: str): |
| 51 | + """Test sending a simple message and receiving a response.""" |
| 52 | + |
| 53 | + message_content = "Hello, Agent! How are you?" |
| 54 | + response = client.agents.send_message( |
| 55 | + agent_name=agent_name, |
| 56 | + params=ParamsSendMessageRequest( |
| 57 | + content=TextContentParam( |
| 58 | + author="user", |
| 59 | + content=message_content, |
| 60 | + type="text", |
| 61 | + ) |
| 62 | + ), |
| 63 | + ) |
| 64 | + result = response.result |
| 65 | + assert result is not None |
| 66 | + assert len(result) == 1 |
| 67 | + message = result[0] |
| 68 | + assert isinstance(message.content, TextContent) |
| 69 | + assert ( |
| 70 | + message.content.content |
| 71 | + == f"Hello! I've received your message. Here's a generic response, but in future tutorials we'll see how you can get me to intelligently respond to your message. This is what I heard you say: {message_content}" |
| 72 | + ) |
| 73 | + |
| 74 | + |
| 75 | +class TestStreamingMessages: |
| 76 | + """Test streaming message sending.""" |
| 77 | + |
| 78 | + def test_stream_simple_message(self, client: Agentex, agent_name: str): |
| 79 | + """Test streaming a simple message and aggregating deltas.""" |
| 80 | + |
| 81 | + message_content = "Hello, Agent! Can you stream your response?" |
| 82 | + aggregated_content = "" |
| 83 | + full_content = "" |
| 84 | + received_chunks = False |
| 85 | + |
| 86 | + for chunk in client.agents.send_message_stream( |
| 87 | + agent_name=agent_name, |
| 88 | + params=ParamsSendMessageRequest( |
| 89 | + content=TextContentParam( |
| 90 | + author="user", |
| 91 | + content=message_content, |
| 92 | + type="text", |
| 93 | + ) |
| 94 | + ), |
| 95 | + ): |
| 96 | + received_chunks = True |
| 97 | + task_message_update = chunk.result |
| 98 | + # Collect text deltas as they arrive or check full messages |
| 99 | + if isinstance(task_message_update, StreamTaskMessageDelta) and task_message_update.delta is not None: |
| 100 | + delta = task_message_update.delta |
| 101 | + if isinstance(delta, TextDelta) and delta.text_delta is not None: |
| 102 | + aggregated_content += delta.text_delta |
| 103 | + |
| 104 | + elif isinstance(task_message_update, StreamTaskMessageFull): |
| 105 | + content = task_message_update.content |
| 106 | + if isinstance(content, TextContent): |
| 107 | + full_content = content.content |
| 108 | + |
| 109 | + if not full_content and not aggregated_content: |
| 110 | + raise AssertionError("No content was received in the streaming response.") |
| 111 | + if not received_chunks: |
| 112 | + raise AssertionError("No streaming chunks were received, when at least 1 was expected.") |
| 113 | + |
| 114 | + if full_content: |
| 115 | + assert ( |
| 116 | + full_content |
| 117 | + == f"Hello! I've received your message. Here's a generic response, but in future tutorials we'll see how you can get me to intelligently respond to your message. This is what I heard you say: {message_content}" |
| 118 | + ) |
| 119 | + |
| 120 | + if aggregated_content: |
| 121 | + assert ( |
| 122 | + aggregated_content |
| 123 | + == f"Hello! I've received your message. Here's a generic response, but in future tutorials we'll see how you can get me to intelligently respond to your message. This is what I heard you say: {message_content}" |
| 124 | + ) |
| 125 | + |
| 126 | + |
| 127 | +if __name__ == "__main__": |
| 128 | + pytest.main([__file__, "-v"]) |
0 commit comments