Skip to content

Commit 5e8881d

Browse files
thpiercejj22ee
authored andcommitted
Only decode JSON input buffer in Anthropic Claude streaming (aws-observability#497)
_decode_tool_use was only used when _tool_json_input_buf was found, but we were decoding the entire _content_block after adding _tool_json_input_buf to it. The _content_block overall which could contain non-JSON elements (e.g. {}), causing failures. To fix this, we have removed _decode_tool_use helper function and inlined JSON decoding logic directly into content_block_stop handler in _process_anthropic_claude_chunk, where we only use it to decode _tool_json_input_buf before appending to _content_block. Patch based on open-telemetry/opentelemetry-python-contrib#3875 with code copied directly from https://github.com/open-telemetry/opentelemetry-python-contrib/blob/v0.54b1/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py#L289 Repeated testing in open-telemetry/opentelemetry-python-contrib#3875 to confirm this works By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent 606f629 commit 5e8881d

File tree

3 files changed

+200
-5
lines changed

3 files changed

+200
-5
lines changed

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Changelog
2+
3+
All notable changes to this project will be documented in this file.
4+
5+
> **Note:** This CHANGELOG was created starting from version 0.12.0. Earlier changes are not documented here.
6+
7+
For any change that affects end users of this package, please add an entry under the **Unreleased** section. Briefly summarize the change and provide the link to the PR. Example:
8+
- add GenAI attribute support for Amazon Bedrock models
9+
([#300](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/300))
10+
11+
If your change does not need a CHANGELOG entry, add the "skip changelog" label to your PR.
12+
13+
## Unreleased
14+
- [PATCH] Only decode JSON input buffer in Anthropic Claude streaming
15+
([#497](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/497))

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,89 @@ def patched_extract_tool_calls(
327327
tool_calls.append(tool_call)
328328
return tool_calls
329329

330+
# TODO: The following code is to patch a bedrock bug that was fixed in
331+
# opentelemetry-instrumentation-botocore==0.60b0 in:
332+
# https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3875
333+
# Remove this code once we've bumped opentelemetry-instrumentation-botocore dependency to 0.60b0
334+
def patched_process_anthropic_claude_chunk(self, chunk):
335+
# pylint: disable=too-many-return-statements,too-many-branches
336+
if not (message_type := chunk.get("type")):
337+
return
338+
339+
if message_type == "message_start":
340+
# {'type': 'message_start', 'message': {'id': 'id', 'type': 'message', 'role': 'assistant',
341+
# 'model': 'claude-2.0', 'content': [], 'stop_reason': None, 'stop_sequence': None,
342+
# 'usage': {'input_tokens': 18, 'output_tokens': 1}}}
343+
if chunk.get("message", {}).get("role") == "assistant":
344+
self._record_message = True
345+
message = chunk["message"]
346+
self._message = {
347+
"role": message["role"],
348+
"content": message.get("content", []),
349+
}
350+
return
351+
352+
if message_type == "content_block_start":
353+
# {'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}}
354+
# {'type': 'content_block_start', 'index': 1, 'content_block':
355+
# {'type': 'tool_use', 'id': 'id', 'name': 'func_name', 'input': {}}}
356+
if self._record_message:
357+
block = chunk.get("content_block", {})
358+
if block.get("type") == "text":
359+
self._content_block = block
360+
elif block.get("type") == "tool_use":
361+
self._content_block = block
362+
return
363+
364+
if message_type == "content_block_delta":
365+
# {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Here'}}
366+
# {'type': 'content_block_delta', 'index': 1, 'delta': {'type': 'input_json_delta', 'partial_json': ''}}
367+
if self._record_message:
368+
delta = chunk.get("delta", {})
369+
if delta.get("type") == "text_delta":
370+
self._content_block["text"] += delta.get("text", "")
371+
elif delta.get("type") == "input_json_delta":
372+
self._tool_json_input_buf += delta.get("partial_json", "")
373+
return
374+
375+
if message_type == "content_block_stop":
376+
# {'type': 'content_block_stop', 'index': 0}
377+
if self._tool_json_input_buf:
378+
try:
379+
self._content_block["input"] = json.loads(self._tool_json_input_buf)
380+
except json.JSONDecodeError:
381+
self._content_block["input"] = self._tool_json_input_buf
382+
self._message["content"].append(self._content_block)
383+
self._content_block = {}
384+
self._tool_json_input_buf = ""
385+
return
386+
387+
if message_type == "message_delta":
388+
# {'type': 'message_delta', 'delta': {'stop_reason': 'end_turn', 'stop_sequence': None},
389+
# 'usage': {'output_tokens': 123}}
390+
if (stop_reason := chunk.get("delta", {}).get("stop_reason")) is not None:
391+
self._response["stopReason"] = stop_reason
392+
return
393+
394+
if message_type == "message_stop":
395+
# {'type': 'message_stop', 'amazon-bedrock-invocationMetrics':
396+
# {'inputTokenCount': 18, 'outputTokenCount': 123, 'invocationLatency': 5250, 'firstByteLatency': 290}}
397+
if invocation_metrics := chunk.get("amazon-bedrock-invocationMetrics"):
398+
self._process_invocation_metrics(invocation_metrics)
399+
400+
if self._record_message:
401+
self._response["output"] = {"message": self._message}
402+
self._record_message = False
403+
self._message = None
404+
405+
self._stream_done_callback(self._response)
406+
return
407+
330408
bedrock_utils.ConverseStreamWrapper.__init__ = patched_init
331409
bedrock_utils.ConverseStreamWrapper._process_event = patched_process_event
410+
bedrock_utils.InvokeModelWithResponseStreamWrapper._process_anthropic_claude_chunk = (
411+
patched_process_anthropic_claude_chunk
412+
)
332413
bedrock_utils.extract_tool_calls = patched_extract_tool_calls
333414

334415
# END The OpenTelemetry Authors code

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py

Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,6 @@ def _run_patch_behaviour_tests(self):
8484
self._test_unpatched_botocore_propagator()
8585
self._test_unpatched_gevent_instrumentation()
8686
self._test_unpatched_starlette_instrumentation()
87-
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
88-
# Bedrock Runtime tests
89-
self._test_unpatched_converse_stream_wrapper()
90-
self._test_unpatched_extract_tool_calls()
9187

9288
# Apply patches
9389
apply_instrumentation_patches()
@@ -178,6 +174,16 @@ def _test_unpatched_botocore_instrumentation(self):
178174
# DynamoDB
179175
self.assertTrue("dynamodb" in _KNOWN_EXTENSIONS, "Upstream has removed a DynamoDB extension")
180176

177+
# Bedrock Runtime tests
178+
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
179+
self._test_unpatched_converse_stream_wrapper()
180+
self._test_unpatched_extract_tool_calls()
181+
182+
# TODO: remove these tests once we bump botocore instrumentation version to 0.60b0
183+
self._test_unpatched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"})
184+
self._test_unpatched_process_anthropic_claude_chunk(None, None)
185+
self._test_unpatched_process_anthropic_claude_chunk({}, {})
186+
181187
def _test_unpatched_gevent_instrumentation(self):
182188
self.assertFalse(gevent.monkey.is_module_patched("os"), "gevent os module has been patched")
183189
self.assertFalse(gevent.monkey.is_module_patched("thread"), "gevent thread module has been patched")
@@ -223,10 +229,14 @@ def _test_patched_botocore_instrumentation(self):
223229
# Bedrock Agent Operation
224230
self._test_patched_bedrock_agent_instrumentation()
225231

226-
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
227232
# Bedrock Runtime
233+
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
228234
self._test_patched_converse_stream_wrapper()
229235
self._test_patched_extract_tool_calls()
236+
# TODO: remove these tests once we bump botocore instrumentation version to 0.60b0
237+
self._test_patched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"})
238+
self._test_patched_process_anthropic_claude_chunk(None, None)
239+
self._test_patched_process_anthropic_claude_chunk({}, {})
230240

231241
# Bedrock Agent Runtime
232242
self.assertTrue("bedrock-agent-runtime" in _KNOWN_EXTENSIONS)
@@ -600,6 +610,95 @@ def _test_patched_extract_tool_calls(self):
600610
result = bedrock_utils.extract_tool_calls(message_with_string_content, True)
601611
self.assertIsNone(result)
602612

613+
# Test with toolUse format to exercise the for loop
614+
message_with_tool_use = {"role": "assistant", "content": [{"toolUse": {"toolUseId": "id1", "name": "func1"}}]}
615+
result = bedrock_utils.extract_tool_calls(message_with_tool_use, True)
616+
self.assertEqual(len(result), 1)
617+
618+
# Test with tool_use format to exercise the for loop
619+
message_with_type_tool_use = {
620+
"role": "assistant",
621+
"content": [{"type": "tool_use", "id": "id2", "name": "func2"}],
622+
}
623+
result = bedrock_utils.extract_tool_calls(message_with_type_tool_use, True)
624+
self.assertEqual(len(result), 1)
625+
626+
def _test_patched_process_anthropic_claude_chunk(
627+
self, input_value: Dict[str, str], expected_output: Dict[str, str]
628+
):
629+
self._test_process_anthropic_claude_chunk(input_value, expected_output, False)
630+
631+
def _test_unpatched_process_anthropic_claude_chunk(
632+
self, input_value: Dict[str, str], expected_output: Dict[str, str]
633+
):
634+
self._test_process_anthropic_claude_chunk(input_value, expected_output, True)
635+
636+
def _test_process_anthropic_claude_chunk(
637+
self, input_value: Dict[str, str], expected_output: Dict[str, str], expect_exception: bool
638+
):
639+
"""Test that _process_anthropic_claude_chunk handles various tool_use input formats."""
640+
wrapper = bedrock_utils.InvokeModelWithResponseStreamWrapper(
641+
stream=MagicMock(),
642+
stream_done_callback=MagicMock,
643+
stream_error_callback=MagicMock,
644+
model_id="anthropic.claude-3-5-sonnet-20240620-v1:0",
645+
)
646+
647+
# Simulate message_start
648+
wrapper._process_anthropic_claude_chunk(
649+
{
650+
"type": "message_start",
651+
"message": {
652+
"role": "assistant",
653+
"content": [],
654+
},
655+
}
656+
)
657+
658+
# Simulate content_block_start with specified input
659+
content_block = {
660+
"type": "tool_use",
661+
"id": "test_id",
662+
"name": "test_tool",
663+
}
664+
if input_value is not None:
665+
content_block["input"] = input_value
666+
667+
wrapper._process_anthropic_claude_chunk(
668+
{
669+
"type": "content_block_start",
670+
"index": 0,
671+
"content_block": content_block,
672+
}
673+
)
674+
675+
# Simulate content_block_stop
676+
try:
677+
wrapper._process_anthropic_claude_chunk({"type": "content_block_stop", "index": 0})
678+
except TypeError:
679+
if expect_exception:
680+
return
681+
else:
682+
raise
683+
684+
# Verify the message content
685+
self.assertEqual(len(wrapper._message["content"]), 1)
686+
tool_block = wrapper._message["content"][0]
687+
self.assertEqual(tool_block["type"], "tool_use")
688+
self.assertEqual(tool_block["id"], "test_id")
689+
self.assertEqual(tool_block["name"], "test_tool")
690+
691+
if expected_output is not None:
692+
self.assertEqual(tool_block["input"], expected_output)
693+
self.assertIsInstance(tool_block["input"], dict)
694+
else:
695+
self.assertNotIn("input", tool_block)
696+
697+
# Just adding this to do basic sanity checks and increase code coverage
698+
wrapper._process_anthropic_claude_chunk({"type": "content_block_delta", "index": 0})
699+
wrapper._process_anthropic_claude_chunk({"type": "message_delta"})
700+
wrapper._process_anthropic_claude_chunk({"type": "message_stop"})
701+
603702
def _test_patched_bedrock_agent_instrumentation(self):
604703
"""For bedrock-agent service, both extract_attributes and on_success provides attributes,
605704
the attributes depend on the API being invoked."""

0 commit comments

Comments
 (0)