Skip to content

Commit 708adfd

Browse files
committed
Make it work with async streaming responses
1 parent f837fab commit 708adfd

File tree

1 file changed: +53 additions, -4 deletions

sentry_sdk/integrations/anthropic.py

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ def _get_responses(content):
9090

9191
def _sentry_patched_create_common(f, *args, **kwargs):
9292
# type: (Any, *Any, **Any) -> Any
93+
94+
# check requirements
9395
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
9496
if integration is None:
9597
return f(*args, **kwargs)
@@ -102,36 +104,79 @@ def _sentry_patched_create_common(f, *args, **kwargs):
102104
except TypeError:
103105
return f(*args, **kwargs)
104106

105-
messages = list(kwargs["messages"])
106-
model = kwargs.get("model")
107-
107+
# start span
108108
span = sentry_sdk.start_span(
109109
op=OP.ANTHROPIC_MESSAGES_CREATE,
110110
description="Anthropic messages create",
111111
origin=AnthropicIntegration.origin,
112112
)
113113
span.__enter__()
114114

115+
# yield generator
115116
try:
116117
result = yield f, args, kwargs
117118
except Exception as exc:
118119
_capture_exception(exc)
119120
span.__exit__(None, None, None)
120121
raise exc from None
121122

123+
# add data to span and finish it
124+
messages = list(kwargs["messages"])
125+
model = kwargs.get("model")
126+
122127
with capture_internal_exceptions():
123128
span.set_data(SPANDATA.AI_MODEL_ID, model)
124129
span.set_data(SPANDATA.AI_STREAMING, False)
130+
125131
if should_send_default_pii() and integration.include_prompts:
126132
span.set_data(SPANDATA.AI_INPUT_MESSAGES, messages)
133+
127134
if hasattr(result, "content"):
128135
if should_send_default_pii() and integration.include_prompts:
129136
span.set_data(SPANDATA.AI_RESPONSES, _get_responses(result.content))
130137
_calculate_token_usage(result, span)
131138
span.__exit__(None, None, None)
139+
132140
elif hasattr(result, "_iterator"):
133141
old_iterator = result._iterator
134142

143+
async def async_new_iterator():
144+
# type: () -> Iterator[MessageStreamEvent]
145+
input_tokens = 0
146+
output_tokens = 0
147+
content_blocks = []
148+
with capture_internal_exceptions():
149+
async for event in old_iterator:
150+
if hasattr(event, "type"):
151+
if event.type == "message_start":
152+
usage = event.message.usage
153+
input_tokens += usage.input_tokens
154+
output_tokens += usage.output_tokens
155+
elif event.type == "content_block_start":
156+
pass
157+
elif event.type == "content_block_delta":
158+
if hasattr(event.delta, "text"):
159+
content_blocks.append(event.delta.text)
160+
elif event.type == "content_block_stop":
161+
pass
162+
elif event.type == "message_delta":
163+
output_tokens += event.usage.output_tokens
164+
elif event.type == "message_stop":
165+
continue
166+
167+
yield event
168+
169+
if should_send_default_pii() and integration.include_prompts:
170+
complete_message = "".join(content_blocks)
171+
span.set_data(
172+
SPANDATA.AI_RESPONSES,
173+
[{"type": "text", "text": complete_message}],
174+
)
175+
total_tokens = input_tokens + output_tokens
176+
record_token_usage(span, input_tokens, output_tokens, total_tokens)
177+
span.set_data(SPANDATA.AI_STREAMING, True)
178+
span.__exit__(None, None, None)
179+
135180
def new_iterator():
136181
# type: () -> Iterator[MessageStreamEvent]
137182
input_tokens = 0
@@ -168,7 +213,11 @@ def new_iterator():
168213
span.set_data(SPANDATA.AI_STREAMING, True)
169214
span.__exit__(None, None, None)
170215

171-
result._iterator = new_iterator()
216+
if str(type(result._iterator)) == "<class 'async_generator'>":
217+
result._iterator = async_new_iterator()
218+
else:
219+
result._iterator = new_iterator()
220+
172221
else:
173222
span.set_data("unknown_response", True)
174223
span.__exit__(None, None, None)

0 commit comments

Comments (0)