|
| 1 | +import os |
| 2 | +from contextlib import asynccontextmanager |
| 3 | + |
| 4 | +import dotenv |
| 5 | +from langchain.output_parsers import PydanticOutputParser |
| 6 | +from langchain_mcp_adapters.tools import load_mcp_tools |
| 7 | +from langgraph.graph import END, StateGraph |
| 8 | +from langgraph.prebuilt import create_react_agent |
| 9 | +from langgraph.prebuilt.chat_agent_executor import AgentState |
| 10 | +from mcp import ClientSession |
| 11 | +from mcp.client.sse import sse_client |
| 12 | +from pydantic import BaseModel |
| 13 | +from uipath_langchain.chat.models import UiPathAzureChatOpenAI |
| 14 | + |
| 15 | +dotenv.load_dotenv() |
| 16 | + |
| 17 | + |
| 18 | +class GraphInput(BaseModel): |
| 19 | + """Input containing a query about what function to build""" |
| 20 | + |
| 21 | + query: str |
| 22 | + |
| 23 | + |
| 24 | +class GraphOutput(BaseModel): |
| 25 | + """Final function code""" |
| 26 | + |
| 27 | + function_code: str |
| 28 | + |
| 29 | + |
| 30 | +class Spec(BaseModel): |
| 31 | + name: str |
| 32 | + description: str |
| 33 | + input_schema: str |
| 34 | + expected_behavior: str |
| 35 | + |
| 36 | + |
| 37 | +class GraphState(AgentState): |
| 38 | + """Graph state""" |
| 39 | + |
| 40 | + name: str = "" |
| 41 | + description: str = "" |
| 42 | + input_schema: str = "" |
| 43 | + expected_behavior: str = "" |
| 44 | + validation_feedback: str = "" |
| 45 | + attempts: int = 0 |
| 46 | + |
| 47 | + |
| 48 | +FUNCTIONS_MCP_SERVER_URL = os.getenv("FUNCTIONS_MCP_SERVER_URL") |
| 49 | +UIPATH_ACCESS_TOKEN = os.getenv("UIPATH_ACCESS_TOKEN") |
| 50 | + |
| 51 | + |
| 52 | +@asynccontextmanager |
| 53 | +async def make_graph(): |
| 54 | + async with sse_client( |
| 55 | + url=FUNCTIONS_MCP_SERVER_URL, |
| 56 | + headers={"Authorization": f"Bearer {UIPATH_ACCESS_TOKEN}"}, |
| 57 | + timeout=60, |
| 58 | + ) as (read, write): |
| 59 | + async with ClientSession(read, write) as session: |
| 60 | + await session.initialize() |
| 61 | + tools = await load_mcp_tools(session) |
| 62 | + |
| 63 | + model = UiPathAzureChatOpenAI( |
| 64 | + model="gpt-4.1-2025-04-14", |
| 65 | + temperature=0, |
| 66 | + max_tokens=10000, |
| 67 | + timeout=120, |
| 68 | + max_retries=2, |
| 69 | + ) |
| 70 | + |
| 71 | + async def planner_agent(input: GraphInput) -> GraphState: |
| 72 | + """Determines the function details based on the user query""" |
| 73 | + |
| 74 | + planning_parser = PydanticOutputParser(pydantic_object=Spec) |
| 75 | + planning_prompt = f"""You are a planning agent that analyzes user requests for Python functions and determines the appropriate function specifications. |
| 76 | +
|
| 77 | +Based on the user's query, determine: |
| 78 | +1. A clear, concise function name following Python naming conventions |
| 79 | +2. A detailed description of what the function should do |
| 80 | +3. An input JSON schema specifying all parameters the function should accept |
| 81 | +4. The expected behavior including example inputs and outputs |
| 82 | +
|
| 83 | +The user requested: {input.query} |
| 84 | +
|
| 85 | +Format your response as a JSON object with the following keys: |
| 86 | +- name: The function name (snake_case) |
| 87 | +- description: Detailed description |
| 88 | +- input_schema: JSON schema for the input parameters |
| 89 | +- expected_behavior: Clear explanation of what the function should return and how it should handle different cases |
| 90 | +
|
| 91 | +Respond with a JSON object containing: |
| 92 | +{planning_parser.get_format_instructions()} |
| 93 | +""" |
| 94 | + |
| 95 | + result = await model.ainvoke(planning_prompt) |
| 96 | + |
| 97 | + specs = planning_parser.parse(result.content) |
| 98 | + |
| 99 | + # Return the specifications along with the original query |
| 100 | + return { |
| 101 | + "name": specs.name, |
| 102 | + "description": specs.description, |
| 103 | + "input_schema": specs.input_schema, |
| 104 | + "expected_behavior": specs.expected_behavior, |
| 105 | + } |
| 106 | + |
| 107 | + async def builder_agent(state: GraphState) -> GraphState: |
| 108 | + system_message = f"""You are a professional developer tasked with creating a Python function. |
| 109 | +
|
| 110 | +Function Name: {state.get("name")} |
| 111 | +Function Description: {state.get("description")} |
| 112 | +Input JSON Schema: {state.get("input_schema")} |
| 113 | +Expected Behavior: {state.get("expected_behavior")} |
| 114 | +""" |
| 115 | + |
| 116 | + if state.get("validation_feedback"): |
| 117 | + print(state.get("validation_feedback")) |
| 118 | + system_message += f""" |
| 119 | +Previous validation feedback: |
| 120 | +{state.get("validation_feedback")} |
| 121 | +
|
| 122 | +Based on this feedback, improve your implementation of the function. |
| 123 | +""" |
| 124 | + |
| 125 | + system_message += """ |
| 126 | +Write Python code that implements this function. The code MUST: |
| 127 | +1. Define a SINGLE function with the specified name that accepts input parameters according to the INPUT JSON Schema parameters list |
| 128 | +2. Return the expected outputs as specified in the expected behavior |
| 129 | +3. Be robust, handle edge cases, and include appropriate error handling |
| 130 | +4. Use ONLY Python's standard library - DO NOT use ANY external libraries or subprocess calls |
| 131 | +5. If you need to parse or analyze code, use Python's built-in modules like ast, tokenize, etc. |
| 132 | +6. If you need to handle web requests, use only urllib from the standard library |
| 133 | +
|
| 134 | +You MUST start by registering the code using the `add_function` tool. The function's description must also contain the expected behavior with example inputs/outputs. You MUST specify the INPUT JSON SCHEMA. |
| 135 | +Then you must respond with the complete function code. |
| 136 | +IMPORTANT: Your response should only contain the complete function code as a clearly formatted Python code block. This is crucial as the final function code will be extracted from your response. |
| 137 | +""" |
| 138 | + |
| 139 | + add_tool = [tool for tool in tools if tool.name == "add_function"] |
| 140 | + |
| 141 | + agent = create_react_agent(model, tools=add_tool, prompt=system_message) |
| 142 | + result = await agent.ainvoke(state) |
| 143 | + |
| 144 | + # Extract the builder's message |
| 145 | + builder_message = result["messages"][-1] |
| 146 | + updated_messages = state.get("messages", []) + [builder_message] |
| 147 | + |
| 148 | + # Increment the attempts counter and save the extracted code |
| 149 | + attempts = state.get("attempts", 0) + 1 |
| 150 | + return { |
| 151 | + **state, |
| 152 | + "messages": updated_messages, |
| 153 | + "attempts": attempts, |
| 154 | + } |
| 155 | + |
| 156 | + async def validator_agent(state: GraphState) -> GraphState: |
| 157 | + # Create mock test cases based on expected behavior |
| 158 | + test_case_prompt = f"""Create 3 test cases with real but MINIMAL data for this tool: |
| 159 | +
|
| 160 | + Function Name: {state.get("name")} |
| 161 | + Function Description: {state.get("description")} |
| 162 | + Input Schema: {state.get("input_schema")} |
| 163 | + Expected Behavior: {state.get("expected_behavior")} |
| 164 | +
|
| 165 | + This function may depend on specific data or files being present for testing. To generate realistic and minimal test cases: |
| 166 | +
|
| 167 | + 1. If files or input data need to exist before the function runs: |
| 168 | + - Define a **setup function** using the `add_function` tool. |
| 169 | + - This setup function can create any necessary files, folders, or data structures required for valid input. |
| 170 | + - You may generate sample files with small, valid content. |
| 171 | +
|
| 172 | + 2. Use `call_function` to execute the setup function and create the test environment. |
| 173 | +
|
| 174 | + 3. Based on the environment created, define test case inputs that match the input schema, and describe expected results. |
| 175 | +
|
| 176 | + 4. Return a list of 3 test cases. Each test case must include: |
| 177 | + - `"input"`: The actual input values for the main function, based on the seeded environment |
| 178 | + - `"expected_output"`: What the function should return |
| 179 | +
|
| 180 | + **Example use case:** If the function reads files from disk, your setup function should create a temporary folder and write some files into it. Then, test the function against that folder path. |
| 181 | + """ |
| 182 | + |
| 183 | + seeder = create_react_agent( |
| 184 | + model, tools=tools, prompt=test_case_prompt |
| 185 | + ) |
| 186 | + |
| 187 | + test_result = await seeder.ainvoke(state) |
| 188 | + |
| 189 | + system_message = f"""You are a function validator. Test if this function works correctly: |
| 190 | +
|
| 191 | + Function Name: {state.get("name")} |
| 192 | + Function Description: {state.get("description")} |
| 193 | + Input Schema: {state.get("input_schema")} |
| 194 | +
|
| 195 | + Test Cases: |
| 196 | + {test_result["messages"][-1].content} |
| 197 | +
|
| 198 | + Use the call_function tool with these test cases and verify the results match expected outputs. |
| 199 | +
|
| 200 | + If the function doesn't work with the test cases: |
| 201 | + 1. Explain EXACTLY what went wrong |
| 202 | + 2. Provide CLEAR feedback on how to fix the issues |
| 203 | + 3. Be specific about what changes are needed |
| 204 | +
|
| 205 | + If all of the test cases are successful, you MUST reply with "ALL TESTS PASSED". |
| 206 | + """ |
| 207 | + |
| 208 | + validator = create_react_agent( |
| 209 | + model, tools=tools, prompt=system_message |
| 210 | + ) |
| 211 | + |
| 212 | + validation_result = await validator.ainvoke(state) |
| 213 | + |
| 214 | + validation_message = validation_result["messages"][-1].content |
| 215 | + |
| 216 | + return {**state, "validation_feedback": validation_message} |
| 217 | + |
| 218 | + async def extractor_agent(state: GraphState) -> GraphOutput: |
| 219 | + # The function code is already extracted and stored in the state |
| 220 | + # Just return it as the output |
| 221 | + return GraphOutput(function_code=state.get("messages")[-1].content) |
| 222 | + |
| 223 | + def should_continue_loop(state: GraphState): |
| 224 | + # Continue the loop if: |
| 225 | + # 1. Tests haven't passed yet (feedback doesn't contain "ALL TESTS PASSED") |
| 226 | + # 2. We haven't exceeded the maximum number of attempts (5) |
| 227 | + return ( |
| 228 | + "ALL TESTS PASSED" not in state.get("validation_feedback").upper() |
| 229 | + and state.get("attempts") < 5 |
| 230 | + ) |
| 231 | + |
| 232 | + # Build the workflow |
| 233 | + workflow = StateGraph(GraphState, input=GraphInput, output=GraphOutput) |
| 234 | + |
| 235 | + workflow.add_node("planner_agent", planner_agent) |
| 236 | + workflow.add_node("builder_agent", builder_agent) |
| 237 | + workflow.add_node("validator_agent", validator_agent) |
| 238 | + workflow.add_node("extractor_agent", extractor_agent) |
| 239 | + |
| 240 | + workflow.add_edge("planner_agent", "builder_agent") |
| 241 | + workflow.add_edge("builder_agent", "validator_agent") |
| 242 | + workflow.add_edge("validator_agent", "extractor_agent") |
| 243 | + workflow.add_edge("extractor_agent", END) |
| 244 | + |
| 245 | + workflow.set_entry_point("planner_agent") |
| 246 | + |
| 247 | + workflow.add_conditional_edges( |
| 248 | + "validator_agent", |
| 249 | + lambda s: "builder_agent" |
| 250 | + if should_continue_loop(s) |
| 251 | + else "extractor_agent", |
| 252 | + ) |
| 253 | + |
| 254 | + # Compile the graph |
| 255 | + graph = workflow.compile() |
| 256 | + |
| 257 | + yield graph |
0 commit comments