Skip to content

Commit 525f0ef

Browse files
committed
feat: Add auto-instrumentation support for Kinesis Stream Consumer
1 parent a8bd238 commit 525f0ef

File tree

5 files changed

+38
-1
lines changed

5 files changed

+38
-1
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
AWS_SQS_QUEUE_URL: str = "aws.sqs.queue.url"
2020
AWS_SQS_QUEUE_NAME: str = "aws.sqs.queue.name"
2121
AWS_KINESIS_STREAM_NAME: str = "aws.kinesis.stream.name"
22+
AWS_KINESIS_STREAM_CONSUMERNAME: str = "aws.kinesis.stream.consumer_name"
2223
AWS_BEDROCK_DATA_SOURCE_ID: str = "aws.bedrock.data_source.id"
2324
AWS_BEDROCK_KNOWLEDGE_BASE_ID: str = "aws.bedrock.knowledge_base.id"
2425
AWS_BEDROCK_AGENT_ID: str = "aws.bedrock.agent.id"

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
AWS_BEDROCK_GUARDRAIL_ID,
1212
AWS_BEDROCK_KNOWLEDGE_BASE_ID,
1313
AWS_CLOUDFORMATION_PRIMARY_IDENTIFIER,
14+
AWS_KINESIS_STREAM_CONSUMERNAME,
1415
AWS_KINESIS_STREAM_NAME,
1516
AWS_LOCAL_OPERATION,
1617
AWS_LOCAL_SERVICE,
@@ -393,6 +394,9 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
393394
elif is_key_present(span, AWS_KINESIS_STREAM_NAME):
394395
remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream"
395396
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_KINESIS_STREAM_NAME))
397+
elif is_key_present(span, AWS_KINESIS_STREAM_CONSUMERNAME):
398+
remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::StreamConsumer"
399+
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_KINESIS_STREAM_CONSUMERNAME))
396400
elif is_key_present(span, _AWS_BUCKET_NAME):
397401
remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket"
398402
remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME))

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import importlib
55

66
from amazon.opentelemetry.distro._aws_attribute_keys import (
7+
AWS_KINESIS_STREAM_CONSUMERNAME,
78
AWS_KINESIS_STREAM_NAME,
89
AWS_SECRETSMANAGER_SECRET_ARN,
910
AWS_SNS_TOPIC_ARN,
@@ -204,3 +205,6 @@ def extract_attributes(self, attributes: _AttributeMapT):
204205
stream_name = self._call_context.params.get("StreamName")
205206
if stream_name:
206207
attributes[AWS_KINESIS_STREAM_NAME] = stream_name
208+
consumer_name = self._call_context.params.get("ConsumerName")
209+
if consumer_name:
210+
attributes[AWS_KINESIS_STREAM_CONSUMERNAME] = consumer_name

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_metric_attribute_generator.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
AWS_BEDROCK_GUARDRAIL_ID,
1414
AWS_BEDROCK_KNOWLEDGE_BASE_ID,
1515
AWS_CONSUMER_PARENT_SPAN_KIND,
16+
AWS_KINESIS_STREAM_CONSUMERNAME,
1617
AWS_KINESIS_STREAM_NAME,
1718
AWS_LOCAL_OPERATION,
1819
AWS_LOCAL_SERVICE,
@@ -1142,6 +1143,30 @@ def test_sdk_client_span_with_remote_resource_attributes(self):
11421143
)
11431144
self._mock_attribute([AWS_STEPFUNCTIONS_ACTIVITY_ARN], [None])
11441145

1146+
# Validate behaviour of AWS_KINESIS_STREAM_CONSUMERNAME present, then remove it.
1147+
self._mock_attribute(
1148+
[AWS_KINESIS_STREAM_CONSUMERNAME],
1149+
["aws_stream_consumername"],
1150+
keys,
1151+
values,
1152+
)
1153+
self._validate_remote_resource_attributes(
1154+
"AWS::Kinesis::StreamConsumer",
1155+
"aws_stream_consumername"
1156+
)
1157+
self._mock_attribute([AWS_KINESIS_STREAM_CONSUMERNAME], [None])
1158+
1159+
# Validate behaviour with both AWS_KINESIS_STREAM_NAME and AWS_KINESIS_STREAM_CONSUMERNAME
1160+
# present, then remove it.
1161+
self._mock_attribute(
1162+
[AWS_KINESIS_STREAM_NAME, AWS_KINESIS_STREAM_CONSUMERNAME],
1163+
["aws_stream_name", "aws_stream_consumername"],
1164+
keys,
1165+
values,
1166+
)
1167+
self._validate_remote_resource_attributes("AWS::Kinesis::Stream", "aws_stream_name")
1168+
self._mock_attribute([AWS_KINESIS_STREAM_NAME, AWS_KINESIS_STREAM_CONSUMERNAME], [None, None])
1169+
11451170
self._mock_attribute([SpanAttributes.RPC_SYSTEM], [None])
11461171

11471172
def test_client_db_span_with_remote_resource_attributes(self):

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from opentelemetry.trace.span import Span
1818

1919
_STREAM_NAME: str = "streamName"
20+
_CONSUMER_NAME: str = "consumerName"
2021
_BUCKET_NAME: str = "bucketName"
2122
_QUEUE_NAME: str = "queueName"
2223
_QUEUE_URL: str = "queueUrl"
@@ -175,6 +176,8 @@ def _test_patched_botocore_instrumentation(self):
175176
kinesis_attributes: Dict[str, str] = _do_extract_kinesis_attributes()
176177
self.assertTrue("aws.kinesis.stream.name" in kinesis_attributes)
177178
self.assertEqual(kinesis_attributes["aws.kinesis.stream.name"], _STREAM_NAME)
179+
self.assertTrue("aws.kinesis.stream.consumer_name" in kinesis_attributes)
180+
self.assertEqual(kinesis_attributes["aws.kinesis.stream.consumer_name"], _CONSUMER_NAME)
178181

179182
# S3
180183
self.assertTrue("s3" in _KNOWN_EXTENSIONS)
@@ -356,7 +359,7 @@ def _reset_mocks(self):
356359

357360
def _do_extract_kinesis_attributes() -> Dict[str, str]:
358361
service_name: str = "kinesis"
359-
params: Dict[str, str] = {"StreamName": _STREAM_NAME}
362+
params: Dict[str, str] = {"StreamName": _STREAM_NAME, "ConsumerName": _CONSUMER_NAME}
360363
return _do_extract_attributes(service_name, params)
361364

362365

0 commit comments

Comments
 (0)