feat: Add LangGraph integration #4727
@@ -0,0 +1,301 @@
from functools import wraps
from typing import Any, Callable, List, Optional

import sentry_sdk
from sentry_sdk.ai.utils import set_data_normalized
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.utils import safe_serialize


try:
    from langgraph.graph import StateGraph
    from langgraph.pregel import Pregel
except ImportError:
    raise DidNotEnable("langgraph not installed")


class LanggraphIntegration(Integration):
    identifier = "langgraph"
    origin = f"auto.ai.{identifier}"

    def __init__(self, include_prompts=True):
        # type: (LanggraphIntegration, bool) -> None
        self.include_prompts = include_prompts

    @staticmethod
    def setup_once():
        # type: () -> None
        # LangGraph lets users create agents using a StateGraph or the Functional API.
        # StateGraphs are compiled to a CompiledStateGraph. Both CompiledStateGraph and
        # the Functional API execute on a Pregel instance, and invocation happens on
        # Pregel, so patching the invoke methods covers both. The streaming methods are
        # not patched because LangGraph internally routes streaming through invoke;
        # patching only the invoke methods avoids duplicate spans for invocations.
        StateGraph.compile = _wrap_state_graph_compile(StateGraph.compile)
        if hasattr(Pregel, "invoke"):
            Pregel.invoke = _wrap_pregel_invoke(Pregel.invoke)
        if hasattr(Pregel, "ainvoke"):
            Pregel.ainvoke = _wrap_pregel_ainvoke(Pregel.ainvoke)


def _get_graph_name(graph_obj):
    # type: (Any) -> Optional[str]
    for attr in ["name", "graph_name", "__name__", "_name"]:
        if hasattr(graph_obj, attr):
            name = getattr(graph_obj, attr)
            if name and isinstance(name, str):
                return name
    return None


def _normalize_langgraph_message(message):
    # type: (Any) -> Any
    if not hasattr(message, "content"):
        return None

    parsed = {"role": getattr(message, "type", None), "content": message.content}

    for attr in ["name", "tool_calls", "function_call", "tool_call_id"]:
        if hasattr(message, attr):
            value = getattr(message, attr)
            if value is not None:
                parsed[attr] = value

    return parsed


def _parse_langgraph_messages(state):
    # type: (Any) -> Optional[List[Any]]
    if not state:
        return None

    messages = None

    if isinstance(state, dict):
        messages = state.get("messages")
    elif hasattr(state, "messages"):
        messages = state.messages
    elif hasattr(state, "get") and callable(state.get):
        try:
            messages = state.get("messages")
        except Exception:
            pass

    if not messages or not isinstance(messages, (list, tuple)):
        return None

    normalized_messages = []
    for message in messages:
        try:
            normalized = _normalize_langgraph_message(message)
            if normalized:
                normalized_messages.append(normalized)
        except Exception:
            continue

    return normalized_messages if normalized_messages else None

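As an aside, here is an illustrative sketch (not part of this PR) of what the normalization above produces for typical LangChain message objects. It assumes the `langchain_core` message classes are available, as they are whenever LangGraph is installed:

```python
# Hypothetical illustration of _normalize_langgraph_message; not part of the diff.
from langchain_core.messages import AIMessage, HumanMessage

human = HumanMessage(content="What is the weather in Paris?")
ai = AIMessage(
    content="",
    tool_calls=[{"name": "get_weather", "args": {"city": "Paris"}, "id": "call_1"}],
)

# HumanMessage.type is "human" and AIMessage.type is "ai", so the normalized
# dicts should look roughly like:
#   {"role": "human", "content": "What is the weather in Paris?"}
#   {"role": "ai", "content": "", "tool_calls": [{"name": "get_weather", ...}]}
print(_normalize_langgraph_message(human))
print(_normalize_langgraph_message(ai))
```
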
def _wrap_state_graph_compile(f):
    # type: (Callable[..., Any]) -> Callable[..., Any]
    @wraps(f)
    def new_compile(self, *args, **kwargs):
        # type: (Any, Any, Any) -> Any
        integration = sentry_sdk.get_client().get_integration(LanggraphIntegration)

        compiled_graph = f(self, *args, **kwargs)
        if integration is None:
            return compiled_graph

        with sentry_sdk.start_span(
            op=OP.GEN_AI_CREATE_AGENT,
            name="create_agent",
            origin=LanggraphIntegration.origin,
        ) as span:
            span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "create_agent")
            span.set_data(
                SPANDATA.GEN_AI_AGENT_NAME, getattr(compiled_graph, "name", None)
            )
            span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, kwargs.get("model"))

            tools = None
            graph = getattr(compiled_graph, "get_graph", None)
            if callable(graph):
                graph_obj = graph()
                nodes = getattr(graph_obj, "nodes", None)
                if nodes and isinstance(nodes, dict):
                    tools_node = nodes.get("tools")
                    if tools_node:
                        data = getattr(tools_node, "data", None)
                        if data and hasattr(data, "tools_by_name"):
                            tools = list(data.tools_by_name.keys())
            span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, tools)

            return compiled_graph

    return new_compile


def _wrap_pregel_invoke(f):
    # type: (Callable[..., Any]) -> Callable[..., Any]

    @wraps(f)
    def new_invoke(self, *args, **kwargs):
        # type: (Any, Any, Any) -> Any
        integration = sentry_sdk.get_client().get_integration(LanggraphIntegration)
        if integration is None:
            return f(self, *args, **kwargs)

        graph_name = _get_graph_name(self)

        with sentry_sdk.start_span(
            op=OP.GEN_AI_INVOKE_AGENT,
            name="invoke_agent",
            origin=LanggraphIntegration.origin,
        ) as span:
            if graph_name:
                span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name)
                span.set_data(SPANDATA.GEN_AI_AGENT_NAME, graph_name)

            span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent")
            span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False)

            # Store input messages to later compare with output
            input_messages = None
            if (
                len(args) > 0
                and should_send_default_pii()
                and integration.include_prompts
            ):
                input_messages = _parse_langgraph_messages(args[0])
                if input_messages:
                    set_data_normalized(
                        span,
                        SPANDATA.GEN_AI_REQUEST_MESSAGES,
                        safe_serialize(input_messages),
                    )

            result = f(self, *args, **kwargs)
            _set_response_attributes(span, input_messages, result, integration)
            return result

    return new_invoke


def _wrap_pregel_ainvoke(f):
    # type: (Callable[..., Any]) -> Callable[..., Any]

    @wraps(f)
    async def new_ainvoke(self, *args, **kwargs):
        # type: (Any, Any, Any) -> Any
        integration = sentry_sdk.get_client().get_integration(LanggraphIntegration)
        if integration is None:
            return await f(self, *args, **kwargs)

        graph_name = _get_graph_name(self)

        with sentry_sdk.start_span(
            op=OP.GEN_AI_INVOKE_AGENT,
            name="invoke_agent",
            origin=LanggraphIntegration.origin,
        ) as span:
            if graph_name:
                span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, graph_name)
                span.set_data(SPANDATA.GEN_AI_AGENT_NAME, graph_name)

            span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent")
            span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False)

            input_messages = None
            if (
                len(args) > 0
                and should_send_default_pii()
                and integration.include_prompts
            ):
                input_messages = _parse_langgraph_messages(args[0])
                if input_messages:
                    set_data_normalized(
                        span,
                        SPANDATA.GEN_AI_REQUEST_MESSAGES,
                        safe_serialize(input_messages),
                    )

            result = await f(self, *args, **kwargs)
            _set_response_attributes(span, input_messages, result, integration)
            return result

    return new_ainvoke

def _get_new_messages(input_messages, output_messages):
    # type: (Optional[List[Any]], Optional[List[Any]]) -> Optional[List[Any]]
    """Extract only the new messages added during this invocation."""
    if not output_messages:
        return None

    if not input_messages:
        return output_messages

    # Only return the new messages, i.e. the output messages that are not in the input.
    input_count = len(input_messages)
    new_messages = (
        output_messages[input_count:] if len(output_messages) > input_count else []
    )

    return new_messages if new_messages else None


def _extract_llm_response_text(messages):
    # type: (Optional[List[Any]]) -> Optional[str]
    if not messages:
        return None

    for message in reversed(messages):
        if isinstance(message, dict):
            role = message.get("role")
            if role in ["assistant", "ai"]:
                content = message.get("content")
                if content and isinstance(content, str):
                    return content

    return None


def _extract_tool_calls(messages):
    # type: (Optional[List[Any]]) -> Optional[List[Any]]
    if not messages:
        return None

    tool_calls = []
    for message in messages:
        if isinstance(message, dict):
            msg_tool_calls = message.get("tool_calls")
            if msg_tool_calls and isinstance(msg_tool_calls, list):
                tool_calls.extend(msg_tool_calls)

    return tool_calls if tool_calls else None


def _set_response_attributes(span, input_messages, result, integration):
    # type: (Any, Optional[List[Any]], Any, LanggraphIntegration) -> None
    if not (should_send_default_pii() and integration.include_prompts):
        return

    parsed_response_messages = _parse_langgraph_messages(result)
    new_messages = _get_new_messages(input_messages, parsed_response_messages)

    llm_response_text = _extract_llm_response_text(new_messages)
    if llm_response_text:
        set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, llm_response_text)
    elif new_messages:
        set_data_normalized(
            span, SPANDATA.GEN_AI_RESPONSE_TEXT, safe_serialize(new_messages)
        )
    else:
        set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, result)

    tool_calls = _extract_tool_calls(new_messages)
    if tool_calls:
        set_data_normalized(
            span,
            SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
            safe_serialize(tool_calls),
            unpack=False,
        )
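For context, a minimal sketch of how this integration would be enabled. The option names follow this PR's `LanggraphIntegration` and the usual Sentry SDK init pattern, but treat the exact module path as an assumption rather than documented API:

```python
import sentry_sdk

# Assumed import path for the new integration module added in this PR.
from sentry_sdk.integrations.langgraph import LanggraphIntegration

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
    traces_sample_rate=1.0,
    # Prompt/response content is only attached when PII sending is enabled
    # and include_prompts is left at its default of True.
    send_default_pii=True,
    integrations=[LanggraphIntegration(include_prompts=True)],
)
```

With `include_prompts=False`, the `create_agent` and `invoke_agent` spans are still emitted, but the GEN_AI_REQUEST_MESSAGES, GEN_AI_RESPONSE_TEXT, and GEN_AI_RESPONSE_TOOL_CALLS data is omitted.
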
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("langgraph")
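Finally, a hedged sketch of the kind of graph such tests might drive once the SDK is initialized as above: compiling a `StateGraph` goes through the patched `StateGraph.compile` (emitting a `create_agent` span) and `invoke` goes through the patched `Pregel.invoke` (emitting an `invoke_agent` span). The graph shape, node name, and state schema here are illustrative assumptions, not taken from this PR:

```python
from typing import TypedDict

import sentry_sdk
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph import END, StateGraph


class State(TypedDict):
    messages: list


def respond(state):
    # Stand-in "LLM" node so the example stays self-contained.
    return {"messages": state["messages"] + [AIMessage(content="Hello!")]}


graph = StateGraph(State)
graph.add_node("respond", respond)
graph.set_entry_point("respond")
graph.add_edge("respond", END)

with sentry_sdk.start_transaction(name="agent-run"):
    compiled = graph.compile()  # patched StateGraph.compile -> create_agent span
    result = compiled.invoke(
        {"messages": [HumanMessage(content="Hi there")]}
    )  # patched Pregel.invoke -> invoke_agent span; the HumanMessage/AIMessage
    # pair is what _parse_langgraph_messages and _set_response_attributes pick up
```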