Skip to content

Commit 41dd1f2

Browse files
committed
fix(integrations): Handle new types of streamed data
Handles InputJsonDelta for anthropic >= 0.27.0 which broke tool calling in streams.
1 parent 00f8140 commit 41dd1f2

File tree

2 files changed

+253
-10
lines changed

2 files changed

+253
-10
lines changed

sentry_sdk/integrations/anthropic.py

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from functools import wraps
2+
from typing import TYPE_CHECKING
23

34
import sentry_sdk
45
from sentry_sdk.ai.monitoring import record_token_usage
@@ -11,8 +12,6 @@
1112
package_version,
1213
)
1314

14-
from typing import TYPE_CHECKING
15-
1615
try:
1716
from anthropic.resources import Messages
1817

@@ -46,7 +45,10 @@ def setup_once():
4645
if version < (0, 16):
4746
raise DidNotEnable("anthropic 0.16 or newer required.")
4847

49-
Messages.create = _wrap_message_create(Messages.create)
48+
if version >= (0, 27, 0):
49+
Messages.create = _wrap_message_create(Messages.create)
50+
else:
51+
Messages.create = _wrap_message_create_old(Messages.create)
5052

5153

5254
def _capture_exception(exc):
@@ -75,6 +77,112 @@ def _calculate_token_usage(result, span):
7577

7678

7779
def _wrap_message_create(f):
80+
# type: (Any) -> Any
81+
@wraps(f)
82+
def _sentry_patched_create(*args, **kwargs):
83+
# type: (*Any, **Any) -> Any
84+
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
85+
86+
if integration is None or "messages" not in kwargs:
87+
return f(*args, **kwargs)
88+
89+
try:
90+
iter(kwargs["messages"])
91+
except TypeError:
92+
return f(*args, **kwargs)
93+
94+
messages = list(kwargs["messages"])
95+
model = kwargs.get("model")
96+
97+
span = sentry_sdk.start_span(
98+
op=OP.ANTHROPIC_MESSAGES_CREATE,
99+
name="Anthropic messages create",
100+
origin=AnthropicIntegration.origin,
101+
)
102+
span.__enter__()
103+
104+
try:
105+
result = f(*args, **kwargs)
106+
except Exception as exc:
107+
_capture_exception(exc)
108+
span.__exit__(None, None, None)
109+
raise exc from None
110+
111+
with capture_internal_exceptions():
112+
span.set_data(SPANDATA.AI_MODEL_ID, model)
113+
span.set_data(SPANDATA.AI_STREAMING, False)
114+
if should_send_default_pii() and integration.include_prompts:
115+
span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages)
116+
if hasattr(result, "content"):
117+
if should_send_default_pii() and integration.include_prompts:
118+
span.set_data(
119+
SPANDATA.AI_RESPONSES,
120+
list(
121+
map(
122+
lambda message: {
123+
"type": message.type,
124+
"text": message.text,
125+
},
126+
result.content,
127+
)
128+
),
129+
)
130+
_calculate_token_usage(result, span)
131+
span.__exit__(None, None, None)
132+
elif hasattr(result, "_iterator"):
133+
old_iterator = result._iterator
134+
135+
def new_iterator():
136+
# type: () -> Iterator[MessageStreamEvent]
137+
input_tokens = 0
138+
output_tokens = 0
139+
content_blocks = []
140+
with capture_internal_exceptions():
141+
for event in old_iterator:
142+
if hasattr(event, "type"):
143+
if event.type == "message_start":
144+
usage = event.message.usage
145+
input_tokens += usage.input_tokens
146+
output_tokens += usage.output_tokens
147+
elif event.type == "content_block_start":
148+
pass
149+
elif event.type == "content_block_delta":
150+
if hasattr(event.delta, "text"):
151+
content_blocks.append(event.delta.text)
152+
elif hasattr(event.delta, "partial_json"):
153+
content_blocks.append(event.delta.partial_json)
154+
elif event.type == "content_block_stop":
155+
pass
156+
elif event.type == "message_delta":
157+
output_tokens += event.usage.output_tokens
158+
elif event.type == "message_stop":
159+
continue
160+
yield event
161+
162+
if should_send_default_pii() and integration.include_prompts:
163+
complete_message = "".join(content_blocks)
164+
span.set_data(
165+
SPANDATA.AI_RESPONSES,
166+
[{"type": "text", "text": complete_message}],
167+
)
168+
total_tokens = input_tokens + output_tokens
169+
record_token_usage(
170+
span, input_tokens, output_tokens, total_tokens
171+
)
172+
span.set_data(SPANDATA.AI_STREAMING, True)
173+
span.__exit__(None, None, None)
174+
175+
result._iterator = new_iterator()
176+
else:
177+
span.set_data("unknown_response", True)
178+
span.__exit__(None, None, None)
179+
180+
return result
181+
182+
return _sentry_patched_create
183+
184+
185+
def _wrap_message_create_old(f):
78186
# type: (Any) -> Any
79187
@wraps(f)
80188
def _sentry_patched_create(*args, **kwargs):

