diff --git a/apps/dojo/src/files.json b/apps/dojo/src/files.json index a848fb55e..946544043 100644 --- a/apps/dojo/src/files.json +++ b/apps/dojo/src/files.json @@ -960,7 +960,7 @@ }, { "name": "agent.py", - "content": "\"\"\"\nA demo of shared state between the agent and CopilotKit using LangGraph.\n\"\"\"\n\nimport json\nfrom enum import Enum\nfrom typing import Dict, List, Any, Optional\nimport os\n\n# LangGraph imports\nfrom pydantic import BaseModel, Field\nfrom langchain_core.runnables import RunnableConfig\nfrom langchain_core.callbacks.manager import adispatch_custom_event\nfrom langchain_core.messages import SystemMessage\nfrom langchain_core.tools import tool\nfrom langchain_openai import ChatOpenAI\nfrom langgraph.graph import StateGraph, END, START\nfrom langgraph.types import Command\nfrom langgraph.graph import MessagesState\nfrom langgraph.checkpoint.memory import MemorySaver\n\nclass SkillLevel(str, Enum):\n \"\"\"\n The level of skill required for the recipe.\n \"\"\"\n BEGINNER = \"Beginner\"\n INTERMEDIATE = \"Intermediate\"\n ADVANCED = \"Advanced\"\n\nclass SpecialPreferences(str, Enum):\n \"\"\"\n Special preferences for the recipe.\n \"\"\"\n HIGH_PROTEIN = \"High Protein\"\n LOW_CARB = \"Low Carb\"\n SPICY = \"Spicy\"\n BUDGET_FRIENDLY = \"Budget-Friendly\"\n ONE_POT_MEAL = \"One-Pot Meal\"\n VEGETARIAN = \"Vegetarian\"\n VEGAN = \"Vegan\"\n\nclass CookingTime(str, Enum):\n \"\"\"\n The cooking time of the recipe.\n \"\"\"\n FIVE_MIN = \"5 min\"\n FIFTEEN_MIN = \"15 min\"\n THIRTY_MIN = \"30 min\"\n FORTY_FIVE_MIN = \"45 min\"\n SIXTY_PLUS_MIN = \"60+ min\"\n\nclass Ingredient(BaseModel):\n \"\"\"\n An ingredient.\n \"\"\"\n icon: str = Field(\n description=\"Icon: the actual emoji like 🥕\"\n )\n name: str = Field(description=\"The name of the ingredient\")\n amount: str = Field(description=\"The amount of the ingredient\")\n\nclass Recipe(BaseModel):\n \"\"\"\n A recipe.\n \"\"\"\n skill_level: SkillLevel = \\\n Field(description=\"The skill level required for the recipe\")\n special_preferences: List[SpecialPreferences] = \\\n Field(description=\"A list of special preferences for the recipe\")\n cooking_time: CookingTime = \\\n Field(description=\"The cooking time of the recipe\")\n ingredients: List[Ingredient] = \\\n Field(description=\n \"\"\"Entire list of ingredients for the recipe, including the new ingredients\n and the ones that are already in the recipe: Icon: the actual emoji like 🥕,\n name and amount.\n Like so: 🥕 Carrots (250g)\"\"\"\n )\n instructions: List[str] = \\\n Field(description=\n \"\"\"Entire list of instructions for the recipe,\n including the new instructions and the ones that are already there\"\"\"\n )\n changes: str = \\\n Field(description=\"A description of the changes made to the recipe\")\n\nclass GenerateRecipeArgs(BaseModel): # pylint: disable=missing-class-docstring\n recipe: Recipe\n\n@tool(args_schema=GenerateRecipeArgs)\ndef generate_recipe(recipe: Recipe): # pylint: disable=unused-argument\n \"\"\"\n Using the existing (if any) ingredients and instructions, proceed with the recipe to finish it.\n Make sure the recipe is complete. ALWAYS provide the entire recipe, not just the changes.\n \"\"\"\n\nclass AgentState(MessagesState):\n \"\"\"\n The state of the recipe.\n \"\"\"\n recipe: Optional[Dict[str, Any]] = None\n tools: List[Any]\n\n\nasync def start_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n This is the entry point for the flow.\n \"\"\"\n\n # Initialize recipe if not exists\n if \"recipe\" not in state or state[\"recipe\"] is None:\n state[\"recipe\"] = {\n \"skill_level\": SkillLevel.BEGINNER.value,\n \"special_preferences\": [],\n \"cooking_time\": CookingTime.FIFTEEN_MIN.value,\n \"ingredients\": [{\"icon\": \"🍴\", \"name\": \"Sample Ingredient\", \"amount\": \"1 unit\"}],\n \"instructions\": [\"First step instruction\"]\n }\n # Emit the initial state to ensure it's properly shared with the frontend\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n return Command(\n goto=\"chat_node\",\n update={\n \"messages\": state[\"messages\"],\n \"recipe\": state[\"recipe\"]\n }\n )\n\nasync def chat_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n Standard chat node.\n \"\"\"\n # Create a safer serialization of the recipe\n recipe_json = \"No recipe yet\"\n if \"recipe\" in state and state[\"recipe\"] is not None:\n try:\n recipe_json = json.dumps(state[\"recipe\"], indent=2)\n except Exception as e: # pylint: disable=broad-exception-caught\n recipe_json = f\"Error serializing recipe: {str(e)}\"\n\n system_prompt = f\"\"\"You are a helpful assistant for creating recipes. \n This is the current state of the recipe: {recipe_json}\n You can improve the recipe by calling the generate_recipe tool.\n \n IMPORTANT:\n 1. Create a recipe using the existing ingredients and instructions. Make sure the recipe is complete.\n 2. For ingredients, append new ingredients to the existing ones.\n 3. For instructions, append new steps to the existing ones.\n 4. 'ingredients' is always an array of objects with 'icon', 'name', and 'amount' fields\n 5. 'instructions' is always an array of strings\n\n If you have just created or modified the recipe, just answer in one sentence what you did. dont describe the recipe, just say what you did.\n \"\"\"\n\n # Define the model\n model = ChatOpenAI(model=\"gpt-4o-mini\")\n\n # Define config for the model\n if config is None:\n config = RunnableConfig(recursion_limit=25)\n\n # Use \"predict_state\" metadata to set up streaming for the write_document tool\n config[\"metadata\"][\"predict_state\"] = [{\n \"state_key\": \"recipe\",\n \"tool\": \"generate_recipe\",\n \"tool_argument\": \"recipe\"\n }]\n\n # Bind the tools to the model\n model_with_tools = model.bind_tools(\n [\n *state[\"tools\"],\n generate_recipe\n ],\n # Disable parallel tool calls to avoid race conditions\n parallel_tool_calls=False,\n )\n\n # Run the model and generate a response\n response = await model_with_tools.ainvoke([\n SystemMessage(content=system_prompt),\n *state[\"messages\"],\n ], config)\n\n # Update messages with the response\n messages = state[\"messages\"] + [response]\n\n # Handle tool calls\n if hasattr(response, \"tool_calls\") and response.tool_calls:\n # Handle dicts or object (backward compatibility)\n tool_call = (response.tool_calls[0]\n if isinstance(response.tool_calls[0], dict)\n else vars(response.tool_calls[0]))\n\n # Check if args is already a dict or needs to be parsed\n tool_call_args = (tool_call[\"args\"]\n if isinstance(tool_call[\"args\"], dict)\n else json.loads(tool_call[\"args\"]))\n\n if tool_call[\"name\"] == \"generate_recipe\":\n # Update recipe state with tool_call_args\n recipe_data = tool_call_args[\"recipe\"]\n\n # If we have an existing recipe, update it\n if \"recipe\" in state and state[\"recipe\"] is not None:\n recipe = state[\"recipe\"]\n for key, value in recipe_data.items():\n if value is not None: # Only update fields that were provided\n recipe[key] = value\n else:\n # Create a new recipe\n recipe = {\n \"skill_level\": recipe_data.get(\"skill_level\", SkillLevel.BEGINNER.value),\n \"special_preferences\": recipe_data.get(\"special_preferences\", []),\n \"cooking_time\": recipe_data.get(\"cooking_time\", CookingTime.FIFTEEN_MIN.value),\n \"ingredients\": recipe_data.get(\"ingredients\", []),\n \"instructions\": recipe_data.get(\"instructions\", [])\n }\n\n # Add tool response to messages\n tool_response = {\n \"role\": \"tool\",\n \"content\": \"Recipe generated.\",\n \"tool_call_id\": tool_call[\"id\"]\n }\n\n messages = messages + [tool_response]\n\n # Explicitly emit the updated state to ensure it's shared with frontend\n state[\"recipe\"] = recipe\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n # Return command with updated recipe\n return Command(\n goto=\"start_node\",\n update={\n \"messages\": messages,\n \"recipe\": recipe\n }\n )\n\n return Command(\n goto=END,\n update={\n \"messages\": messages,\n \"recipe\": state[\"recipe\"]\n }\n )\n\n\n# Define the graph\nworkflow = StateGraph(AgentState)\nworkflow.add_node(\"start_node\", start_node)\nworkflow.add_node(\"chat_node\", chat_node)\nworkflow.set_entry_point(\"start_node\")\nworkflow.add_edge(START, \"start_node\")\nworkflow.add_edge(\"start_node\", \"chat_node\")\nworkflow.add_edge(\"chat_node\", END)\n\n# Conditionally use a checkpointer based on the environment\n# Check for multiple indicators that we're running in LangGraph dev/API mode\nis_fast_api = os.environ.get(\"LANGGRAPH_FAST_API\", \"false\").lower() == \"true\"\n\n# Compile the graph\nif is_fast_api:\n # For CopilotKit and other contexts, use MemorySaver\n from langgraph.checkpoint.memory import MemorySaver\n memory = MemorySaver()\n graph = workflow.compile(checkpointer=memory)\nelse:\n # When running in LangGraph API/dev, don't use a custom checkpointer\n graph = workflow.compile()\n", + "content": "\"\"\"\nA demo of shared state between the agent and CopilotKit using LangGraph.\n\"\"\"\n\nimport json\nimport os\nfrom enum import Enum\nfrom typing import Any, Dict, List, Optional\n\nfrom langchain_core.callbacks.manager import adispatch_custom_event\nfrom langchain_core.messages import SystemMessage\nfrom langchain_core.runnables import RunnableConfig\nfrom langchain_core.tools import tool\nfrom langchain_openai import ChatOpenAI\nfrom langgraph.checkpoint.memory import MemorySaver\nfrom langgraph.graph import END, START, MessagesState, StateGraph\nfrom langgraph.types import Command\n\n# LangGraph imports\nfrom pydantic import BaseModel, Field\n\n\nclass SkillLevel(str, Enum):\n \"\"\"\n The level of skill required for the recipe.\n \"\"\"\n\n BEGINNER = \"Beginner\"\n INTERMEDIATE = \"Intermediate\"\n ADVANCED = \"Advanced\"\n\n\nclass SpecialPreferences(str, Enum):\n \"\"\"\n Special preferences for the recipe.\n \"\"\"\n\n HIGH_PROTEIN = \"High Protein\"\n LOW_CARB = \"Low Carb\"\n SPICY = \"Spicy\"\n BUDGET_FRIENDLY = \"Budget-Friendly\"\n ONE_POT_MEAL = \"One-Pot Meal\"\n VEGETARIAN = \"Vegetarian\"\n VEGAN = \"Vegan\"\n\n\nclass CookingTime(str, Enum):\n \"\"\"\n The cooking time of the recipe.\n \"\"\"\n\n FIVE_MIN = \"5 min\"\n FIFTEEN_MIN = \"15 min\"\n THIRTY_MIN = \"30 min\"\n FORTY_FIVE_MIN = \"45 min\"\n SIXTY_PLUS_MIN = \"60+ min\"\n\n\nclass Ingredient(BaseModel):\n \"\"\"\n An ingredient.\n \"\"\"\n\n icon: str = Field(description=\"Icon: the actual emoji like 🥕\")\n name: str = Field(description=\"The name of the ingredient\")\n amount: str = Field(description=\"The amount of the ingredient\")\n\n\nclass Recipe(BaseModel):\n \"\"\"\n A recipe.\n \"\"\"\n\n skill_level: SkillLevel = Field(\n description=\"The skill level required for the recipe\"\n )\n special_preferences: List[SpecialPreferences] = Field(\n description=\"A list of special preferences for the recipe\"\n )\n cooking_time: CookingTime = Field(description=\"The cooking time of the recipe\")\n ingredients: List[Ingredient] = Field(\n description=\"\"\"Entire list of ingredients for the recipe, including the new ingredients\n and the ones that are already in the recipe: Icon: the actual emoji like 🥕,\n name and amount.\n Like so: 🥕 Carrots (250g)\"\"\"\n )\n instructions: List[str] = Field(\n description=\"\"\"Entire list of instructions for the recipe,\n including the new instructions and the ones that are already there\"\"\"\n )\n changes: str = Field(description=\"A description of the changes made to the recipe\")\n\n\nclass GenerateRecipeArgs(BaseModel): # pylint: disable=missing-class-docstring\n recipe: Recipe\n\n\n@tool(args_schema=GenerateRecipeArgs)\ndef generate_recipe(recipe: Recipe): # pylint: disable=unused-argument\n \"\"\"\n Using the existing (if any) ingredients and instructions, proceed with the recipe to finish it.\n Make sure the recipe is complete. ALWAYS provide the entire recipe, not just the changes.\n \"\"\"\n\n\nclass AgentState(MessagesState):\n \"\"\"\n The state of the recipe.\n \"\"\"\n\n recipe: Optional[Dict[str, Any]] = None\n tools: List[Any]\n\n\nasync def start_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n This is the entry point for the flow.\n \"\"\"\n\n # Initialize recipe if not exists\n if \"recipe\" not in state or state[\"recipe\"] is None:\n state[\"recipe\"] = {\n \"skill_level\": SkillLevel.BEGINNER.value,\n \"special_preferences\": [],\n \"cooking_time\": CookingTime.FIFTEEN_MIN.value,\n \"ingredients\": [\n {\"icon\": \"🍴\", \"name\": \"Sample Ingredient\", \"amount\": \"1 unit\"}\n ],\n \"instructions\": [\"First step instruction\"],\n }\n # Emit the initial state to ensure it's properly shared with the frontend\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n return Command(\n goto=\"chat_node\",\n update={\"messages\": state[\"messages\"], \"recipe\": state[\"recipe\"]},\n )\n\n\nasync def chat_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n Standard chat node.\n \"\"\"\n # Create a safer serialization of the recipe\n recipe_json = \"No recipe yet\"\n if \"recipe\" in state and state[\"recipe\"] is not None:\n try:\n recipe_json = json.dumps(state[\"recipe\"], indent=2)\n except Exception as e: # pylint: disable=broad-exception-caught\n recipe_json = f\"Error serializing recipe: {str(e)}\"\n\n system_prompt = f\"\"\"You are a helpful assistant for creating recipes.\n This is the current state of the recipe: {recipe_json}\n You can improve the recipe by calling the generate_recipe tool.\n\n IMPORTANT:\n 1. Create a recipe using the existing ingredients and instructions. Make sure the recipe is complete.\n 2. For ingredients, append new ingredients to the existing ones.\n 3. For instructions, append new steps to the existing ones.\n 4. 'ingredients' is always an array of objects with 'icon', 'name', and 'amount' fields\n 5. 'instructions' is always an array of strings\n 6. For the 'icon' field in ingredients, ALWAYS use actual Unicode emoji characters (like 🥕 🍅 🧅 🥖 🧈 🥛 🧂 etc.), NEVER use text, ANSI codes, or placeholders\n\n If you have just created or modified the recipe, just answer in one sentence what you did. dont describe the recipe, just say what you did.\n \"\"\"\n\n # Define the model\n model = ChatOpenAI(model=\"gpt-4o-mini\")\n\n # Define config for the model\n if config is None:\n config = RunnableConfig(recursion_limit=25)\n\n # Use \"predict_state\" metadata to set up streaming for the write_document tool\n config[\"metadata\"][\"predict_state\"] = [\n {\"state_key\": \"recipe\", \"tool\": \"generate_recipe\", \"tool_argument\": \"recipe\"}\n ]\n\n # Bind the tools to the model\n model_with_tools = model.bind_tools(\n [*state[\"tools\"], generate_recipe],\n # Disable parallel tool calls to avoid race conditions\n parallel_tool_calls=False,\n )\n\n # Run the model and generate a response\n response = await model_with_tools.ainvoke(\n [\n SystemMessage(content=system_prompt),\n *state[\"messages\"],\n ],\n config,\n )\n\n # Update messages with the response\n messages = state[\"messages\"] + [response]\n\n # Handle tool calls\n if hasattr(response, \"tool_calls\") and response.tool_calls:\n # Handle dicts or object (backward compatibility)\n tool_call = (\n response.tool_calls[0]\n if isinstance(response.tool_calls[0], dict)\n else vars(response.tool_calls[0])\n )\n\n # Check if args is already a dict or needs to be parsed\n tool_call_args = (\n tool_call[\"args\"]\n if isinstance(tool_call[\"args\"], dict)\n else json.loads(tool_call[\"args\"])\n )\n\n if tool_call[\"name\"] == \"generate_recipe\":\n # Update recipe state with tool_call_args\n recipe_data = tool_call_args[\"recipe\"]\n\n # If we have an existing recipe, update it\n if \"recipe\" in state and state[\"recipe\"] is not None:\n recipe = state[\"recipe\"]\n for key, value in recipe_data.items():\n if value is not None: # Only update fields that were provided\n recipe[key] = value\n else:\n # Create a new recipe\n recipe = {\n \"skill_level\": recipe_data.get(\n \"skill_level\", SkillLevel.BEGINNER.value\n ),\n \"special_preferences\": recipe_data.get(\"special_preferences\", []),\n \"cooking_time\": recipe_data.get(\n \"cooking_time\", CookingTime.FIFTEEN_MIN.value\n ),\n \"ingredients\": recipe_data.get(\"ingredients\", []),\n \"instructions\": recipe_data.get(\"instructions\", []),\n }\n\n # Add tool response to messages\n tool_response = {\n \"role\": \"tool\",\n \"content\": \"Recipe generated.\",\n \"tool_call_id\": tool_call[\"id\"],\n }\n\n messages = messages + [tool_response]\n\n # Explicitly emit the updated state to ensure it's shared with frontend\n state[\"recipe\"] = recipe\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n # Return command with updated recipe\n return Command(\n goto=\"start_node\", update={\"messages\": messages, \"recipe\": recipe}\n )\n\n return Command(goto=END, update={\"messages\": messages, \"recipe\": state[\"recipe\"]})\n\n\n# Define the graph\nworkflow = StateGraph(AgentState)\nworkflow.add_node(\"start_node\", start_node)\nworkflow.add_node(\"chat_node\", chat_node)\nworkflow.set_entry_point(\"start_node\")\nworkflow.add_edge(START, \"start_node\")\nworkflow.add_edge(\"start_node\", \"chat_node\")\nworkflow.add_edge(\"chat_node\", END)\n\n# Conditionally use a checkpointer based on the environment\n# Check for multiple indicators that we're running in LangGraph dev/API mode\nis_fast_api = os.environ.get(\"LANGGRAPH_FAST_API\", \"false\").lower() == \"true\"\n\n# Compile the graph\nif is_fast_api:\n # For CopilotKit and other contexts, use MemorySaver\n from langgraph.checkpoint.memory import MemorySaver\n\n memory = MemorySaver()\n graph = workflow.compile(checkpointer=memory)\nelse:\n # When running in LangGraph API/dev, don't use a custom checkpointer\n graph = workflow.compile()\n", "language": "python", "type": "file" }, @@ -1213,7 +1213,7 @@ }, { "name": "agent.py", - "content": "\"\"\"\nA demo of shared state between the agent and CopilotKit using LangGraph.\n\"\"\"\n\nimport json\nfrom enum import Enum\nfrom typing import Dict, List, Any, Optional\nimport os\n\n# LangGraph imports\nfrom pydantic import BaseModel, Field\nfrom langchain_core.runnables import RunnableConfig\nfrom langchain_core.callbacks.manager import adispatch_custom_event\nfrom langchain_core.messages import SystemMessage\nfrom langchain_core.tools import tool\nfrom langchain_openai import ChatOpenAI\nfrom langgraph.graph import StateGraph, END, START\nfrom langgraph.types import Command\nfrom langgraph.graph import MessagesState\nfrom langgraph.checkpoint.memory import MemorySaver\n\nclass SkillLevel(str, Enum):\n \"\"\"\n The level of skill required for the recipe.\n \"\"\"\n BEGINNER = \"Beginner\"\n INTERMEDIATE = \"Intermediate\"\n ADVANCED = \"Advanced\"\n\nclass SpecialPreferences(str, Enum):\n \"\"\"\n Special preferences for the recipe.\n \"\"\"\n HIGH_PROTEIN = \"High Protein\"\n LOW_CARB = \"Low Carb\"\n SPICY = \"Spicy\"\n BUDGET_FRIENDLY = \"Budget-Friendly\"\n ONE_POT_MEAL = \"One-Pot Meal\"\n VEGETARIAN = \"Vegetarian\"\n VEGAN = \"Vegan\"\n\nclass CookingTime(str, Enum):\n \"\"\"\n The cooking time of the recipe.\n \"\"\"\n FIVE_MIN = \"5 min\"\n FIFTEEN_MIN = \"15 min\"\n THIRTY_MIN = \"30 min\"\n FORTY_FIVE_MIN = \"45 min\"\n SIXTY_PLUS_MIN = \"60+ min\"\n\nclass Ingredient(BaseModel):\n \"\"\"\n An ingredient.\n \"\"\"\n icon: str = Field(\n description=\"Icon: the actual emoji like 🥕\"\n )\n name: str = Field(description=\"The name of the ingredient\")\n amount: str = Field(description=\"The amount of the ingredient\")\n\nclass Recipe(BaseModel):\n \"\"\"\n A recipe.\n \"\"\"\n skill_level: SkillLevel = \\\n Field(description=\"The skill level required for the recipe\")\n special_preferences: List[SpecialPreferences] = \\\n Field(description=\"A list of special preferences for the recipe\")\n cooking_time: CookingTime = \\\n Field(description=\"The cooking time of the recipe\")\n ingredients: List[Ingredient] = \\\n Field(description=\n \"\"\"Entire list of ingredients for the recipe, including the new ingredients\n and the ones that are already in the recipe: Icon: the actual emoji like 🥕,\n name and amount.\n Like so: 🥕 Carrots (250g)\"\"\"\n )\n instructions: List[str] = \\\n Field(description=\n \"\"\"Entire list of instructions for the recipe,\n including the new instructions and the ones that are already there\"\"\"\n )\n changes: str = \\\n Field(description=\"A description of the changes made to the recipe\")\n\nclass GenerateRecipeArgs(BaseModel): # pylint: disable=missing-class-docstring\n recipe: Recipe\n\n@tool(args_schema=GenerateRecipeArgs)\ndef generate_recipe(recipe: Recipe): # pylint: disable=unused-argument\n \"\"\"\n Using the existing (if any) ingredients and instructions, proceed with the recipe to finish it.\n Make sure the recipe is complete. ALWAYS provide the entire recipe, not just the changes.\n \"\"\"\n\nclass AgentState(MessagesState):\n \"\"\"\n The state of the recipe.\n \"\"\"\n recipe: Optional[Dict[str, Any]] = None\n tools: List[Any]\n\n\nasync def start_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n This is the entry point for the flow.\n \"\"\"\n\n # Initialize recipe if not exists\n if \"recipe\" not in state or state[\"recipe\"] is None:\n state[\"recipe\"] = {\n \"skill_level\": SkillLevel.BEGINNER.value,\n \"special_preferences\": [],\n \"cooking_time\": CookingTime.FIFTEEN_MIN.value,\n \"ingredients\": [{\"icon\": \"🍴\", \"name\": \"Sample Ingredient\", \"amount\": \"1 unit\"}],\n \"instructions\": [\"First step instruction\"]\n }\n # Emit the initial state to ensure it's properly shared with the frontend\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n return Command(\n goto=\"chat_node\",\n update={\n \"messages\": state[\"messages\"],\n \"recipe\": state[\"recipe\"]\n }\n )\n\nasync def chat_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n Standard chat node.\n \"\"\"\n # Create a safer serialization of the recipe\n recipe_json = \"No recipe yet\"\n if \"recipe\" in state and state[\"recipe\"] is not None:\n try:\n recipe_json = json.dumps(state[\"recipe\"], indent=2)\n except Exception as e: # pylint: disable=broad-exception-caught\n recipe_json = f\"Error serializing recipe: {str(e)}\"\n\n system_prompt = f\"\"\"You are a helpful assistant for creating recipes. \n This is the current state of the recipe: {recipe_json}\n You can improve the recipe by calling the generate_recipe tool.\n \n IMPORTANT:\n 1. Create a recipe using the existing ingredients and instructions. Make sure the recipe is complete.\n 2. For ingredients, append new ingredients to the existing ones.\n 3. For instructions, append new steps to the existing ones.\n 4. 'ingredients' is always an array of objects with 'icon', 'name', and 'amount' fields\n 5. 'instructions' is always an array of strings\n\n If you have just created or modified the recipe, just answer in one sentence what you did. dont describe the recipe, just say what you did.\n \"\"\"\n\n # Define the model\n model = ChatOpenAI(model=\"gpt-4o-mini\")\n\n # Define config for the model\n if config is None:\n config = RunnableConfig(recursion_limit=25)\n\n # Use \"predict_state\" metadata to set up streaming for the write_document tool\n config[\"metadata\"][\"predict_state\"] = [{\n \"state_key\": \"recipe\",\n \"tool\": \"generate_recipe\",\n \"tool_argument\": \"recipe\"\n }]\n\n # Bind the tools to the model\n model_with_tools = model.bind_tools(\n [\n *state[\"tools\"],\n generate_recipe\n ],\n # Disable parallel tool calls to avoid race conditions\n parallel_tool_calls=False,\n )\n\n # Run the model and generate a response\n response = await model_with_tools.ainvoke([\n SystemMessage(content=system_prompt),\n *state[\"messages\"],\n ], config)\n\n # Update messages with the response\n messages = state[\"messages\"] + [response]\n\n # Handle tool calls\n if hasattr(response, \"tool_calls\") and response.tool_calls:\n # Handle dicts or object (backward compatibility)\n tool_call = (response.tool_calls[0]\n if isinstance(response.tool_calls[0], dict)\n else vars(response.tool_calls[0]))\n\n # Check if args is already a dict or needs to be parsed\n tool_call_args = (tool_call[\"args\"]\n if isinstance(tool_call[\"args\"], dict)\n else json.loads(tool_call[\"args\"]))\n\n if tool_call[\"name\"] == \"generate_recipe\":\n # Update recipe state with tool_call_args\n recipe_data = tool_call_args[\"recipe\"]\n\n # If we have an existing recipe, update it\n if \"recipe\" in state and state[\"recipe\"] is not None:\n recipe = state[\"recipe\"]\n for key, value in recipe_data.items():\n if value is not None: # Only update fields that were provided\n recipe[key] = value\n else:\n # Create a new recipe\n recipe = {\n \"skill_level\": recipe_data.get(\"skill_level\", SkillLevel.BEGINNER.value),\n \"special_preferences\": recipe_data.get(\"special_preferences\", []),\n \"cooking_time\": recipe_data.get(\"cooking_time\", CookingTime.FIFTEEN_MIN.value),\n \"ingredients\": recipe_data.get(\"ingredients\", []),\n \"instructions\": recipe_data.get(\"instructions\", [])\n }\n\n # Add tool response to messages\n tool_response = {\n \"role\": \"tool\",\n \"content\": \"Recipe generated.\",\n \"tool_call_id\": tool_call[\"id\"]\n }\n\n messages = messages + [tool_response]\n\n # Explicitly emit the updated state to ensure it's shared with frontend\n state[\"recipe\"] = recipe\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n # Return command with updated recipe\n return Command(\n goto=\"start_node\",\n update={\n \"messages\": messages,\n \"recipe\": recipe\n }\n )\n\n return Command(\n goto=END,\n update={\n \"messages\": messages,\n \"recipe\": state[\"recipe\"]\n }\n )\n\n\n# Define the graph\nworkflow = StateGraph(AgentState)\nworkflow.add_node(\"start_node\", start_node)\nworkflow.add_node(\"chat_node\", chat_node)\nworkflow.set_entry_point(\"start_node\")\nworkflow.add_edge(START, \"start_node\")\nworkflow.add_edge(\"start_node\", \"chat_node\")\nworkflow.add_edge(\"chat_node\", END)\n\n# Conditionally use a checkpointer based on the environment\n# Check for multiple indicators that we're running in LangGraph dev/API mode\nis_fast_api = os.environ.get(\"LANGGRAPH_FAST_API\", \"false\").lower() == \"true\"\n\n# Compile the graph\nif is_fast_api:\n # For CopilotKit and other contexts, use MemorySaver\n from langgraph.checkpoint.memory import MemorySaver\n memory = MemorySaver()\n graph = workflow.compile(checkpointer=memory)\nelse:\n # When running in LangGraph API/dev, don't use a custom checkpointer\n graph = workflow.compile()\n", + "content": "\"\"\"\nA demo of shared state between the agent and CopilotKit using LangGraph.\n\"\"\"\n\nimport json\nimport os\nfrom enum import Enum\nfrom typing import Any, Dict, List, Optional\n\nfrom langchain_core.callbacks.manager import adispatch_custom_event\nfrom langchain_core.messages import SystemMessage\nfrom langchain_core.runnables import RunnableConfig\nfrom langchain_core.tools import tool\nfrom langchain_openai import ChatOpenAI\nfrom langgraph.checkpoint.memory import MemorySaver\nfrom langgraph.graph import END, START, MessagesState, StateGraph\nfrom langgraph.types import Command\n\n# LangGraph imports\nfrom pydantic import BaseModel, Field\n\n\nclass SkillLevel(str, Enum):\n \"\"\"\n The level of skill required for the recipe.\n \"\"\"\n\n BEGINNER = \"Beginner\"\n INTERMEDIATE = \"Intermediate\"\n ADVANCED = \"Advanced\"\n\n\nclass SpecialPreferences(str, Enum):\n \"\"\"\n Special preferences for the recipe.\n \"\"\"\n\n HIGH_PROTEIN = \"High Protein\"\n LOW_CARB = \"Low Carb\"\n SPICY = \"Spicy\"\n BUDGET_FRIENDLY = \"Budget-Friendly\"\n ONE_POT_MEAL = \"One-Pot Meal\"\n VEGETARIAN = \"Vegetarian\"\n VEGAN = \"Vegan\"\n\n\nclass CookingTime(str, Enum):\n \"\"\"\n The cooking time of the recipe.\n \"\"\"\n\n FIVE_MIN = \"5 min\"\n FIFTEEN_MIN = \"15 min\"\n THIRTY_MIN = \"30 min\"\n FORTY_FIVE_MIN = \"45 min\"\n SIXTY_PLUS_MIN = \"60+ min\"\n\n\nclass Ingredient(BaseModel):\n \"\"\"\n An ingredient.\n \"\"\"\n\n icon: str = Field(description=\"Icon: the actual emoji like 🥕\")\n name: str = Field(description=\"The name of the ingredient\")\n amount: str = Field(description=\"The amount of the ingredient\")\n\n\nclass Recipe(BaseModel):\n \"\"\"\n A recipe.\n \"\"\"\n\n skill_level: SkillLevel = Field(\n description=\"The skill level required for the recipe\"\n )\n special_preferences: List[SpecialPreferences] = Field(\n description=\"A list of special preferences for the recipe\"\n )\n cooking_time: CookingTime = Field(description=\"The cooking time of the recipe\")\n ingredients: List[Ingredient] = Field(\n description=\"\"\"Entire list of ingredients for the recipe, including the new ingredients\n and the ones that are already in the recipe: Icon: the actual emoji like 🥕,\n name and amount.\n Like so: 🥕 Carrots (250g)\"\"\"\n )\n instructions: List[str] = Field(\n description=\"\"\"Entire list of instructions for the recipe,\n including the new instructions and the ones that are already there\"\"\"\n )\n changes: str = Field(description=\"A description of the changes made to the recipe\")\n\n\nclass GenerateRecipeArgs(BaseModel): # pylint: disable=missing-class-docstring\n recipe: Recipe\n\n\n@tool(args_schema=GenerateRecipeArgs)\ndef generate_recipe(recipe: Recipe): # pylint: disable=unused-argument\n \"\"\"\n Using the existing (if any) ingredients and instructions, proceed with the recipe to finish it.\n Make sure the recipe is complete. ALWAYS provide the entire recipe, not just the changes.\n \"\"\"\n\n\nclass AgentState(MessagesState):\n \"\"\"\n The state of the recipe.\n \"\"\"\n\n recipe: Optional[Dict[str, Any]] = None\n tools: List[Any]\n\n\nasync def start_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n This is the entry point for the flow.\n \"\"\"\n\n # Initialize recipe if not exists\n if \"recipe\" not in state or state[\"recipe\"] is None:\n state[\"recipe\"] = {\n \"skill_level\": SkillLevel.BEGINNER.value,\n \"special_preferences\": [],\n \"cooking_time\": CookingTime.FIFTEEN_MIN.value,\n \"ingredients\": [\n {\"icon\": \"🍴\", \"name\": \"Sample Ingredient\", \"amount\": \"1 unit\"}\n ],\n \"instructions\": [\"First step instruction\"],\n }\n # Emit the initial state to ensure it's properly shared with the frontend\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n return Command(\n goto=\"chat_node\",\n update={\"messages\": state[\"messages\"], \"recipe\": state[\"recipe\"]},\n )\n\n\nasync def chat_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n Standard chat node.\n \"\"\"\n # Create a safer serialization of the recipe\n recipe_json = \"No recipe yet\"\n if \"recipe\" in state and state[\"recipe\"] is not None:\n try:\n recipe_json = json.dumps(state[\"recipe\"], indent=2)\n except Exception as e: # pylint: disable=broad-exception-caught\n recipe_json = f\"Error serializing recipe: {str(e)}\"\n\n system_prompt = f\"\"\"You are a helpful assistant for creating recipes.\n This is the current state of the recipe: {recipe_json}\n You can improve the recipe by calling the generate_recipe tool.\n\n IMPORTANT:\n 1. Create a recipe using the existing ingredients and instructions. Make sure the recipe is complete.\n 2. For ingredients, append new ingredients to the existing ones.\n 3. For instructions, append new steps to the existing ones.\n 4. 'ingredients' is always an array of objects with 'icon', 'name', and 'amount' fields\n 5. 'instructions' is always an array of strings\n 6. For the 'icon' field in ingredients, ALWAYS use actual Unicode emoji characters (like 🥕 🍅 🧅 🥖 🧈 🥛 🧂 etc.), NEVER use text, ANSI codes, or placeholders\n\n If you have just created or modified the recipe, just answer in one sentence what you did. dont describe the recipe, just say what you did.\n \"\"\"\n\n # Define the model\n model = ChatOpenAI(model=\"gpt-4o-mini\")\n\n # Define config for the model\n if config is None:\n config = RunnableConfig(recursion_limit=25)\n\n # Use \"predict_state\" metadata to set up streaming for the write_document tool\n config[\"metadata\"][\"predict_state\"] = [\n {\"state_key\": \"recipe\", \"tool\": \"generate_recipe\", \"tool_argument\": \"recipe\"}\n ]\n\n # Bind the tools to the model\n model_with_tools = model.bind_tools(\n [*state[\"tools\"], generate_recipe],\n # Disable parallel tool calls to avoid race conditions\n parallel_tool_calls=False,\n )\n\n # Run the model and generate a response\n response = await model_with_tools.ainvoke(\n [\n SystemMessage(content=system_prompt),\n *state[\"messages\"],\n ],\n config,\n )\n\n # Update messages with the response\n messages = state[\"messages\"] + [response]\n\n # Handle tool calls\n if hasattr(response, \"tool_calls\") and response.tool_calls:\n # Handle dicts or object (backward compatibility)\n tool_call = (\n response.tool_calls[0]\n if isinstance(response.tool_calls[0], dict)\n else vars(response.tool_calls[0])\n )\n\n # Check if args is already a dict or needs to be parsed\n tool_call_args = (\n tool_call[\"args\"]\n if isinstance(tool_call[\"args\"], dict)\n else json.loads(tool_call[\"args\"])\n )\n\n if tool_call[\"name\"] == \"generate_recipe\":\n # Update recipe state with tool_call_args\n recipe_data = tool_call_args[\"recipe\"]\n\n # If we have an existing recipe, update it\n if \"recipe\" in state and state[\"recipe\"] is not None:\n recipe = state[\"recipe\"]\n for key, value in recipe_data.items():\n if value is not None: # Only update fields that were provided\n recipe[key] = value\n else:\n # Create a new recipe\n recipe = {\n \"skill_level\": recipe_data.get(\n \"skill_level\", SkillLevel.BEGINNER.value\n ),\n \"special_preferences\": recipe_data.get(\"special_preferences\", []),\n \"cooking_time\": recipe_data.get(\n \"cooking_time\", CookingTime.FIFTEEN_MIN.value\n ),\n \"ingredients\": recipe_data.get(\"ingredients\", []),\n \"instructions\": recipe_data.get(\"instructions\", []),\n }\n\n # Add tool response to messages\n tool_response = {\n \"role\": \"tool\",\n \"content\": \"Recipe generated.\",\n \"tool_call_id\": tool_call[\"id\"],\n }\n\n messages = messages + [tool_response]\n\n # Explicitly emit the updated state to ensure it's shared with frontend\n state[\"recipe\"] = recipe\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n # Return command with updated recipe\n return Command(\n goto=\"start_node\", update={\"messages\": messages, \"recipe\": recipe}\n )\n\n return Command(goto=END, update={\"messages\": messages, \"recipe\": state[\"recipe\"]})\n\n\n# Define the graph\nworkflow = StateGraph(AgentState)\nworkflow.add_node(\"start_node\", start_node)\nworkflow.add_node(\"chat_node\", chat_node)\nworkflow.set_entry_point(\"start_node\")\nworkflow.add_edge(START, \"start_node\")\nworkflow.add_edge(\"start_node\", \"chat_node\")\nworkflow.add_edge(\"chat_node\", END)\n\n# Conditionally use a checkpointer based on the environment\n# Check for multiple indicators that we're running in LangGraph dev/API mode\nis_fast_api = os.environ.get(\"LANGGRAPH_FAST_API\", \"false\").lower() == \"true\"\n\n# Compile the graph\nif is_fast_api:\n # For CopilotKit and other contexts, use MemorySaver\n from langgraph.checkpoint.memory import MemorySaver\n\n memory = MemorySaver()\n graph = workflow.compile(checkpointer=memory)\nelse:\n # When running in LangGraph API/dev, don't use a custom checkpointer\n graph = workflow.compile()\n", "language": "python", "type": "file" } @@ -1445,7 +1445,7 @@ }, { "name": "agent.py", - "content": "\"\"\"\nA demo of shared state between the agent and CopilotKit using LangGraph.\n\"\"\"\n\nimport json\nfrom enum import Enum\nfrom typing import Dict, List, Any, Optional\nimport os\n\n# LangGraph imports\nfrom pydantic import BaseModel, Field\nfrom langchain_core.runnables import RunnableConfig\nfrom langchain_core.callbacks.manager import adispatch_custom_event\nfrom langchain_core.messages import SystemMessage\nfrom langchain_core.tools import tool\nfrom langchain_openai import ChatOpenAI\nfrom langgraph.graph import StateGraph, END, START\nfrom langgraph.types import Command\nfrom langgraph.graph import MessagesState\nfrom langgraph.checkpoint.memory import MemorySaver\n\nclass SkillLevel(str, Enum):\n \"\"\"\n The level of skill required for the recipe.\n \"\"\"\n BEGINNER = \"Beginner\"\n INTERMEDIATE = \"Intermediate\"\n ADVANCED = \"Advanced\"\n\nclass SpecialPreferences(str, Enum):\n \"\"\"\n Special preferences for the recipe.\n \"\"\"\n HIGH_PROTEIN = \"High Protein\"\n LOW_CARB = \"Low Carb\"\n SPICY = \"Spicy\"\n BUDGET_FRIENDLY = \"Budget-Friendly\"\n ONE_POT_MEAL = \"One-Pot Meal\"\n VEGETARIAN = \"Vegetarian\"\n VEGAN = \"Vegan\"\n\nclass CookingTime(str, Enum):\n \"\"\"\n The cooking time of the recipe.\n \"\"\"\n FIVE_MIN = \"5 min\"\n FIFTEEN_MIN = \"15 min\"\n THIRTY_MIN = \"30 min\"\n FORTY_FIVE_MIN = \"45 min\"\n SIXTY_PLUS_MIN = \"60+ min\"\n\nclass Ingredient(BaseModel):\n \"\"\"\n An ingredient.\n \"\"\"\n icon: str = Field(\n description=\"Icon: the actual emoji like 🥕\"\n )\n name: str = Field(description=\"The name of the ingredient\")\n amount: str = Field(description=\"The amount of the ingredient\")\n\nclass Recipe(BaseModel):\n \"\"\"\n A recipe.\n \"\"\"\n skill_level: SkillLevel = \\\n Field(description=\"The skill level required for the recipe\")\n special_preferences: List[SpecialPreferences] = \\\n Field(description=\"A list of special preferences for the recipe\")\n cooking_time: CookingTime = \\\n Field(description=\"The cooking time of the recipe\")\n ingredients: List[Ingredient] = \\\n Field(description=\n \"\"\"Entire list of ingredients for the recipe, including the new ingredients\n and the ones that are already in the recipe: Icon: the actual emoji like 🥕,\n name and amount.\n Like so: 🥕 Carrots (250g)\"\"\"\n )\n instructions: List[str] = \\\n Field(description=\n \"\"\"Entire list of instructions for the recipe,\n including the new instructions and the ones that are already there\"\"\"\n )\n changes: str = \\\n Field(description=\"A description of the changes made to the recipe\")\n\nclass GenerateRecipeArgs(BaseModel): # pylint: disable=missing-class-docstring\n recipe: Recipe\n\n@tool(args_schema=GenerateRecipeArgs)\ndef generate_recipe(recipe: Recipe): # pylint: disable=unused-argument\n \"\"\"\n Using the existing (if any) ingredients and instructions, proceed with the recipe to finish it.\n Make sure the recipe is complete. ALWAYS provide the entire recipe, not just the changes.\n \"\"\"\n\nclass AgentState(MessagesState):\n \"\"\"\n The state of the recipe.\n \"\"\"\n recipe: Optional[Dict[str, Any]] = None\n tools: List[Any]\n\n\nasync def start_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n This is the entry point for the flow.\n \"\"\"\n\n # Initialize recipe if not exists\n if \"recipe\" not in state or state[\"recipe\"] is None:\n state[\"recipe\"] = {\n \"skill_level\": SkillLevel.BEGINNER.value,\n \"special_preferences\": [],\n \"cooking_time\": CookingTime.FIFTEEN_MIN.value,\n \"ingredients\": [{\"icon\": \"🍴\", \"name\": \"Sample Ingredient\", \"amount\": \"1 unit\"}],\n \"instructions\": [\"First step instruction\"]\n }\n # Emit the initial state to ensure it's properly shared with the frontend\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n return Command(\n goto=\"chat_node\",\n update={\n \"messages\": state[\"messages\"],\n \"recipe\": state[\"recipe\"]\n }\n )\n\nasync def chat_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n Standard chat node.\n \"\"\"\n # Create a safer serialization of the recipe\n recipe_json = \"No recipe yet\"\n if \"recipe\" in state and state[\"recipe\"] is not None:\n try:\n recipe_json = json.dumps(state[\"recipe\"], indent=2)\n except Exception as e: # pylint: disable=broad-exception-caught\n recipe_json = f\"Error serializing recipe: {str(e)}\"\n\n system_prompt = f\"\"\"You are a helpful assistant for creating recipes. \n This is the current state of the recipe: {recipe_json}\n You can improve the recipe by calling the generate_recipe tool.\n \n IMPORTANT:\n 1. Create a recipe using the existing ingredients and instructions. Make sure the recipe is complete.\n 2. For ingredients, append new ingredients to the existing ones.\n 3. For instructions, append new steps to the existing ones.\n 4. 'ingredients' is always an array of objects with 'icon', 'name', and 'amount' fields\n 5. 'instructions' is always an array of strings\n\n If you have just created or modified the recipe, just answer in one sentence what you did. dont describe the recipe, just say what you did.\n \"\"\"\n\n # Define the model\n model = ChatOpenAI(model=\"gpt-4o-mini\")\n\n # Define config for the model\n if config is None:\n config = RunnableConfig(recursion_limit=25)\n\n # Use \"predict_state\" metadata to set up streaming for the write_document tool\n config[\"metadata\"][\"predict_state\"] = [{\n \"state_key\": \"recipe\",\n \"tool\": \"generate_recipe\",\n \"tool_argument\": \"recipe\"\n }]\n\n # Bind the tools to the model\n model_with_tools = model.bind_tools(\n [\n *state[\"tools\"],\n generate_recipe\n ],\n # Disable parallel tool calls to avoid race conditions\n parallel_tool_calls=False,\n )\n\n # Run the model and generate a response\n response = await model_with_tools.ainvoke([\n SystemMessage(content=system_prompt),\n *state[\"messages\"],\n ], config)\n\n # Update messages with the response\n messages = state[\"messages\"] + [response]\n\n # Handle tool calls\n if hasattr(response, \"tool_calls\") and response.tool_calls:\n # Handle dicts or object (backward compatibility)\n tool_call = (response.tool_calls[0]\n if isinstance(response.tool_calls[0], dict)\n else vars(response.tool_calls[0]))\n\n # Check if args is already a dict or needs to be parsed\n tool_call_args = (tool_call[\"args\"]\n if isinstance(tool_call[\"args\"], dict)\n else json.loads(tool_call[\"args\"]))\n\n if tool_call[\"name\"] == \"generate_recipe\":\n # Update recipe state with tool_call_args\n recipe_data = tool_call_args[\"recipe\"]\n\n # If we have an existing recipe, update it\n if \"recipe\" in state and state[\"recipe\"] is not None:\n recipe = state[\"recipe\"]\n for key, value in recipe_data.items():\n if value is not None: # Only update fields that were provided\n recipe[key] = value\n else:\n # Create a new recipe\n recipe = {\n \"skill_level\": recipe_data.get(\"skill_level\", SkillLevel.BEGINNER.value),\n \"special_preferences\": recipe_data.get(\"special_preferences\", []),\n \"cooking_time\": recipe_data.get(\"cooking_time\", CookingTime.FIFTEEN_MIN.value),\n \"ingredients\": recipe_data.get(\"ingredients\", []),\n \"instructions\": recipe_data.get(\"instructions\", [])\n }\n\n # Add tool response to messages\n tool_response = {\n \"role\": \"tool\",\n \"content\": \"Recipe generated.\",\n \"tool_call_id\": tool_call[\"id\"]\n }\n\n messages = messages + [tool_response]\n\n # Explicitly emit the updated state to ensure it's shared with frontend\n state[\"recipe\"] = recipe\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n # Return command with updated recipe\n return Command(\n goto=\"start_node\",\n update={\n \"messages\": messages,\n \"recipe\": recipe\n }\n )\n\n return Command(\n goto=END,\n update={\n \"messages\": messages,\n \"recipe\": state[\"recipe\"]\n }\n )\n\n\n# Define the graph\nworkflow = StateGraph(AgentState)\nworkflow.add_node(\"start_node\", start_node)\nworkflow.add_node(\"chat_node\", chat_node)\nworkflow.set_entry_point(\"start_node\")\nworkflow.add_edge(START, \"start_node\")\nworkflow.add_edge(\"start_node\", \"chat_node\")\nworkflow.add_edge(\"chat_node\", END)\n\n# Conditionally use a checkpointer based on the environment\n# Check for multiple indicators that we're running in LangGraph dev/API mode\nis_fast_api = os.environ.get(\"LANGGRAPH_FAST_API\", \"false\").lower() == \"true\"\n\n# Compile the graph\nif is_fast_api:\n # For CopilotKit and other contexts, use MemorySaver\n from langgraph.checkpoint.memory import MemorySaver\n memory = MemorySaver()\n graph = workflow.compile(checkpointer=memory)\nelse:\n # When running in LangGraph API/dev, don't use a custom checkpointer\n graph = workflow.compile()\n", + "content": "\"\"\"\nA demo of shared state between the agent and CopilotKit using LangGraph.\n\"\"\"\n\nimport json\nimport os\nfrom enum import Enum\nfrom typing import Any, Dict, List, Optional\n\nfrom langchain_core.callbacks.manager import adispatch_custom_event\nfrom langchain_core.messages import SystemMessage\nfrom langchain_core.runnables import RunnableConfig\nfrom langchain_core.tools import tool\nfrom langchain_openai import ChatOpenAI\nfrom langgraph.checkpoint.memory import MemorySaver\nfrom langgraph.graph import END, START, MessagesState, StateGraph\nfrom langgraph.types import Command\n\n# LangGraph imports\nfrom pydantic import BaseModel, Field\n\n\nclass SkillLevel(str, Enum):\n \"\"\"\n The level of skill required for the recipe.\n \"\"\"\n\n BEGINNER = \"Beginner\"\n INTERMEDIATE = \"Intermediate\"\n ADVANCED = \"Advanced\"\n\n\nclass SpecialPreferences(str, Enum):\n \"\"\"\n Special preferences for the recipe.\n \"\"\"\n\n HIGH_PROTEIN = \"High Protein\"\n LOW_CARB = \"Low Carb\"\n SPICY = \"Spicy\"\n BUDGET_FRIENDLY = \"Budget-Friendly\"\n ONE_POT_MEAL = \"One-Pot Meal\"\n VEGETARIAN = \"Vegetarian\"\n VEGAN = \"Vegan\"\n\n\nclass CookingTime(str, Enum):\n \"\"\"\n The cooking time of the recipe.\n \"\"\"\n\n FIVE_MIN = \"5 min\"\n FIFTEEN_MIN = \"15 min\"\n THIRTY_MIN = \"30 min\"\n FORTY_FIVE_MIN = \"45 min\"\n SIXTY_PLUS_MIN = \"60+ min\"\n\n\nclass Ingredient(BaseModel):\n \"\"\"\n An ingredient.\n \"\"\"\n\n icon: str = Field(description=\"Icon: the actual emoji like 🥕\")\n name: str = Field(description=\"The name of the ingredient\")\n amount: str = Field(description=\"The amount of the ingredient\")\n\n\nclass Recipe(BaseModel):\n \"\"\"\n A recipe.\n \"\"\"\n\n skill_level: SkillLevel = Field(\n description=\"The skill level required for the recipe\"\n )\n special_preferences: List[SpecialPreferences] = Field(\n description=\"A list of special preferences for the recipe\"\n )\n cooking_time: CookingTime = Field(description=\"The cooking time of the recipe\")\n ingredients: List[Ingredient] = Field(\n description=\"\"\"Entire list of ingredients for the recipe, including the new ingredients\n and the ones that are already in the recipe: Icon: the actual emoji like 🥕,\n name and amount.\n Like so: 🥕 Carrots (250g)\"\"\"\n )\n instructions: List[str] = Field(\n description=\"\"\"Entire list of instructions for the recipe,\n including the new instructions and the ones that are already there\"\"\"\n )\n changes: str = Field(description=\"A description of the changes made to the recipe\")\n\n\nclass GenerateRecipeArgs(BaseModel): # pylint: disable=missing-class-docstring\n recipe: Recipe\n\n\n@tool(args_schema=GenerateRecipeArgs)\ndef generate_recipe(recipe: Recipe): # pylint: disable=unused-argument\n \"\"\"\n Using the existing (if any) ingredients and instructions, proceed with the recipe to finish it.\n Make sure the recipe is complete. ALWAYS provide the entire recipe, not just the changes.\n \"\"\"\n\n\nclass AgentState(MessagesState):\n \"\"\"\n The state of the recipe.\n \"\"\"\n\n recipe: Optional[Dict[str, Any]] = None\n tools: List[Any]\n\n\nasync def start_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n This is the entry point for the flow.\n \"\"\"\n\n # Initialize recipe if not exists\n if \"recipe\" not in state or state[\"recipe\"] is None:\n state[\"recipe\"] = {\n \"skill_level\": SkillLevel.BEGINNER.value,\n \"special_preferences\": [],\n \"cooking_time\": CookingTime.FIFTEEN_MIN.value,\n \"ingredients\": [\n {\"icon\": \"🍴\", \"name\": \"Sample Ingredient\", \"amount\": \"1 unit\"}\n ],\n \"instructions\": [\"First step instruction\"],\n }\n # Emit the initial state to ensure it's properly shared with the frontend\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n return Command(\n goto=\"chat_node\",\n update={\"messages\": state[\"messages\"], \"recipe\": state[\"recipe\"]},\n )\n\n\nasync def chat_node(state: Dict[str, Any], config: RunnableConfig):\n \"\"\"\n Standard chat node.\n \"\"\"\n # Create a safer serialization of the recipe\n recipe_json = \"No recipe yet\"\n if \"recipe\" in state and state[\"recipe\"] is not None:\n try:\n recipe_json = json.dumps(state[\"recipe\"], indent=2)\n except Exception as e: # pylint: disable=broad-exception-caught\n recipe_json = f\"Error serializing recipe: {str(e)}\"\n\n system_prompt = f\"\"\"You are a helpful assistant for creating recipes.\n This is the current state of the recipe: {recipe_json}\n You can improve the recipe by calling the generate_recipe tool.\n\n IMPORTANT:\n 1. Create a recipe using the existing ingredients and instructions. Make sure the recipe is complete.\n 2. For ingredients, append new ingredients to the existing ones.\n 3. For instructions, append new steps to the existing ones.\n 4. 'ingredients' is always an array of objects with 'icon', 'name', and 'amount' fields\n 5. 'instructions' is always an array of strings\n 6. For the 'icon' field in ingredients, ALWAYS use actual Unicode emoji characters (like 🥕 🍅 🧅 🥖 🧈 🥛 🧂 etc.), NEVER use text, ANSI codes, or placeholders\n\n If you have just created or modified the recipe, just answer in one sentence what you did. dont describe the recipe, just say what you did.\n \"\"\"\n\n # Define the model\n model = ChatOpenAI(model=\"gpt-4o-mini\")\n\n # Define config for the model\n if config is None:\n config = RunnableConfig(recursion_limit=25)\n\n # Use \"predict_state\" metadata to set up streaming for the write_document tool\n config[\"metadata\"][\"predict_state\"] = [\n {\"state_key\": \"recipe\", \"tool\": \"generate_recipe\", \"tool_argument\": \"recipe\"}\n ]\n\n # Bind the tools to the model\n model_with_tools = model.bind_tools(\n [*state[\"tools\"], generate_recipe],\n # Disable parallel tool calls to avoid race conditions\n parallel_tool_calls=False,\n )\n\n # Run the model and generate a response\n response = await model_with_tools.ainvoke(\n [\n SystemMessage(content=system_prompt),\n *state[\"messages\"],\n ],\n config,\n )\n\n # Update messages with the response\n messages = state[\"messages\"] + [response]\n\n # Handle tool calls\n if hasattr(response, \"tool_calls\") and response.tool_calls:\n # Handle dicts or object (backward compatibility)\n tool_call = (\n response.tool_calls[0]\n if isinstance(response.tool_calls[0], dict)\n else vars(response.tool_calls[0])\n )\n\n # Check if args is already a dict or needs to be parsed\n tool_call_args = (\n tool_call[\"args\"]\n if isinstance(tool_call[\"args\"], dict)\n else json.loads(tool_call[\"args\"])\n )\n\n if tool_call[\"name\"] == \"generate_recipe\":\n # Update recipe state with tool_call_args\n recipe_data = tool_call_args[\"recipe\"]\n\n # If we have an existing recipe, update it\n if \"recipe\" in state and state[\"recipe\"] is not None:\n recipe = state[\"recipe\"]\n for key, value in recipe_data.items():\n if value is not None: # Only update fields that were provided\n recipe[key] = value\n else:\n # Create a new recipe\n recipe = {\n \"skill_level\": recipe_data.get(\n \"skill_level\", SkillLevel.BEGINNER.value\n ),\n \"special_preferences\": recipe_data.get(\"special_preferences\", []),\n \"cooking_time\": recipe_data.get(\n \"cooking_time\", CookingTime.FIFTEEN_MIN.value\n ),\n \"ingredients\": recipe_data.get(\"ingredients\", []),\n \"instructions\": recipe_data.get(\"instructions\", []),\n }\n\n # Add tool response to messages\n tool_response = {\n \"role\": \"tool\",\n \"content\": \"Recipe generated.\",\n \"tool_call_id\": tool_call[\"id\"],\n }\n\n messages = messages + [tool_response]\n\n # Explicitly emit the updated state to ensure it's shared with frontend\n state[\"recipe\"] = recipe\n await adispatch_custom_event(\n \"manually_emit_intermediate_state\",\n state,\n config=config,\n )\n\n # Return command with updated recipe\n return Command(\n goto=\"start_node\", update={\"messages\": messages, \"recipe\": recipe}\n )\n\n return Command(goto=END, update={\"messages\": messages, \"recipe\": state[\"recipe\"]})\n\n\n# Define the graph\nworkflow = StateGraph(AgentState)\nworkflow.add_node(\"start_node\", start_node)\nworkflow.add_node(\"chat_node\", chat_node)\nworkflow.set_entry_point(\"start_node\")\nworkflow.add_edge(START, \"start_node\")\nworkflow.add_edge(\"start_node\", \"chat_node\")\nworkflow.add_edge(\"chat_node\", END)\n\n# Conditionally use a checkpointer based on the environment\n# Check for multiple indicators that we're running in LangGraph dev/API mode\nis_fast_api = os.environ.get(\"LANGGRAPH_FAST_API\", \"false\").lower() == \"true\"\n\n# Compile the graph\nif is_fast_api:\n # For CopilotKit and other contexts, use MemorySaver\n from langgraph.checkpoint.memory import MemorySaver\n\n memory = MemorySaver()\n graph = workflow.compile(checkpointer=memory)\nelse:\n # When running in LangGraph API/dev, don't use a custom checkpointer\n graph = workflow.compile()\n", "language": "python", "type": "file" }, diff --git a/integrations/langgraph/python/examples/agents/shared_state/agent.py b/integrations/langgraph/python/examples/agents/shared_state/agent.py index 12889a47c..6f90ce60a 100644 --- a/integrations/langgraph/python/examples/agents/shared_state/agent.py +++ b/integrations/langgraph/python/examples/agents/shared_state/agent.py @@ -3,34 +3,38 @@ """ import json -from enum import Enum -from typing import Dict, List, Any, Optional import os +from enum import Enum +from typing import Any, Dict, List, Optional -# LangGraph imports -from pydantic import BaseModel, Field -from langchain_core.runnables import RunnableConfig from langchain_core.callbacks.manager import adispatch_custom_event from langchain_core.messages import SystemMessage +from langchain_core.runnables import RunnableConfig from langchain_core.tools import tool from langchain_openai import ChatOpenAI -from langgraph.graph import StateGraph, END, START -from langgraph.types import Command -from langgraph.graph import MessagesState from langgraph.checkpoint.memory import MemorySaver +from langgraph.graph import END, START, MessagesState, StateGraph +from langgraph.types import Command + +# LangGraph imports +from pydantic import BaseModel, Field + class SkillLevel(str, Enum): """ The level of skill required for the recipe. """ + BEGINNER = "Beginner" INTERMEDIATE = "Intermediate" ADVANCED = "Advanced" + class SpecialPreferences(str, Enum): """ Special preferences for the recipe. """ + HIGH_PROTEIN = "High Protein" LOW_CARB = "Low Carb" SPICY = "Spicy" @@ -39,65 +43,71 @@ class SpecialPreferences(str, Enum): VEGETARIAN = "Vegetarian" VEGAN = "Vegan" + class CookingTime(str, Enum): """ The cooking time of the recipe. """ + FIVE_MIN = "5 min" FIFTEEN_MIN = "15 min" THIRTY_MIN = "30 min" FORTY_FIVE_MIN = "45 min" SIXTY_PLUS_MIN = "60+ min" + class Ingredient(BaseModel): """ An ingredient. """ - icon: str = Field( - description="Icon: the actual emoji like 🥕" - ) + + icon: str = Field(description="Icon: the actual emoji like 🥕") name: str = Field(description="The name of the ingredient") amount: str = Field(description="The amount of the ingredient") + class Recipe(BaseModel): """ A recipe. """ - skill_level: SkillLevel = \ - Field(description="The skill level required for the recipe") - special_preferences: List[SpecialPreferences] = \ - Field(description="A list of special preferences for the recipe") - cooking_time: CookingTime = \ - Field(description="The cooking time of the recipe") - ingredients: List[Ingredient] = \ - Field(description= - """Entire list of ingredients for the recipe, including the new ingredients + + skill_level: SkillLevel = Field( + description="The skill level required for the recipe" + ) + special_preferences: List[SpecialPreferences] = Field( + description="A list of special preferences for the recipe" + ) + cooking_time: CookingTime = Field(description="The cooking time of the recipe") + ingredients: List[Ingredient] = Field( + description="""Entire list of ingredients for the recipe, including the new ingredients and the ones that are already in the recipe: Icon: the actual emoji like 🥕, name and amount. Like so: 🥕 Carrots (250g)""" - ) - instructions: List[str] = \ - Field(description= - """Entire list of instructions for the recipe, + ) + instructions: List[str] = Field( + description="""Entire list of instructions for the recipe, including the new instructions and the ones that are already there""" - ) - changes: str = \ - Field(description="A description of the changes made to the recipe") + ) + changes: str = Field(description="A description of the changes made to the recipe") + -class GenerateRecipeArgs(BaseModel): # pylint: disable=missing-class-docstring +class GenerateRecipeArgs(BaseModel): # pylint: disable=missing-class-docstring recipe: Recipe + @tool(args_schema=GenerateRecipeArgs) -def generate_recipe(recipe: Recipe): # pylint: disable=unused-argument +def generate_recipe(recipe: Recipe): # pylint: disable=unused-argument """ Using the existing (if any) ingredients and instructions, proceed with the recipe to finish it. Make sure the recipe is complete. ALWAYS provide the entire recipe, not just the changes. """ + class AgentState(MessagesState): """ The state of the recipe. """ + recipe: Optional[Dict[str, Any]] = None tools: List[Any] @@ -113,8 +123,10 @@ async def start_node(state: Dict[str, Any], config: RunnableConfig): "skill_level": SkillLevel.BEGINNER.value, "special_preferences": [], "cooking_time": CookingTime.FIFTEEN_MIN.value, - "ingredients": [{"icon": "🍴", "name": "Sample Ingredient", "amount": "1 unit"}], - "instructions": ["First step instruction"] + "ingredients": [ + {"icon": "🍴", "name": "Sample Ingredient", "amount": "1 unit"} + ], + "instructions": ["First step instruction"], } # Emit the initial state to ensure it's properly shared with the frontend await adispatch_custom_event( @@ -125,12 +137,10 @@ async def start_node(state: Dict[str, Any], config: RunnableConfig): return Command( goto="chat_node", - update={ - "messages": state["messages"], - "recipe": state["recipe"] - } + update={"messages": state["messages"], "recipe": state["recipe"]}, ) + async def chat_node(state: Dict[str, Any], config: RunnableConfig): """ Standard chat node. @@ -140,19 +150,20 @@ async def chat_node(state: Dict[str, Any], config: RunnableConfig): if "recipe" in state and state["recipe"] is not None: try: recipe_json = json.dumps(state["recipe"], indent=2) - except Exception as e: # pylint: disable=broad-exception-caught + except Exception as e: # pylint: disable=broad-exception-caught recipe_json = f"Error serializing recipe: {str(e)}" - system_prompt = f"""You are a helpful assistant for creating recipes. + system_prompt = f"""You are a helpful assistant for creating recipes. This is the current state of the recipe: {recipe_json} You can improve the recipe by calling the generate_recipe tool. - + IMPORTANT: 1. Create a recipe using the existing ingredients and instructions. Make sure the recipe is complete. 2. For ingredients, append new ingredients to the existing ones. 3. For instructions, append new steps to the existing ones. 4. 'ingredients' is always an array of objects with 'icon', 'name', and 'amount' fields 5. 'instructions' is always an array of strings + 6. For the 'icon' field in ingredients, ALWAYS use actual Unicode emoji characters (like 🥕 🍅 🧅 🥖 🧈 🥛 🧂 etc.), NEVER use text, ANSI codes, or placeholders If you have just created or modified the recipe, just answer in one sentence what you did. dont describe the recipe, just say what you did. """ @@ -165,27 +176,25 @@ async def chat_node(state: Dict[str, Any], config: RunnableConfig): config = RunnableConfig(recursion_limit=25) # Use "predict_state" metadata to set up streaming for the write_document tool - config["metadata"]["predict_state"] = [{ - "state_key": "recipe", - "tool": "generate_recipe", - "tool_argument": "recipe" - }] + config["metadata"]["predict_state"] = [ + {"state_key": "recipe", "tool": "generate_recipe", "tool_argument": "recipe"} + ] # Bind the tools to the model model_with_tools = model.bind_tools( - [ - *state["tools"], - generate_recipe - ], + [*state["tools"], generate_recipe], # Disable parallel tool calls to avoid race conditions parallel_tool_calls=False, ) # Run the model and generate a response - response = await model_with_tools.ainvoke([ - SystemMessage(content=system_prompt), - *state["messages"], - ], config) + response = await model_with_tools.ainvoke( + [ + SystemMessage(content=system_prompt), + *state["messages"], + ], + config, + ) # Update messages with the response messages = state["messages"] + [response] @@ -193,14 +202,18 @@ async def chat_node(state: Dict[str, Any], config: RunnableConfig): # Handle tool calls if hasattr(response, "tool_calls") and response.tool_calls: # Handle dicts or object (backward compatibility) - tool_call = (response.tool_calls[0] - if isinstance(response.tool_calls[0], dict) - else vars(response.tool_calls[0])) + tool_call = ( + response.tool_calls[0] + if isinstance(response.tool_calls[0], dict) + else vars(response.tool_calls[0]) + ) # Check if args is already a dict or needs to be parsed - tool_call_args = (tool_call["args"] - if isinstance(tool_call["args"], dict) - else json.loads(tool_call["args"])) + tool_call_args = ( + tool_call["args"] + if isinstance(tool_call["args"], dict) + else json.loads(tool_call["args"]) + ) if tool_call["name"] == "generate_recipe": # Update recipe state with tool_call_args @@ -215,18 +228,22 @@ async def chat_node(state: Dict[str, Any], config: RunnableConfig): else: # Create a new recipe recipe = { - "skill_level": recipe_data.get("skill_level", SkillLevel.BEGINNER.value), + "skill_level": recipe_data.get( + "skill_level", SkillLevel.BEGINNER.value + ), "special_preferences": recipe_data.get("special_preferences", []), - "cooking_time": recipe_data.get("cooking_time", CookingTime.FIFTEEN_MIN.value), + "cooking_time": recipe_data.get( + "cooking_time", CookingTime.FIFTEEN_MIN.value + ), "ingredients": recipe_data.get("ingredients", []), - "instructions": recipe_data.get("instructions", []) + "instructions": recipe_data.get("instructions", []), } # Add tool response to messages tool_response = { "role": "tool", "content": "Recipe generated.", - "tool_call_id": tool_call["id"] + "tool_call_id": tool_call["id"], } messages = messages + [tool_response] @@ -241,20 +258,10 @@ async def chat_node(state: Dict[str, Any], config: RunnableConfig): # Return command with updated recipe return Command( - goto="start_node", - update={ - "messages": messages, - "recipe": recipe - } + goto="start_node", update={"messages": messages, "recipe": recipe} ) - return Command( - goto=END, - update={ - "messages": messages, - "recipe": state["recipe"] - } - ) + return Command(goto=END, update={"messages": messages, "recipe": state["recipe"]}) # Define the graph @@ -274,6 +281,7 @@ async def chat_node(state: Dict[str, Any], config: RunnableConfig): if is_fast_api: # For CopilotKit and other contexts, use MemorySaver from langgraph.checkpoint.memory import MemorySaver + memory = MemorySaver() graph = workflow.compile(checkpointer=memory) else: