Skip to content

Commit 89de2ad

Browse files
committed
Merge branch 'master' into potel-base
2 parents 30f8f85 + ab2e3f0 commit 89de2ad

File tree

4 files changed

+173
-66
lines changed

4 files changed

+173
-66
lines changed

sentry_sdk/integrations/langchain.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import itertools
12
from collections import OrderedDict
23
from functools import wraps
34

@@ -87,12 +88,9 @@ def __init__(self, span):
8788
class SentryLangchainCallback(BaseCallbackHandler): # type: ignore[misc]
8889
"""Base callback handler that can be used to handle callbacks from langchain."""
8990

90-
span_map = OrderedDict() # type: OrderedDict[UUID, WatchedSpan]
91-
92-
max_span_map_size = 0
93-
9491
def __init__(self, max_span_map_size, include_prompts, tiktoken_encoding_name=None):
9592
# type: (int, bool, Optional[str]) -> None
93+
self.span_map = OrderedDict() # type: OrderedDict[UUID, WatchedSpan]
9694
self.max_span_map_size = max_span_map_size
9795
self.include_prompts = include_prompts
9896

@@ -458,7 +456,14 @@ def new_configure(
458456
**kwargs,
459457
)
460458

461-
if not any(isinstance(cb, SentryLangchainCallback) for cb in callbacks_list):
459+
inheritable_callbacks_list = (
460+
inheritable_callbacks if isinstance(inheritable_callbacks, list) else []
461+
)
462+
463+
if not any(
464+
isinstance(cb, SentryLangchainCallback)
465+
for cb in itertools.chain(callbacks_list, inheritable_callbacks_list)
466+
):
462467
# Avoid mutating the existing callbacks list
463468
callbacks_list = [
464469
*callbacks_list,

sentry_sdk/integrations/ray.py

Lines changed: 64 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -44,77 +44,85 @@ def _patch_ray_remote():
4444
old_remote = ray.remote
4545

4646
@functools.wraps(old_remote)
47-
def new_remote(f, *args, **kwargs):
48-
# type: (Callable[..., Any], *Any, **Any) -> Callable[..., Any]
47+
def new_remote(f=None, *args, **kwargs):
48+
# type: (Optional[Callable[..., Any]], *Any, **Any) -> Callable[..., Any]
49+
4950
if inspect.isclass(f):
5051
# Ray Actors
5152
# (https://docs.ray.io/en/latest/ray-core/actors.html)
5253
# are not supported
5354
# (Only Ray Tasks are supported)
54-
return old_remote(f, *args, *kwargs)
55-
56-
def _f(*f_args, _tracing=None, **f_kwargs):
57-
# type: (Any, Optional[dict[str, Any]], Any) -> Any
58-
"""
59-
Ray Worker
60-
"""
61-
_check_sentry_initialized()
62-
63-
root_span_name = qualname_from_function(f) or DEFAULT_TRANSACTION_NAME
64-
sentry_sdk.get_current_scope().set_transaction_name(
65-
root_span_name,
66-
source=TransactionSource.TASK,
67-
)
68-
with sentry_sdk.continue_trace(_tracing or {}):
55+
return old_remote(f, *args, **kwargs)
56+
57+
def wrapper(user_f):
58+
# type: (Callable[..., Any]) -> Any
59+
def new_func(*f_args, _tracing=None, **f_kwargs):
60+
# type: (Any, Optional[dict[str, Any]], Any) -> Any
61+
_check_sentry_initialized()
62+
63+
root_span_name = qualname_from_function(f) or DEFAULT_TRANSACTION_NAME
64+
sentry_sdk.get_current_scope().set_transaction_name(
65+
root_span_name,
66+
source=TransactionSource.TASK,
67+
)
68+
with sentry_sdk.continue_trace(_tracing or {}):
69+
with sentry_sdk.start_span(
70+
op=OP.QUEUE_TASK_RAY,
71+
name=qualname_from_function(user_f),
72+
origin=RayIntegration.origin,
73+
source=TransactionSource.TASK,
74+
) as root_span:
75+
try:
76+
result = user_f(*f_args, **f_kwargs)
77+
root_span.set_status(SPANSTATUS.OK)
78+
except Exception:
79+
root_span.set_status(SPANSTATUS.INTERNAL_ERROR)
80+
exc_info = sys.exc_info()
81+
_capture_exception(exc_info)
82+
reraise(*exc_info)
83+
84+
return result
85+
86+
if f:
87+
rv = old_remote(new_func)
88+
else:
89+
rv = old_remote(*args, **kwargs)(new_func)
90+
old_remote_method = rv.remote
91+
92+
def _remote_method_with_header_propagation(*args, **kwargs):
93+
# type: (*Any, **Any) -> Any
94+
"""
95+
Ray Client
96+
"""
6997
with sentry_sdk.start_span(
70-
op=OP.QUEUE_TASK_RAY,
71-
name=root_span_name,
98+
op=OP.QUEUE_SUBMIT_RAY,
99+
name=qualname_from_function(user_f),
72100
origin=RayIntegration.origin,
73-
source=TransactionSource.TASK,
74-
) as root_span:
101+
only_if_parent=True,
102+
) as span:
103+
tracing = {
104+
k: v
105+
for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
106+
}
75107
try:
76-
result = f(*f_args, **f_kwargs)
77-
root_span.set_status(SPANSTATUS.OK)
108+
result = old_remote_method(*args, **kwargs, _tracing=tracing)
109+
span.set_status(SPANSTATUS.OK)
78110
except Exception:
79-
root_span.set_status(SPANSTATUS.INTERNAL_ERROR)
111+
span.set_status(SPANSTATUS.INTERNAL_ERROR)
80112
exc_info = sys.exc_info()
81113
_capture_exception(exc_info)
82114
reraise(*exc_info)
83115

84116
return result
85117

86-
rv = old_remote(_f, *args, *kwargs)
87-
old_remote_method = rv.remote
88-
89-
def _remote_method_with_header_propagation(*args, **kwargs):
90-
# type: (*Any, **Any) -> Any
91-
"""
92-
Ray Client
93-
"""
94-
with sentry_sdk.start_span(
95-
op=OP.QUEUE_SUBMIT_RAY,
96-
name=qualname_from_function(f),
97-
origin=RayIntegration.origin,
98-
only_if_parent=True,
99-
) as span:
100-
tracing = {
101-
k: v
102-
for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
103-
}
104-
try:
105-
result = old_remote_method(*args, **kwargs, _tracing=tracing)
106-
span.set_status(SPANSTATUS.OK)
107-
except Exception:
108-
span.set_status(SPANSTATUS.INTERNAL_ERROR)
109-
exc_info = sys.exc_info()
110-
_capture_exception(exc_info)
111-
reraise(*exc_info)
112-
113-
return result
114-
115-
rv.remote = _remote_method_with_header_propagation
116-
117-
return rv
118+
rv.remote = _remote_method_with_header_propagation
119+
120+
return rv
121+
122+
if f is not None:
123+
return wrapper(f)
124+
else:
125+
return wrapper
118126

119127
ray.remote = new_remote
120128

tests/integrations/langchain/test_langchain.py

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414

1515
from langchain_core.callbacks import CallbackManagerForLLMRun
1616
from langchain_core.messages import BaseMessage, AIMessageChunk
17-
from langchain_core.outputs import ChatGenerationChunk
17+
from langchain_core.outputs import ChatGenerationChunk, ChatResult
18+
from langchain_core.runnables import RunnableConfig
19+
from langchain_core.language_models.chat_models import BaseChatModel
1820

1921
from sentry_sdk import start_span
20-
from sentry_sdk.integrations.langchain import LangchainIntegration
22+
from sentry_sdk.integrations.langchain import (
23+
LangchainIntegration,
24+
SentryLangchainCallback,
25+
)
2126
from langchain.agents import tool, AgentExecutor, create_openai_tools_agent
2227
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
2328

@@ -341,3 +346,84 @@ def test_span_origin(sentry_init, capture_events):
341346
assert event["contexts"]["trace"]["origin"] == "manual"
342347
for span in event["spans"]:
343348
assert span["origin"] == "auto.ai.langchain"
349+
350+
351+
def test_manual_callback_no_duplication(sentry_init):
352+
"""
353+
Test that when a user manually provides a SentryLangchainCallback,
354+
the integration doesn't create a duplicate callback.
355+
"""
356+
357+
# Track callback instances
358+
tracked_callback_instances = set()
359+
360+
class CallbackTrackingModel(BaseChatModel):
361+
"""Mock model that tracks callback instances for testing."""
362+
363+
def _generate(
364+
self,
365+
messages,
366+
stop=None,
367+
run_manager=None,
368+
**kwargs,
369+
):
370+
# Track all SentryLangchainCallback instances
371+
if run_manager:
372+
for handler in run_manager.handlers:
373+
if isinstance(handler, SentryLangchainCallback):
374+
tracked_callback_instances.add(id(handler))
375+
376+
for handler in run_manager.inheritable_handlers:
377+
if isinstance(handler, SentryLangchainCallback):
378+
tracked_callback_instances.add(id(handler))
379+
380+
return ChatResult(
381+
generations=[
382+
ChatGenerationChunk(message=AIMessageChunk(content="Hello!"))
383+
],
384+
llm_output={},
385+
)
386+
387+
@property
388+
def _llm_type(self):
389+
return "test_model"
390+
391+
@property
392+
def _identifying_params(self):
393+
return {}
394+
395+
sentry_init(integrations=[LangchainIntegration()])
396+
397+
# Create a manual SentryLangchainCallback
398+
manual_callback = SentryLangchainCallback(
399+
max_span_map_size=100, include_prompts=False
400+
)
401+
402+
# Create RunnableConfig with the manual callback
403+
config = RunnableConfig(callbacks=[manual_callback])
404+
405+
# Invoke the model with the config
406+
llm = CallbackTrackingModel()
407+
llm.invoke("Hello", config)
408+
409+
# Verify that only ONE SentryLangchainCallback instance was used
410+
assert len(tracked_callback_instances) == 1, (
411+
f"Expected exactly 1 SentryLangchainCallback instance, "
412+
f"but found {len(tracked_callback_instances)}. "
413+
f"This indicates callback duplication occurred."
414+
)
415+
416+
# Verify the callback ID matches our manual callback
417+
assert id(manual_callback) in tracked_callback_instances
418+
419+
420+
def test_span_map_is_instance_variable():
421+
"""Test that each SentryLangchainCallback instance has its own span_map."""
422+
# Create two separate callback instances
423+
callback1 = SentryLangchainCallback(max_span_map_size=100, include_prompts=True)
424+
callback2 = SentryLangchainCallback(max_span_map_size=100, include_prompts=True)
425+
426+
# Verify they have different span_map instances
427+
assert (
428+
callback1.span_map is not callback2.span_map
429+
), "span_map should be an instance variable, not shared between instances"

tests/integrations/ray/test_ray.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@ def read_error_from_log(job_id):
5959

6060

6161
@pytest.mark.forked
62-
def test_tracing_in_ray_tasks():
62+
@pytest.mark.parametrize(
63+
"task_options", [{}, {"num_cpus": 0, "memory": 1024 * 1024 * 10}]
64+
)
65+
def test_tracing_in_ray_tasks(task_options):
6366
setup_sentry()
6467

6568
ray.init(
@@ -69,14 +72,19 @@ def test_tracing_in_ray_tasks():
6972
}
7073
)
7174

72-
# Setup ray task
73-
@ray.remote
7475
def example_task():
7576
with sentry_sdk.start_span(op="task", name="example task step"):
7677
...
7778

7879
return sentry_sdk.get_client().transport.envelopes
7980

81+
# Setup ray task, calling decorator directly instead of @,
82+
# to accommodate for test parametrization
83+
if task_options:
84+
example_task = ray.remote(**task_options)(example_task)
85+
else:
86+
example_task = ray.remote(example_task)
87+
8088
with sentry_sdk.start_span(op="test", name="ray client root span"):
8189
worker_envelopes = ray.get(example_task.remote())
8290

0 commit comments

Comments
 (0)