tests/integrations/anthropic/test_anthropic.py

Lines changed: 142 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
1-
import pytest
21
from unittest import mock
3-
from anthropic import Anthropic, Stream, AnthropicError
4-
from anthropic.types import Usage, MessageDeltaUsage, TextDelta
2+
3+
import pytest
4+
from anthropic import Anthropic, AnthropicError, Stream
5+
from anthropic.types import MessageDeltaUsage, TextDelta, Usage
6+
from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent
7+
from anthropic.types.content_block_start_event import ContentBlockStartEvent
8+
from anthropic.types.content_block_stop_event import ContentBlockStopEvent
59
from anthropic.types.message import Message
610
from anthropic.types.message_delta_event import MessageDeltaEvent
711
from anthropic.types.message_start_event import MessageStartEvent
8-
from anthropic.types.content_block_start_event import ContentBlockStartEvent
9-
from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent
10-
from anthropic.types.content_block_stop_event import ContentBlockStopEvent
12+
13+
from sentry_sdk.utils import package_version
1114

1215
try:
1316
# 0.27+
17+
from anthropic.types import InputJsonDelta
1418
from anthropic.types.raw_message_delta_event import Delta
19+
from anthropic.types.tool_use_block import ToolUseBlock
1520
except ImportError:
1621
# pre 0.27
1722
from anthropic.types.message_delta_event import Delta
@@ -25,7 +30,7 @@
2530
from sentry_sdk.consts import OP, SPANDATA
2631
from sentry_sdk.integrations.anthropic import AnthropicIntegration
2732

