Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,38 @@ jobs:
uses: github/codeql-action/analyze@v3
with:
category: "/language:${{matrix.language}}"

all-codeql-checks-pass:
runs-on: ubuntu-latest
needs: [analyze]
if: always()
steps:
- name: Checkout to get workflow file
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 #5.0.0

- name: Check all jobs succeeded and none missing
run: |
# Check if all needed jobs succeeded
results='${{ toJSON(needs) }}'
if echo "$results" | jq -r '.[] | .result' | grep -v success; then
echo "Some jobs failed"
exit 1
fi

# Extract all job names from workflow (excluding this gate job)
all_jobs=$(yq eval '.jobs | keys | .[]' .github/workflows/codeql.yml | grep -v "all-codeql-checks-pass" | sort)

# Extract job names from needs array
needed_jobs='${{ toJSON(needs) }}'
needs_list=$(echo "$needed_jobs" | jq -r 'keys[]' | sort)

# Check if any jobs are missing from needs
missing_jobs=$(comm -23 <(echo "$all_jobs") <(echo "$needs_list"))
if [ -n "$missing_jobs" ]; then
echo "ERROR: Jobs missing from needs array in all-codeql-checks-pass:"
echo "$missing_jobs"
echo "Please add these jobs to the needs array of all-codeql-checks-pass"
exit 1
fi

echo "All CodeQL checks passed and no jobs missing from gate!"
35 changes: 35 additions & 0 deletions .github/workflows/pr-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,38 @@ jobs:

- name: Build with Gradle
run: cd performance-tests; ./gradlew spotlessCheck

all-pr-checks-pass:
runs-on: ubuntu-latest
needs: [static-code-checks, lint, spotless, build, build-lambda]
if: always()
steps:
- name: Checkout to get workflow file
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 #5.0.0

- name: Check all jobs succeeded and none missing
run: |
# Check if all needed jobs succeeded
results='${{ toJSON(needs) }}'
if echo "$results" | jq -r '.[] | .result' | grep -v success; then
echo "Some jobs failed"
exit 1
fi

# Extract all job names from workflow (excluding this gate job)
all_jobs=$(yq eval '.jobs | keys | .[]' .github/workflows/pr-build.yml | grep -v "all-pr-checks-pass" | sort)

# Extract job names from needs array
needed_jobs='${{ toJSON(needs) }}'
needs_list=$(echo "$needed_jobs" | jq -r 'keys[]' | sort)

# Check if any jobs are missing from needs
missing_jobs=$(comm -23 <(echo "$all_jobs") <(echo "$needs_list"))
if [ -n "$missing_jobs" ]; then
echo "ERROR: Jobs missing from needs array in all-pr-checks-pass:"
echo "$missing_jobs"
echo "Please add these jobs to the needs array of all-pr-checks-pass"
exit 1
fi

echo "All checks passed and no jobs missing from gate!"
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Changelog

All notable changes to this project will be documented in this file.

> **Note:** This CHANGELOG was created starting from version 0.12.0. Earlier changes are not documented here.

