Description
Hi, I am using the LangGraph tutorial to create agentic workflow solutions. In this example, we have a primary agent which calls a specialised agent to get the date. The tutorial I referenced can be found here: https://langchain-ai.github.io/langgraph/tutorials/customer-support/customer-support/#flights
I have streamlined the tutorial to make it simpler (only one specialised assistant). Please find my sample code below:
import os
import uuid
from datetime import datetime
from typing import Annotated, Callable, Literal, Optional

import httpx
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from langchain.tools import Tool
from langchain_core.messages import ToolMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig, RunnableLambda
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from databricks_langchain import ChatDatabricks
DATABRICKS_TOKEN = ""  # redacted
BASE_URL = ""  # redacted
# Define tool
def get_today_date(not_used: str) -> str:
    return datetime.today().strftime('%d-%m-%Y')

get_today_date_tool = Tool.from_function(
    func=get_today_date,
    name="GetTodayDate",
    description="Use this tool to get today's date so you can calculate dates before or after it.",
)
def update_dialog_stack(left: list[str], right: Optional[str]) -> list[str]:
    """Push or pop the state."""
    if right is None:
        return left
    if right == "pop":
        return left[:-1]
    return left + [right]
class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    dialog_state: Annotated[
        list[
            Literal[
                "assistant",
                "get_date",
            ]
        ],
        update_dialog_stack,
    ]
class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)
            # Re-prompt if the LLM returns an empty response with no tool calls.
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}
class CompleteOrEscalate(BaseModel):
    """A tool to mark the current task as completed and/or to escalate control of the dialog
    to the main assistant, who can re-route the dialog based on the user's needs."""

    cancel: bool = True
    reason: str

    class Config:
        json_schema_extra = {
            "example": {
                "cancel": True,
                "reason": "User changed their mind about the current task.",
            },
            "example 2": {
                "cancel": True,
                "reason": "I have fully completed the task.",
            },
        }
date_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a specialized assistant for obtaining today's date. "
            "The primary assistant delegates work to you whenever the user needs help with today's date. "
            "If you need more information or the user changes their mind, escalate the task back to the main assistant. "
            "Remember that a task isn't completed until after the relevant tool has successfully been used."
            "\n\nIf the user needs help, and none of your tools are appropriate for it, then "
            '"CompleteOrEscalate" the dialog to the host assistant. Do not waste the user\'s time. Do not make up invalid tools or functions.',
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now)
os.environ['DATABRICKS_HOST'] = BASE_URL
os.environ['DATABRICKS_TOKEN'] = DATABRICKS_TOKEN
os.environ['DATABRICKS_INSECURE'] = "True"
os.environ['REQUESTS_CA_BUNDLE'] = ""

llm = ChatDatabricks(
    endpoint="sc-ta-gpt4o",
    http_client=httpx.Client(verify=False),
)
date_assistant_tools = [get_today_date_tool]
# The date assistant gets its own tools plus CompleteOrEscalate.
date_assistant_runnable = date_assistant_prompt | llm.bind_tools(
    date_assistant_tools + [CompleteOrEscalate]
)
def handle_tool_error(state) -> dict:
    error = state.get("error")
    tool_calls = state["messages"][-1].tool_calls
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",
                tool_call_id=tc["id"],
            )
            for tc in tool_calls
        ]
    }

def create_tool_node_with_fallback(tools: list) -> dict:
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )
# Primary Assistant
class ToDateAssistant(BaseModel):
    """Transfers work to a specialized assistant to handle dates."""

    request: str = Field(
        description="Any necessary followup questions the date assistant should clarify before proceeding."
    )
primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "Your primary role is to answer customer queries. "
            "If a customer asks for today's date, "
            "delegate the task to the appropriate specialized assistant by invoking the corresponding tool. "
            "Only the specialized assistants are given permission to do this for the user. "
            "The user is not aware of the different specialized assistants, so do not mention them; just quietly delegate through function calls.",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now)
primary_assistant_tools = []
assistant_runnable = primary_assistant_prompt | llm.bind_tools(
    primary_assistant_tools
    + [
        ToDateAssistant,
    ]
)
def create_entry_node(assistant_name: str, new_dialog_state: str) -> Callable:
    def entry_node(state: State) -> dict:
        tool_call_id = state["messages"][-1].tool_calls[0]["id"]
        return {
            "messages": [
                ToolMessage(
                    content=f"The assistant is now the {assistant_name}. Reflect on the above conversation between the host assistant and the user."
                    f" The user's intent is unsatisfied. Use the provided tools to assist the user. Remember, you are {assistant_name},"
                    " and the booking, update, or other action is not complete until after you have successfully invoked the appropriate tool."
                    " If the user changes their mind or needs help for other tasks, call the CompleteOrEscalate function to let the primary host assistant take control."
                    " Do not mention who you are - just act as the proxy for the assistant.",
                    tool_call_id=tool_call_id,
                )
            ],
            "dialog_state": new_dialog_state,
        }

    return entry_node
builder = StateGraph(State)
builder.add_edge(START, "primary_assistant")
builder.add_node(
    "enter_date_assistant",
    create_entry_node("Date Assistant", "get_date"),
)
builder.add_node("get_date", Assistant(date_assistant_runnable))
builder.add_edge("enter_date_assistant", "get_date")
builder.add_node(
    "date_assistant_tools",
    create_tool_node_with_fallback(date_assistant_tools),
)
def route_get_date(state: State):
    route = tools_condition(state)
    if route == END:
        return END
    tool_calls = state["messages"][-1].tool_calls
    did_cancel = any(tc["name"] == CompleteOrEscalate.__name__ for tc in tool_calls)
    if did_cancel:
        return "leave_skill"
    return "date_assistant_tools"
builder.add_edge("date_assistant_tools", "get_date")
builder.add_conditional_edges(
    "get_date",
    route_get_date,
    ["date_assistant_tools", "leave_skill", END],
)
# This node will be shared for exiting all specialized assistants
def pop_dialog_state(state: State) -> dict:
    """Pop the dialog stack and return to the main assistant.

    This lets the full graph explicitly track the dialog flow and delegate control
    to specific sub-graphs.
    """
    messages = []
    if state["messages"][-1].tool_calls:
        # Note: Doesn't currently handle the edge case where the llm performs parallel tool calls
        messages.append(
            ToolMessage(
                content="Resuming dialog with the host assistant. Please reflect on the past conversation and assist the user as needed.",
                tool_call_id=state["messages"][-1].tool_calls[0]["id"],
            )
        )
    return {
        "dialog_state": "pop",
        "messages": messages,
    }
builder.add_node("leave_skill", pop_dialog_state)
builder.add_edge("leave_skill", "primary_assistant")
builder.add_node("primary_assistant", Assistant(assistant_runnable))
builder.add_node(
    "primary_assistant_tools", create_tool_node_with_fallback(primary_assistant_tools)
)
def route_primary_assistant(state: State):
    route = tools_condition(state)
    if route == END:
        return END
    tool_calls = state["messages"][-1].tool_calls
    if tool_calls:
        if tool_calls[0]["name"] == ToDateAssistant.__name__:
            return "enter_date_assistant"
        return "primary_assistant_tools"
    raise ValueError("Invalid route")
# The assistant can route to one of the delegated assistants,
# directly use a tool, or directly respond to the user
builder.add_conditional_edges(
    "primary_assistant",
    route_primary_assistant,
    [
        "enter_date_assistant",
        "primary_assistant_tools",
        END,
    ],
)
builder.add_edge("primary_assistant_tools", "primary_assistant")
# Compile graph
memory = MemorySaver()
part_4_graph = builder.compile(
    checkpointer=memory,
)
thread_id = str(uuid.uuid4())
query = "what is the date today"
config = {
    "configurable": {
        # Checkpoints are accessed by thread_id
        "thread_id": thread_id,
    }
}
for res in part_4_graph.stream(
    {"messages": [{"role": "user", "content": query}]},
    config,
    stream_mode="debug",  # with any other stream mode, the tool calls are not logged
):
    print("\n")
    print(res["payload"])
This script works with input guardrails turned off. However, when they are on, I get the error below:
Error code: 400 - {'error_code': 'INVALID_PARAMETER_VALUE', 'message': 'Invalid type of embeddings model input parameter. Expecting String or List[String].'}
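To see where the None appears, the outgoing request body can be logged with an httpx event hook. This is only a diagnostic sketch; it assumes ChatDatabricks actually sends its requests through the http_client passed in above, which I have not verified:
import httpx

def log_request_body(request: httpx.Request) -> None:
    # Print the raw JSON body to check whether "content" is serialized as null.
    print(request.content.decode())

# Hypothetical debugging variant of the client defined in the script above.
debug_llm = ChatDatabricks(
    endpoint="sc-ta-gpt4o",
    http_client=httpx.Client(verify=False, event_hooks={"request": [log_request_body]}),
)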
The output of the LLM just before the error was:
{'id': 'fb5a2cc2-0a04-e9af-bfc3-c1471251f571', 'name': 'get_date', 'input': {'messages': [HumanMessage(content='what is the date today', additional_kwargs={}, response_metadata={}, id='afb3eeae-f46a-4fb0-bddc-1e7bd8ed1fac'), AIMessage(content='', additional_kwargs={'tool_calls': [{'function': {'arguments': '{"request":"What is today\'s date?"}', 'name': 'ToDateAssistant'}, 'id': 'call_Mr0s27r9KM4fk5NvcKmzzjJ2', 'type': 'function'}]}, response_metadata={'completion_tokens': 20, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens': 136, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}, 'total_tokens': 156}, id='run-aa219ef6-e305-408a-9951-6141b2369ea2-0', tool_calls=[{'name': 'ToDateAssistant', 'args': {'request': "What is today's date?"}, 'id': 'call_Mr0s27r9KM4fk5NvcKmzzjJ2', 'type': 'tool_call'}]), ToolMessage(content="The assistant is now the Date Assistant. Reflect on the above conversation between the host assistant and the user. The user's intent is unsatisfied. Use the provided tools to assist the user. Remember, you are Date Assistant, and the booking, update, other other action is not complete until after you have successfully invoked the appropriate tool. If the user changes their mind or needs help for other tasks, call the CompleteOrEscalate function to let the primary host assistant take control. Do not mention who you are - just act as the proxy for the assistant.", id='aa33c62d-d98a-4d3a-aa88-448c0d630d7b', tool_call_id='call_Mr0s27r9KM4fk5NvcKmzzjJ2')], 'dialog_state': ['get_date']}, 'triggers': ['enter_date_assistant']}
When the main assistant invokes the Date assistant, it makes a tool call, and in such a case the content of the message is an empty string, as seen above (AIMessage(content='')). Somehow, langchain/langgraph changes this '' to None, and this is then passed to Databricks, which throws an error because None is not a string. I tried intercepting the message and changing the '' to something like '-', but then langchain throws an error saying that the content should be an empty string since it is a tool call.
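For reference, my interception attempt looked roughly like this (a sketch; patch_empty_content is my own helper, and it assumes the langchain-core message objects are pydantic v2 models with model_copy):
from langchain_core.messages import AIMessage
from langchain_core.runnables import RunnableLambda

def patch_empty_content(state: dict) -> dict:
    # Replace the empty content on tool-calling AIMessages with a placeholder
    # string so the guardrail's embeddings model receives a String, not None.
    fixed = []
    for m in state["messages"]:
        if isinstance(m, AIMessage) and m.tool_calls and m.content == "":
            m = m.model_copy(update={"content": "-"})  # this is what langchain rejects
        fixed.append(m)
    return {**state, "messages": fixed}

# e.g. run the patch ahead of the prompt in the date assistant chain:
# date_assistant_runnable = (
#     RunnableLambda(patch_empty_content)
#     | date_assistant_prompt
#     | llm.bind_tools(date_assistant_tools + [CompleteOrEscalate])
# )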
Can you please check this issue and, if the content is not a string, have the input guardrails convert it to a string first?