28-
33+
ANTHROPIC_VERSION = package_version("anthropic")
2934
EXAMPLE_MESSAGE = Message(
3035
id="id",
3136
model="model",
@@ -203,6 +208,136 @@ def test_streaming_create_message(
203208
assert span["data"]["ai.streaming"] is True
204209

205210

211+
@pytest.mark.skipif(
212+
ANTHROPIC_VERSION < (0, 27),
213+
reason="Versions <0.27.0 do not include InputJsonDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.",
214+
)
215+
@pytest.mark.parametrize(
216+
"send_default_pii, include_prompts",
217+
[
218+
(True, True),
219+
(True, False),
220+
(False, True),
221+
(False, False),
222+
],
223+
)
224+
def test_streaming_create_message_with_input_json_delta(
225+
sentry_init, capture_events, send_default_pii, include_prompts
226+
):
227+
client = Anthropic(api_key="z")
228+
returned_stream = Stream(cast_to=None, response=None, client=client)
229+
returned_stream._iterator = [
230+
MessageStartEvent(
231+
message=Message(
232+
id="msg_0",
233+
content=[],
234+
model="claude-3-5-sonnet-20240620",
235+
role="assistant",
236+
stop_reason=None,
237+
stop_sequence=None,
238+
type="message",
239+
usage=Usage(input_tokens=366, output_tokens=10),
240+
),
241+
type="message_start",
242+
),
243+
ContentBlockStartEvent(
244+
type="content_block_start",
245+
index=0,
246+
content_block=ToolUseBlock(
247+
id="toolu_0", input={}, name="get_weather", type="tool_use"
248+
),
249+
),
250+
ContentBlockDeltaEvent(
251+
delta=InputJsonDelta(partial_json="", type="input_json_delta"),
252+
index=0,
253+
type="content_block_delta",
254+
),
255+
ContentBlockDeltaEvent(
256+
delta=InputJsonDelta(partial_json="{'location':", type="input_json_delta"),
257+
index=0,
258+
type="content_block_delta",
259+
),
260+
ContentBlockDeltaEvent(
261+
delta=InputJsonDelta(partial_json=" 'S", type="input_json_delta"),
262+
index=0,
263+
type="content_block_delta",
264+
),
265+
ContentBlockDeltaEvent(
266+
delta=InputJsonDelta(partial_json="an ", type="input_json_delta"),
267+
index=0,
268+
type="content_block_delta",
269+
),
270+
ContentBlockDeltaEvent(
271+
delta=InputJsonDelta(partial_json="Francisco, C", type="input_json_delta"),
272+
index=0,
273+
type="content_block_delta",
274+
),
275+
ContentBlockDeltaEvent(
276+
delta=InputJsonDelta(partial_json="A'}", type="input_json_delta"),
277+
index=0,
278+
type="content_block_delta",
279+
),
280+
ContentBlockStopEvent(type="content_block_stop", index=0),
281+
MessageDeltaEvent(
282+
delta=Delta(stop_reason="tool_use", stop_sequence=None),
283+
usage=MessageDeltaUsage(output_tokens=41),
284+
type="message_delta",
285+
),
286+
]
287+
288+
sentry_init(
289+
integrations=[AnthropicIntegration(include_prompts=include_prompts)],
290+
traces_sample_rate=1.0,
291+
send_default_pii=send_default_pii,
292+
)
293+
events = capture_events()
294+
client.messages._post = mock.Mock(return_value=returned_stream)
295+
296+
messages = [
297+
{
298+
"role": "user",
299+
"content": "What is the weather like in San Francisco?",
300+
}
301+
]
302+
303+
with start_transaction(name="anthropic"):
304+
message = client.messages.create(
305+
max_tokens=1024, messages=messages, model="model", stream=True
306+
)
307+
308+
for _ in message:
309+
pass
310+
311+
assert message == returned_stream
312+
assert len(events) == 1
313+
(event,) = events
314+
315+
assert event["type"] == "transaction"
316+
assert event["transaction"] == "anthropic"
317+
318+
assert len(event["spans"]) == 1
319+
(span,) = event["spans"]
320+
321+
assert span["op"] == OP.ANTHROPIC_MESSAGES_CREATE
322+
assert span["description"] == "Anthropic messages create"
323+
assert span["data"][SPANDATA.AI_MODEL_ID] == "model"
324+
325+
if send_default_pii and include_prompts:
326+
assert span["data"][SPANDATA.AI_INPUT_MESSAGES] == messages
327+
assert span["data"][SPANDATA.AI_RESPONSES] == [
328+
{"text": "{'location': 'San Francisco, CA'}", "type": "text"}
329+
]
330+
331+
else:
332+
assert SPANDATA.AI_INPUT_MESSAGES not in span["data"]
333+
assert SPANDATA.AI_RESPONSES not in span["data"]
334+
335+
assert span["measurements"]["ai_prompt_tokens_used"]["value"] == 366
336+
assert span["measurements"]["ai_completion_tokens_used"]["value"] == 51
337+
assert span["measurements"]["ai_total_tokens_used"]["value"] == 417
338+
assert span["data"]["ai.streaming"] is True
339+
340+
206341
def test_exception_message_create(sentry_init, capture_events):
207342
sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0)
208343
events = capture_events()

0 commit comments

Comments
 (0)