|
1 | | -# Agent Chat with Async Human Inputs |
| 1 | +# Agent Chat with Async Operations |
2 | 2 | # |
3 | | -# We are going to create an agent that can chat with a human asynchronously. The agent will be able to respond to messages from the human and will also be able to send messages to the human. |
| 3 | +# We are going to create agents that can perform asynchronous operations and chat with each other. |
| 4 | +# This example demonstrates async capabilities without requiring human input. |
4 | 5 | # |
5 | | -# We are going to use AgentOps to monitor the agent's performance and observe its interactions with the human. |
| 6 | +# We are going to use AgentOps to monitor the agents' performance and observe their interactions. |
6 | 7 | # # Install required dependencies |
7 | 8 | # %pip install agentops |
8 | 9 | # %pip install ag2 |
|
25 | 26 | os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_api_key_here") |
26 | 27 | os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here") |
27 | 28 |
|
28 | | -agentops.init(auto_start_session=False, trace_name="AG2 Async Human Input") |
29 | | -tracer = agentops.start_trace( |
30 | | - trace_name="AG2 Agent chat with Async Human Inputs", tags=["ag2-chat-async-human-inputs", "agentops-example"] |
31 | | -) |
| 29 | +agentops.init(auto_start_session=False, trace_name="AG2 Async Demo") |
| 30 | +tracer = agentops.start_trace(trace_name="AG2 Async Agent Demo", tags=["ag2-async-demo", "agentops-example"]) |
32 | 31 |
|
33 | 32 |
|
34 | | -# Define an asynchronous function that simulates some asynchronous task (e.g., I/O operation) |
35 | | -async def my_asynchronous_function(): |
36 | | - print("Start asynchronous function") |
37 | | - await asyncio.sleep(2) # Simulate some asynchronous task (e.g., I/O operation) |
38 | | - print("End asynchronous function") |
39 | | - return "input" |
| 33 | +# Define an asynchronous function that simulates async processing |
| 34 | +async def simulate_async_processing(task_name: str, delay: float = 1.0) -> str: |
| 35 | + """ |
| 36 | +    Simulate some asynchronous processing (e.g., API calls or file operations). |
| 37 | + """ |
| 38 | + print(f"🔄 Starting async task: {task_name}") |
| 39 | + await asyncio.sleep(delay) # Simulate async work |
| 40 | + print(f"✅ Completed async task: {task_name}") |
| 41 | + return f"Processed: {task_name}" |
40 | 42 |
|
41 | 43 |
|
42 | | -# Define a custom class `CustomisedUserProxyAgent` that extends `UserProxyAgent` |
43 | | -class CustomisedUserProxyAgent(UserProxyAgent): |
44 | | - # Asynchronous function to get human input |
| 44 | +# Define a custom UserProxyAgent that simulates automated responses |
| 45 | +class AutomatedUserProxyAgent(UserProxyAgent): |
| 46 | + def __init__(self, name: str, **kwargs): |
| 47 | + super().__init__(name, **kwargs) |
| 48 | + self.response_count = 0 |
| 49 | + self.predefined_responses = [ |
| 50 | + "Yes, please generate interview questions for these topics.", |
| 51 | + "The questions look good. Can you make them more specific to senior-level positions?", |
| 52 | + "Perfect! These questions are exactly what we need. Thank you!", |
| 53 | + ] |
| 54 | + |
45 | 55 | async def a_get_human_input(self, prompt: str) -> str: |
46 | | - # Call the asynchronous function to get user input asynchronously |
47 | | - user_input = await my_asynchronous_function() |
48 | | - return user_input |
| 56 | + # Simulate async processing before responding |
| 57 | + await simulate_async_processing(f"Processing user input #{self.response_count + 1}") |
| 58 | + |
| 59 | + if self.response_count < len(self.predefined_responses): |
| 60 | + response = self.predefined_responses[self.response_count] |
| 61 | + self.response_count += 1 |
| 62 | + print(f"👤 User: {response}") |
| 63 | + return response |
| 64 | + else: |
| 65 | + print("👤 User: TERMINATE") |
| 66 | + return "TERMINATE" |
49 | 67 |
|
50 | | - # Asynchronous function to receive a message |
51 | 68 | async def a_receive( |
52 | 69 | self, |
53 | 70 | message: Union[Dict, str], |
54 | 71 | sender, |
55 | 72 | request_reply: Optional[bool] = None, |
56 | 73 | silent: Optional[bool] = False, |
57 | 74 | ): |
58 | | - # Call the superclass method to handle message reception asynchronously |
59 | 75 | await super().a_receive(message, sender, request_reply, silent) |
60 | 76 |
|
61 | 77 |
|
62 | | -class CustomisedAssistantAgent(AssistantAgent): |
63 | | - # Asynchronous function to get human input |
64 | | - async def a_get_human_input(self, prompt: str) -> str: |
65 | | - # Call the asynchronous function to get user input asynchronously |
66 | | - user_input = await my_asynchronous_function() |
67 | | - return user_input |
68 | | - |
69 | | - # Asynchronous function to receive a message |
| 78 | +class AsyncAssistantAgent(AssistantAgent): |
70 | 79 | async def a_receive( |
71 | 80 | self, |
72 | 81 | message: Union[Dict, str], |
73 | 82 | sender, |
74 | 83 | request_reply: Optional[bool] = None, |
75 | 84 | silent: Optional[bool] = False, |
76 | 85 | ): |
77 | | - # Call the superclass method to handle message reception asynchronously |
| 86 | + # Simulate async processing before responding |
| 87 | + await simulate_async_processing("Analyzing request and preparing response", 0.5) |
78 | 88 | await super().a_receive(message, sender, request_reply, silent) |
79 | 89 |
|
80 | 90 |
|
81 | 91 | nest_asyncio.apply() |
82 | 92 |
|
83 | 93 |
|
84 | 94 | async def main(): |
85 | | - boss = CustomisedUserProxyAgent( |
86 | | - name="boss", |
87 | | - human_input_mode="ALWAYS", |
88 | | - max_consecutive_auto_reply=0, |
| 95 | + print("🚀 Starting AG2 Async Demo") |
| 96 | + print("=" * 50) |
| 97 | + |
| 98 | + # Create agents with automated behavior |
| 99 | + user_proxy = AutomatedUserProxyAgent( |
| 100 | + name="hiring_manager", |
| 101 | +        human_input_mode="ALWAYS",  # Replies come from the scripted a_get_human_input, so no real human is needed |
| 102 | + max_consecutive_auto_reply=3, |
89 | 103 | code_execution_config=False, |
| 104 | + is_termination_msg=lambda msg: "TERMINATE" in str(msg.get("content", "")), |
90 | 105 | ) |
91 | 106 |
|
92 | | - assistant = CustomisedAssistantAgent( |
93 | | - name="assistant", |
94 | | - system_message="You will provide some agenda, and I will create questions for an interview meeting. Every time when you generate question then you have to ask user for feedback and if user provides the feedback then you have to incorporate that feedback and generate new set of questions and if user don't want to update then terminate the process and exit", |
| 107 | + assistant = AsyncAssistantAgent( |
| 108 | + name="interview_consultant", |
| 109 | + system_message="""You are an expert interview consultant. When given interview topics, |
| 110 | + you create thoughtful, relevant questions. You ask for feedback and incorporate it. |
| 111 | + When the user is satisfied with the questions, end with 'TERMINATE'.""", |
95 | 112 | llm_config={"config_list": [{"model": "gpt-4o-mini", "api_key": os.environ.get("OPENAI_API_KEY")}]}, |
| 113 | + is_termination_msg=lambda msg: "TERMINATE" in str(msg.get("content", "")), |
96 | 114 | ) |
97 | 115 |
|
98 | | - await boss.a_initiate_chat( |
99 | | - assistant, |
100 | | - message="Resume Review, Technical Skills Assessment, Project Discussion, Job Role Expectations, Closing Remarks.", |
101 | | - n_results=3, |
102 | | - ) |
103 | | - |
104 | | - |
105 | | -# await main() |
106 | | -agentops.end_trace(tracer, end_state="Success") |
107 | | - |
108 | | -# Let's check programmatically that spans were recorded in AgentOps |
109 | | -print("\n" + "=" * 50) |
110 | | -print("Now let's verify that our LLM calls were tracked properly...") |
111 | | -try: |
112 | | - agentops.validate_trace_spans(trace_context=tracer) |
113 | | - print("\n✅ Success! All LLM spans were properly recorded in AgentOps.") |
114 | | -except agentops.ValidationError as e: |
115 | | - print(f"\n❌ Error validating spans: {e}") |
116 | | - raise |
| 116 | + try: |
| 117 | + print("🤖 Initiating automated conversation...") |
| 118 | + await user_proxy.a_initiate_chat( |
| 119 | + assistant, |
| 120 | + message="""I need help creating interview questions for these topics: |
| 121 | + - Resume Review |
| 122 | + - Technical Skills Assessment |
| 123 | + - Project Discussion |
| 124 | + - Job Role Expectations |
| 125 | + - Closing Remarks |
| 126 | + |
| 127 | + Please create 2-3 questions for each topic.""", |
| 128 | + max_turns=6, |
| 129 | + ) |
| 130 | +    except Exception as e: |
| 131 | +        print(f"\n❌ Error occurred: {e}") |
| 132 | +        agentops.end_trace(tracer, end_state="Error") |
| 133 | +        raise |
| 134 | +    agentops.end_trace(tracer, end_state="Success") |
| 135 | + # Validate AgentOps tracking |
| 136 | + print("\n" + "=" * 50) |
| 137 | + print("🔍 Validating AgentOps tracking...") |
| 138 | + try: |
| 139 | + agentops.validate_trace_spans(trace_context=tracer) |
| 140 | + print("✅ Success! All LLM spans were properly recorded in AgentOps.") |
| 141 | + except agentops.ValidationError as e: |
| 142 | + print(f"❌ Error validating spans: {e}") |
| 143 | + raise |
| 144 | + |
| 145 | + print("\n🎉 Demo completed successfully!") |
| 146 | + |
| 147 | + |
| 148 | +if __name__ == "__main__": |
| 149 | + asyncio.run(main()) |