For any change that affects end users of this package, please add an entry under the **Unreleased** section. Briefly summarize the change and provide the link to the PR. Example:
- add GenAI attribute support for Amazon Bedrock models
([#300](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/300))

If your change does not need a CHANGELOG entry, add the "skip changelog" label to your PR.

## Unreleased
- [PATCH] Only decode JSON input buffer in Anthropic Claude streaming
([#497](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/497))
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,89 @@ def patched_extract_tool_calls(
tool_calls.append(tool_call)
return tool_calls

# TODO: The following code is to patch a bedrock bug that was fixed in
# opentelemetry-instrumentation-botocore==0.60b0 in:
# https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3875
# Remove this code once we've bumped opentelemetry-instrumentation-botocore dependency to 0.60b0
def patched_process_anthropic_claude_chunk(self, chunk):
# pylint: disable=too-many-return-statements,too-many-branches
if not (message_type := chunk.get("type")):
return

if message_type == "message_start":
# {'type': 'message_start', 'message': {'id': 'id', 'type': 'message', 'role': 'assistant',
# 'model': 'claude-2.0', 'content': [], 'stop_reason': None, 'stop_sequence': None,
# 'usage': {'input_tokens': 18, 'output_tokens': 1}}}
if chunk.get("message", {}).get("role") == "assistant":
self._record_message = True
message = chunk["message"]
self._message = {
"role": message["role"],
"content": message.get("content", []),
}
return

if message_type == "content_block_start":
# {'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}}
# {'type': 'content_block_start', 'index': 1, 'content_block':
# {'type': 'tool_use', 'id': 'id', 'name': 'func_name', 'input': {}}}
if self._record_message:
block = chunk.get("content_block", {})
if block.get("type") == "text":
self._content_block = block
elif block.get("type") == "tool_use":
self._content_block = block
return

if message_type == "content_block_delta":
# {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Here'}}
# {'type': 'content_block_delta', 'index': 1, 'delta': {'type': 'input_json_delta', 'partial_json': ''}}
if self._record_message:
delta = chunk.get("delta", {})
if delta.get("type") == "text_delta":
self._content_block["text"] += delta.get("text", "")
elif delta.get("type") == "input_json_delta":
self._tool_json_input_buf += delta.get("partial_json", "")
return

if message_type == "content_block_stop":
# {'type': 'content_block_stop', 'index': 0}
if self._tool_json_input_buf:
try:
self._content_block["input"] = json.loads(self._tool_json_input_buf)
except json.JSONDecodeError:
self._content_block["input"] = self._tool_json_input_buf
self._message["content"].append(self._content_block)
self._content_block = {}
self._tool_json_input_buf = ""
return

if message_type == "message_delta":
# {'type': 'message_delta', 'delta': {'stop_reason': 'end_turn', 'stop_sequence': None},
# 'usage': {'output_tokens': 123}}
if (stop_reason := chunk.get("delta", {}).get("stop_reason")) is not None:
self._response["stopReason"] = stop_reason
return

if message_type == "message_stop":
# {'type': 'message_stop', 'amazon-bedrock-invocationMetrics':
# {'inputTokenCount': 18, 'outputTokenCount': 123, 'invocationLatency': 5250, 'firstByteLatency': 290}}
if invocation_metrics := chunk.get("amazon-bedrock-invocationMetrics"):
self._process_invocation_metrics(invocation_metrics)

if self._record_message:
self._response["output"] = {"message": self._message}
self._record_message = False
self._message = None

self._stream_done_callback(self._response)
return

bedrock_utils.ConverseStreamWrapper.__init__ = patched_init
bedrock_utils.ConverseStreamWrapper._process_event = patched_process_event
bedrock_utils.InvokeModelWithResponseStreamWrapper._process_anthropic_claude_chunk = (
patched_process_anthropic_claude_chunk
)
bedrock_utils.extract_tool_calls = patched_extract_tool_calls

# END The OpenTelemetry Authors code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ def _run_patch_behaviour_tests(self):
self._test_unpatched_botocore_propagator()
self._test_unpatched_gevent_instrumentation()
self._test_unpatched_starlette_instrumentation()
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
# Bedrock Runtime tests
self._test_unpatched_converse_stream_wrapper()
self._test_unpatched_extract_tool_calls()

# Apply patches
apply_instrumentation_patches()
Expand Down Expand Up @@ -178,6 +174,16 @@ def _test_unpatched_botocore_instrumentation(self):
# DynamoDB
self.assertTrue("dynamodb" in _KNOWN_EXTENSIONS, "Upstream has removed a DynamoDB extension")

# Bedrock Runtime tests
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
self._test_unpatched_converse_stream_wrapper()
self._test_unpatched_extract_tool_calls()

# TODO: remove these tests once we bump botocore instrumentation version to 0.60b0
self._test_unpatched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"})
self._test_unpatched_process_anthropic_claude_chunk(None, None)
self._test_unpatched_process_anthropic_claude_chunk({}, {})

def _test_unpatched_gevent_instrumentation(self):
self.assertFalse(gevent.monkey.is_module_patched("os"), "gevent os module has been patched")
self.assertFalse(gevent.monkey.is_module_patched("thread"), "gevent thread module has been patched")
Expand Down Expand Up @@ -223,10 +229,14 @@ def _test_patched_botocore_instrumentation(self):
# Bedrock Agent Operation
self._test_patched_bedrock_agent_instrumentation()

# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
# Bedrock Runtime
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
self._test_patched_converse_stream_wrapper()
self._test_patched_extract_tool_calls()
# TODO: remove these tests once we bump botocore instrumentation version to 0.60b0
self._test_patched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"})
self._test_patched_process_anthropic_claude_chunk(None, None)
self._test_patched_process_anthropic_claude_chunk({}, {})

# Bedrock Agent Runtime
self.assertTrue("bedrock-agent-runtime" in _KNOWN_EXTENSIONS)
Expand Down Expand Up @@ -600,6 +610,95 @@ def _test_patched_extract_tool_calls(self):
result = bedrock_utils.extract_tool_calls(message_with_string_content, True)
self.assertIsNone(result)

# Test with toolUse format to exercise the for loop
message_with_tool_use = {"role": "assistant", "content": [{"toolUse": {"toolUseId": "id1", "name": "func1"}}]}
result = bedrock_utils.extract_tool_calls(message_with_tool_use, True)
self.assertEqual(len(result), 1)

# Test with tool_use format to exercise the for loop
message_with_type_tool_use = {
"role": "assistant",
"content": [{"type": "tool_use", "id": "id2", "name": "func2"}],
}
result = bedrock_utils.extract_tool_calls(message_with_type_tool_use, True)
self.assertEqual(len(result), 1)

def _test_patched_process_anthropic_claude_chunk(
self, input_value: Dict[str, str], expected_output: Dict[str, str]
):
self._test_process_anthropic_claude_chunk(input_value, expected_output, False)

def _test_unpatched_process_anthropic_claude_chunk(
self, input_value: Dict[str, str], expected_output: Dict[str, str]
):
self._test_process_anthropic_claude_chunk(input_value, expected_output, True)

def _test_process_anthropic_claude_chunk(
self, input_value: Dict[str, str], expected_output: Dict[str, str], expect_exception: bool
):
"""Test that _process_anthropic_claude_chunk handles various tool_use input formats."""
wrapper = bedrock_utils.InvokeModelWithResponseStreamWrapper(
stream=MagicMock(),
stream_done_callback=MagicMock,
stream_error_callback=MagicMock,
model_id="anthropic.claude-3-5-sonnet-20240620-v1:0",
)

# Simulate message_start
wrapper._process_anthropic_claude_chunk(
{
"type": "message_start",
"message": {
"role": "assistant",
"content": [],
},
}
)

# Simulate content_block_start with specified input
content_block = {
"type": "tool_use",
"id": "test_id",
"name": "test_tool",
}
if input_value is not None:
content_block["input"] = input_value

wrapper._process_anthropic_claude_chunk(
{
"type": "content_block_start",
"index": 0,
"content_block": content_block,
}
)

# Simulate content_block_stop
try:
wrapper._process_anthropic_claude_chunk({"type": "content_block_stop", "index": 0})
except TypeError:
if expect_exception:
return
else:
raise

# Verify the message content
self.assertEqual(len(wrapper._message["content"]), 1)
tool_block = wrapper._message["content"][0]
self.assertEqual(tool_block["type"], "tool_use")
self.assertEqual(tool_block["id"], "test_id")
self.assertEqual(tool_block["name"], "test_tool")

if expected_output is not None:
self.assertEqual(tool_block["input"], expected_output)
self.assertIsInstance(tool_block["input"], dict)
else:
self.assertNotIn("input", tool_block)

# Just adding this to do basic sanity checks and increase code coverage
wrapper._process_anthropic_claude_chunk({"type": "content_block_delta", "index": 0})
wrapper._process_anthropic_claude_chunk({"type": "message_delta"})
wrapper._process_anthropic_claude_chunk({"type": "message_stop"})

def _test_patched_bedrock_agent_instrumentation(self):
"""For bedrock-agent service, both extract_attributes and on_success provides attributes,
the attributes depend on the API being invoked."""
Expand Down