From 41dd1f216c9da9e4ff0871a5e53840e6fbe55f0b Mon Sep 17 00:00:00 2001 From: Nathan Date: Sat, 5 Oct 2024 00:38:09 +0100 Subject: [PATCH 1/6] fix(integrations): Handle new types of streamed data Handles InputJsonDelta for anthropic >= 0.27.0 which broke tool calling in streams. --- sentry_sdk/integrations/anthropic.py | 114 +++++++++++++- .../integrations/anthropic/test_anthropic.py | 149 +++++++++++++++++- 2 files changed, 253 insertions(+), 10 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index f3fd8d2d92..82db1f1ad8 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -1,4 +1,5 @@ from functools import wraps +from typing import TYPE_CHECKING import sentry_sdk from sentry_sdk.ai.monitoring import record_token_usage @@ -11,8 +12,6 @@ package_version, ) -from typing import TYPE_CHECKING - try: from anthropic.resources import Messages @@ -46,7 +45,10 @@ def setup_once(): if version < (0, 16): raise DidNotEnable("anthropic 0.16 or newer required.") - Messages.create = _wrap_message_create(Messages.create) + if version >= (0, 27, 0): + Messages.create = _wrap_message_create(Messages.create) + else: + Messages.create = _wrap_message_create_old(Messages.create) def _capture_exception(exc): @@ -75,6 +77,112 @@ def _calculate_token_usage(result, span): def _wrap_message_create(f): + # type: (Any) -> Any + @wraps(f) + def _sentry_patched_create(*args, **kwargs): + # type: (*Any, **Any) -> Any + integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) + + if integration is None or "messages" not in kwargs: + return f(*args, **kwargs) + + try: + iter(kwargs["messages"]) + except TypeError: + return f(*args, **kwargs) + + messages = list(kwargs["messages"]) + model = kwargs.get("model") + + span = sentry_sdk.start_span( + op=OP.ANTHROPIC_MESSAGES_CREATE, + name="Anthropic messages create", + origin=AnthropicIntegration.origin, + ) + span.__enter__() + + try: + result = f(*args, **kwargs) + except Exception as exc: + _capture_exception(exc) + span.__exit__(None, None, None) + raise exc from None + + with capture_internal_exceptions(): + span.set_data(SPANDATA.AI_MODEL_ID, model) + span.set_data(SPANDATA.AI_STREAMING, False) + if should_send_default_pii() and integration.include_prompts: + span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages) + if hasattr(result, "content"): + if should_send_default_pii() and integration.include_prompts: + span.set_data( + SPANDATA.AI_RESPONSES, + list( + map( + lambda message: { + "type": message.type, + "text": message.text, + }, + result.content, + ) + ), + ) + _calculate_token_usage(result, span) + span.__exit__(None, None, None) + elif hasattr(result, "_iterator"): + old_iterator = result._iterator + + def new_iterator(): + # type: () -> Iterator[MessageStreamEvent] + input_tokens = 0 + output_tokens = 0 + content_blocks = [] + with capture_internal_exceptions(): + for event in old_iterator: + if hasattr(event, "type"): + if event.type == "message_start": + usage = event.message.usage + input_tokens += usage.input_tokens + output_tokens += usage.output_tokens + elif event.type == "content_block_start": + pass + elif event.type == "content_block_delta": + if hasattr(event.delta, "text"): + content_blocks.append(event.delta.text) + elif hasattr(event.delta, "partial_json"): + content_blocks.append(event.delta.partial_json) + elif event.type == "content_block_stop": + pass + elif event.type == "message_delta": + output_tokens += event.usage.output_tokens + elif event.type == "message_stop": + continue + yield event + + if should_send_default_pii() and integration.include_prompts: + complete_message = "".join(content_blocks) + span.set_data( + SPANDATA.AI_RESPONSES, + [{"type": "text", "text": complete_message}], + ) + total_tokens = input_tokens + output_tokens + record_token_usage( + span, input_tokens, output_tokens, total_tokens + ) + span.set_data(SPANDATA.AI_STREAMING, True) + span.__exit__(None, None, None) + + result._iterator = new_iterator() + else: + span.set_data("unknown_response", True) + span.__exit__(None, None, None) + + return result + + return _sentry_patched_create + + +def _wrap_message_create_old(f): # type: (Any) -> Any @wraps(f) def _sentry_patched_create(*args, **kwargs): diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 5fefde9b5a..ccbb4d4a2f 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1,17 +1,22 @@ -import pytest from unittest import mock -from anthropic import Anthropic, Stream, AnthropicError -from anthropic.types import Usage, MessageDeltaUsage, TextDelta + +import pytest +from anthropic import Anthropic, AnthropicError, Stream +from anthropic.types import MessageDeltaUsage, TextDelta, Usage +from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent +from anthropic.types.content_block_start_event import ContentBlockStartEvent +from anthropic.types.content_block_stop_event import ContentBlockStopEvent from anthropic.types.message import Message from anthropic.types.message_delta_event import MessageDeltaEvent from anthropic.types.message_start_event import MessageStartEvent -from anthropic.types.content_block_start_event import ContentBlockStartEvent -from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent -from anthropic.types.content_block_stop_event import ContentBlockStopEvent + +from sentry_sdk.utils import package_version try: # 0.27+ + from anthropic.types import InputJsonDelta from anthropic.types.raw_message_delta_event import Delta + from anthropic.types.tool_use_block import ToolUseBlock except ImportError: # pre 0.27 from anthropic.types.message_delta_event import Delta @@ -25,7 +30,7 @@ from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations.anthropic import AnthropicIntegration - +ANTHROPIC_VERSION = package_version("anthropic") EXAMPLE_MESSAGE = Message( id="id", model="model", @@ -203,6 +208,136 @@ def test_streaming_create_message( assert span["data"]["ai.streaming"] is True +@pytest.mark.skipif( + ANTHROPIC_VERSION < (0, 27), + reason="Versions <0.27.0 do not include InputJsonDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.", +) +@pytest.mark.parametrize( + "send_default_pii, include_prompts", + [ + (True, True), + (True, False), + (False, True), + (False, False), + ], +) +def test_streaming_create_message_with_input_json_delta( + sentry_init, capture_events, send_default_pii, include_prompts +): + client = Anthropic(api_key="z") + returned_stream = Stream(cast_to=None, response=None, client=client) + returned_stream._iterator = [ + MessageStartEvent( + message=Message( + id="msg_0", + content=[], + model="claude-3-5-sonnet-20240620", + role="assistant", + stop_reason=None, + stop_sequence=None, + type="message", + usage=Usage(input_tokens=366, output_tokens=10), + ), + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=ToolUseBlock( + id="toolu_0", input={}, name="get_weather", type="tool_use" + ), + ), + ContentBlockDeltaEvent( + delta=InputJsonDelta(partial_json="", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJsonDelta(partial_json="{'location':", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJsonDelta(partial_json=" 'S", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJsonDelta(partial_json="an ", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJsonDelta(partial_json="Francisco, C", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=InputJsonDelta(partial_json="A'}", type="input_json_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(stop_reason="tool_use", stop_sequence=None), + usage=MessageDeltaUsage(output_tokens=41), + type="message_delta", + ), + ] + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=include_prompts)], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + client.messages._post = mock.Mock(return_value=returned_stream) + + messages = [ + { + "role": "user", + "content": "What is the weather like in San Francisco?", + } + ] + + with start_transaction(name="anthropic"): + message = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + for _ in message: + pass + + assert message == returned_stream + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + assert len(event["spans"]) == 1 + (span,) = event["spans"] + + assert span["op"] == OP.ANTHROPIC_MESSAGES_CREATE + assert span["description"] == "Anthropic messages create" + assert span["data"][SPANDATA.AI_MODEL_ID] == "model" + + if send_default_pii and include_prompts: + assert span["data"][SPANDATA.AI_INPUT_MESSAGES] == messages + assert span["data"][SPANDATA.AI_RESPONSES] == [ + {"text": "{'location': 'San Francisco, CA'}", "type": "text"} + ] + + else: + assert SPANDATA.AI_INPUT_MESSAGES not in span["data"] + assert SPANDATA.AI_RESPONSES not in span["data"] + + assert span["measurements"]["ai_prompt_tokens_used"]["value"] == 366 + assert span["measurements"]["ai_completion_tokens_used"]["value"] == 51 + assert span["measurements"]["ai_total_tokens_used"]["value"] == 417 + assert span["data"]["ai.streaming"] is True + + def test_exception_message_create(sentry_init, capture_events): sentry_init(integrations=[AnthropicIntegration()], traces_sample_rate=1.0) events = capture_events() From ca54feb5ba240b19fb46a04e0bcd68476899749b Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Fri, 11 Oct 2024 13:46:22 +0200 Subject: [PATCH 2/6] Making sure responses from tool use work the same in streaming and normal messages --- sentry_sdk/integrations/anthropic.py | 30 +++++++++++++++------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 82db1f1ad8..76db62b3ce 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -76,6 +76,21 @@ def _calculate_token_usage(result, span): record_token_usage(span, input_tokens, output_tokens, total_tokens) +def _get_responses(content): + # type: (list[dict[str, str]]) -> dict[str, str] + """Get JSON of a Anthropic responses.""" + responses = [] + for item in content: + if hasattr(item, "text"): + responses.append( + { + "type": item.type, + "text": item.text, + } + ) + return responses + + def _wrap_message_create(f): # type: (Any) -> Any @wraps(f) @@ -115,18 +130,7 @@ def _sentry_patched_create(*args, **kwargs): span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages) if hasattr(result, "content"): if should_send_default_pii() and integration.include_prompts: - span.set_data( - SPANDATA.AI_RESPONSES, - list( - map( - lambda message: { - "type": message.type, - "text": message.text, - }, - result.content, - ) - ), - ) + span.set_data(SPANDATA.AI_RESPONSES, _get_responses(result.content)) _calculate_token_usage(result, span) span.__exit__(None, None, None) elif hasattr(result, "_iterator"): @@ -149,8 +153,6 @@ def new_iterator(): elif event.type == "content_block_delta": if hasattr(event.delta, "text"): content_blocks.append(event.delta.text) - elif hasattr(event.delta, "partial_json"): - content_blocks.append(event.delta.partial_json) elif event.type == "content_block_stop": pass elif event.type == "message_delta": From 6bdaceac4d64c24124b0337c9d435f7586b24e9a Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Fri, 11 Oct 2024 13:58:50 +0200 Subject: [PATCH 3/6] If we only deal with text response types, it works in old and new Anthropic the same way --- sentry_sdk/integrations/anthropic.py | 108 +-------------------------- 1 file changed, 1 insertion(+), 107 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 76db62b3ce..029723b158 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -45,10 +45,7 @@ def setup_once(): if version < (0, 16): raise DidNotEnable("anthropic 0.16 or newer required.") - if version >= (0, 27, 0): - Messages.create = _wrap_message_create(Messages.create) - else: - Messages.create = _wrap_message_create_old(Messages.create) + Messages.create = _wrap_message_create(Messages.create) def _capture_exception(exc): @@ -182,106 +179,3 @@ def new_iterator(): return result return _sentry_patched_create - - -def _wrap_message_create_old(f): - # type: (Any) -> Any - @wraps(f) - def _sentry_patched_create(*args, **kwargs): - # type: (*Any, **Any) -> Any - integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) - - if integration is None or "messages" not in kwargs: - return f(*args, **kwargs) - - try: - iter(kwargs["messages"]) - except TypeError: - return f(*args, **kwargs) - - messages = list(kwargs["messages"]) - model = kwargs.get("model") - - span = sentry_sdk.start_span( - op=OP.ANTHROPIC_MESSAGES_CREATE, - name="Anthropic messages create", - origin=AnthropicIntegration.origin, - ) - span.__enter__() - - try: - result = f(*args, **kwargs) - except Exception as exc: - _capture_exception(exc) - span.__exit__(None, None, None) - raise exc from None - - with capture_internal_exceptions(): - span.set_data(SPANDATA.AI_MODEL_ID, model) - span.set_data(SPANDATA.AI_STREAMING, False) - if should_send_default_pii() and integration.include_prompts: - span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages) - if hasattr(result, "content"): - if should_send_default_pii() and integration.include_prompts: - span.set_data( - SPANDATA.AI_RESPONSES, - list( - map( - lambda message: { - "type": message.type, - "text": message.text, - }, - result.content, - ) - ), - ) - _calculate_token_usage(result, span) - span.__exit__(None, None, None) - elif hasattr(result, "_iterator"): - old_iterator = result._iterator - - def new_iterator(): - # type: () -> Iterator[MessageStreamEvent] - input_tokens = 0 - output_tokens = 0 - content_blocks = [] - with capture_internal_exceptions(): - for event in old_iterator: - if hasattr(event, "type"): - if event.type == "message_start": - usage = event.message.usage - input_tokens += usage.input_tokens - output_tokens += usage.output_tokens - elif event.type == "content_block_start": - pass - elif event.type == "content_block_delta": - content_blocks.append(event.delta.text) - elif event.type == "content_block_stop": - pass - elif event.type == "message_delta": - output_tokens += event.usage.output_tokens - elif event.type == "message_stop": - continue - yield event - - if should_send_default_pii() and integration.include_prompts: - complete_message = "".join(content_blocks) - span.set_data( - SPANDATA.AI_RESPONSES, - [{"type": "text", "text": complete_message}], - ) - total_tokens = input_tokens + output_tokens - record_token_usage( - span, input_tokens, output_tokens, total_tokens - ) - span.set_data(SPANDATA.AI_STREAMING, True) - span.__exit__(None, None, None) - - result._iterator = new_iterator() - else: - span.set_data("unknown_response", True) - span.__exit__(None, None, None) - - return result - - return _sentry_patched_create From 8f0846bd25c55040c8003b97d91c36f705fef524 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Fri, 11 Oct 2024 14:19:51 +0200 Subject: [PATCH 4/6] Updated tests to work in latest Anthropic and also check for not recorded InputJsonDelta --- .../integrations/anthropic/test_anthropic.py | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index ccbb4d4a2f..7e33ac831d 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -12,9 +12,16 @@ from sentry_sdk.utils import package_version +try: + from anthropic.types import InputJSONDelta +except ImportError: + try: + from anthropic.types import InputJsonDelta as InputJSONDelta + except ImportError: + pass + try: # 0.27+ - from anthropic.types import InputJsonDelta from anthropic.types.raw_message_delta_event import Delta from anthropic.types.tool_use_block import ToolUseBlock except ImportError: @@ -210,7 +217,7 @@ def test_streaming_create_message( @pytest.mark.skipif( ANTHROPIC_VERSION < (0, 27), - reason="Versions <0.27.0 do not include InputJsonDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.", + reason="Versions <0.27.0 do not include InputJSONDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.", ) @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -248,32 +255,32 @@ def test_streaming_create_message_with_input_json_delta( ), ), ContentBlockDeltaEvent( - delta=InputJsonDelta(partial_json="", type="input_json_delta"), + delta=InputJSONDelta(partial_json="", type="input_json_delta"), index=0, type="content_block_delta", ), ContentBlockDeltaEvent( - delta=InputJsonDelta(partial_json="{'location':", type="input_json_delta"), + delta=InputJSONDelta(partial_json="{'location':", type="input_json_delta"), index=0, type="content_block_delta", ), ContentBlockDeltaEvent( - delta=InputJsonDelta(partial_json=" 'S", type="input_json_delta"), + delta=InputJSONDelta(partial_json=" 'S", type="input_json_delta"), index=0, type="content_block_delta", ), ContentBlockDeltaEvent( - delta=InputJsonDelta(partial_json="an ", type="input_json_delta"), + delta=InputJSONDelta(partial_json="an ", type="input_json_delta"), index=0, type="content_block_delta", ), ContentBlockDeltaEvent( - delta=InputJsonDelta(partial_json="Francisco, C", type="input_json_delta"), + delta=InputJSONDelta(partial_json="Francisco, C", type="input_json_delta"), index=0, type="content_block_delta", ), ContentBlockDeltaEvent( - delta=InputJsonDelta(partial_json="A'}", type="input_json_delta"), + delta=InputJSONDelta(partial_json="A'}", type="input_json_delta"), index=0, type="content_block_delta", ), @@ -325,8 +332,8 @@ def test_streaming_create_message_with_input_json_delta( if send_default_pii and include_prompts: assert span["data"][SPANDATA.AI_INPUT_MESSAGES] == messages assert span["data"][SPANDATA.AI_RESPONSES] == [ - {"text": "{'location': 'San Francisco, CA'}", "type": "text"} - ] + {"text": "", "type": "text"} + ] # we do not record InputJSONDelta because it could contain PII else: assert SPANDATA.AI_INPUT_MESSAGES not in span["data"] From 5a64f635af9cf133761455b44af44110b2058945 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Fri, 11 Oct 2024 14:22:27 +0200 Subject: [PATCH 5/6] Fixed types --- sentry_sdk/integrations/anthropic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 029723b158..1b0454556a 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -74,7 +74,7 @@ def _calculate_token_usage(result, span): def _get_responses(content): - # type: (list[dict[str, str]]) -> dict[str, str] + # type: (list[Any]) -> dict[str, Any] """Get JSON of a Anthropic responses.""" responses = [] for item in content: From f54ea9925033102f740b0f27c7493e4ee0fa7753 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Fri, 11 Oct 2024 14:27:05 +0200 Subject: [PATCH 6/6] oh... --- sentry_sdk/integrations/anthropic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 1b0454556a..08c40bc7b6 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -74,7 +74,7 @@ def _calculate_token_usage(result, span): def _get_responses(content): - # type: (list[Any]) -> dict[str, Any] + # type: (list[Any]) -> list[dict[str, Any]] """Get JSON of a Anthropic responses.""" responses = [] for item in content: