Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

package io.opentelemetry.instrumentation.awssdk.v2_2.internal;

import static io.opentelemetry.api.common.AttributeKey.booleanKey;
import static io.opentelemetry.api.common.AttributeKey.doubleKey;
import static io.opentelemetry.api.common.AttributeKey.longKey;
import static io.opentelemetry.api.common.AttributeKey.stringArrayKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.AwsSdkRequestType.BEDROCK_RUNTIME;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.AwsSdkRequestType.DYNAMODB;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.AwsSdkRequestType.KINESIS;
Expand All @@ -13,7 +18,7 @@
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.AwsSdkRequestType.SECRETSMANAGER;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.AwsSdkRequestType.SNS;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.AwsSdkRequestType.SQS;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.AwsSdkRequestType.STEPFUNCTIONS;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.AwsSdkRequestType.STEP_FUNCTIONS;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.FieldMapping.request;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.FieldMapping.response;

Expand All @@ -40,97 +45,118 @@ enum AwsSdkRequest {
KinesisRequest(KINESIS, "KinesisRequest"),
LambdaRequest(LAMBDA, "LambdaRequest"),
SecretsManagerRequest(SECRETSMANAGER, "SecretsManagerRequest"),
StepFunctionsRequest(STEPFUNCTIONS, "SfnRequest"),
StepFunctionsRequest(STEP_FUNCTIONS, "SfnRequest"),
// specific requests
BatchGetItem(
DYNAMODB,
"dynamodb.model.BatchGetItemRequest",
request("aws.dynamodb.table_names", "RequestItems"),
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity")),
request(stringArrayKey("aws.dynamodb.table_names"), "RequestItems"),
response(stringArrayKey("aws.dynamodb.consumed_capacity"), "ConsumedCapacity")),
BatchWriteItem(
DYNAMODB,
"dynamodb.model.BatchWriteItemRequest",
request("aws.dynamodb.table_names", "RequestItems"),
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "ItemCollectionMetrics")),
request(stringArrayKey("aws.dynamodb.table_names"), "RequestItems"),
response(stringArrayKey("aws.dynamodb.consumed_capacity"), "ConsumedCapacity"),
response(stringKey("aws.dynamodb.item_collection_metrics"), "ItemCollectionMetrics")),
CreateTable(
DYNAMODB,
"dynamodb.model.CreateTableRequest",
request("aws.dynamodb.global_secondary_indexes", "GlobalSecondaryIndexes"),
request("aws.dynamodb.local_secondary_indexes", "LocalSecondaryIndexes"),
request(stringArrayKey("aws.dynamodb.table_names"), "TableName"),
request(stringArrayKey("aws.dynamodb.global_secondary_indexes"), "GlobalSecondaryIndexes"),
request(stringArrayKey("aws.dynamodb.local_secondary_indexes"), "LocalSecondaryIndexes"),
request(
"aws.dynamodb.provisioned_throughput.read_capacity_units",
doubleKey("aws.dynamodb.provisioned_read_capacity"),
"ProvisionedThroughput.ReadCapacityUnits"),
request(
"aws.dynamodb.provisioned_throughput.write_capacity_units",
"ProvisionedThroughput.WriteCapacityUnits")),
doubleKey("aws.dynamodb.provisioned_write_capacity"),
"ProvisionedThroughput.WriteCapacityUnits"),
response(stringArrayKey("aws.dynamodb.consumed_capacity"), "ConsumedCapacity"),
response(stringKey("aws.dynamodb.item_collection_metrics"), "ItemCollectionMetrics")),
DeleteItem(
DYNAMODB,
"dynamodb.model.DeleteItemRequest",
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "ItemCollectionMetrics")),
request(stringArrayKey("aws.dynamodb.table_names"), "TableName"),
response(stringArrayKey("aws.dynamodb.consumed_capacity"), "ConsumedCapacity"),
response(stringKey("aws.dynamodb.item_collection_metrics"), "ItemCollectionMetrics")),
DeleteTable(
DYNAMODB,
"dynamodb.model.DeleteTableRequest",
request(stringArrayKey("aws.dynamodb.table_names"), "TableName")),
DescribeTable(
DYNAMODB,
"dynamodb.model.DescribeTableRequest",
request(stringArrayKey("aws.dynamodb.table_names"), "TableName")),
GetItem(
DYNAMODB,
"dynamodb.model.GetItemRequest",
request("aws.dynamodb.projection_expression", "ProjectionExpression"),
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
request("aws.dynamodb.consistent_read", "ConsistentRead")),
request(stringArrayKey("aws.dynamodb.table_names"), "TableName"),
request(stringKey("aws.dynamodb.projection"), "ProjectionExpression"),
request(booleanKey("aws.dynamodb.consistent_read"), "ConsistentRead"),
response(stringArrayKey("aws.dynamodb.consumed_capacity"), "ConsumedCapacity")),
ListTables(
DYNAMODB,
"dynamodb.model.ListTablesRequest",
request("aws.dynamodb.exclusive_start_table_name", "ExclusiveStartTableName"),
response("aws.dynamodb.table_count", "TableNames"),
request("aws.dynamodb.limit", "Limit")),
request(stringKey("aws.dynamodb.exclusive_start_table"), "ExclusiveStartTableName"),
response(longKey("aws.dynamodb.table_count"), "TableNames"),
request(longKey("aws.dynamodb.limit"), "Limit")),
PutItem(
DYNAMODB,
"dynamodb.model.PutItemRequest",
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "ItemCollectionMetrics")),
request(stringArrayKey("aws.dynamodb.table_names"), "TableName"),
response(stringArrayKey("aws.dynamodb.consumed_capacity"), "ConsumedCapacity"),
response(stringKey("aws.dynamodb.item_collection_metrics"), "ItemCollectionMetrics")),
Query(
DYNAMODB,
"dynamodb.model.QueryRequest",
request("aws.dynamodb.attributes_to_get", "AttributesToGet"),
request("aws.dynamodb.consistent_read", "ConsistentRead"),
request("aws.dynamodb.index_name", "IndexName"),
request("aws.dynamodb.limit", "Limit"),
request("aws.dynamodb.projection_expression", "ProjectionExpression"),
request("aws.dynamodb.scan_index_forward", "ScanIndexForward"),
request("aws.dynamodb.select", "Select"),
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity")),
request(stringArrayKey("aws.dynamodb.table_names"), "TableName"),
request(stringArrayKey("aws.dynamodb.attributes_to_get"), "AttributesToGet"),
request(booleanKey("aws.dynamodb.consistent_read"), "ConsistentRead"),
request(stringKey("aws.dynamodb.index_name"), "IndexName"),
request(longKey("aws.dynamodb.limit"), "Limit"),
request(stringKey("aws.dynamodb.projection"), "ProjectionExpression"),
request(booleanKey("aws.dynamodb.scan_forward"), "ScanIndexForward"),
request(stringKey("aws.dynamodb.select"), "Select"),
response(stringArrayKey("aws.dynamodb.consumed_capacity"), "ConsumedCapacity")),
Scan(
DYNAMODB,
"dynamodb.model.ScanRequest",
request("aws.dynamodb.attributes_to_get", "AttributesToGet"),
request("aws.dynamodb.consistent_read", "ConsistentRead"),
request("aws.dynamodb.index_name", "IndexName"),
request("aws.dynamodb.limit", "Limit"),
request("aws.dynamodb.projection_expression", "ProjectionExpression"),
request("aws.dynamodb.segment", "Segment"),
request("aws.dynamodb.select", "Select"),
request("aws.dynamodb.total_segments", "TotalSegments"),
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
response("aws.dynamodb.count", "Count"),
response("aws.dynamodb.scanned_count", "ScannedCount")),
request(stringArrayKey("aws.dynamodb.table_names"), "TableName"),
request(stringArrayKey("aws.dynamodb.attributes_to_get"), "AttributesToGet"),
request(booleanKey("aws.dynamodb.consistent_read"), "ConsistentRead"),
request(stringKey("aws.dynamodb.index_name"), "IndexName"),
request(longKey("aws.dynamodb.limit"), "Limit"),
request(stringKey("aws.dynamodb.projection"), "ProjectionExpression"),
request(longKey("aws.dynamodb.segment"), "Segment"),
request(stringKey("aws.dynamodb.select"), "Select"),
request(longKey("aws.dynamodb.total_segments"), "TotalSegments"),
response(stringArrayKey("aws.dynamodb.consumed_capacity"), "ConsumedCapacity"),
response(longKey("aws.dynamodb.count"), "Count"),
response(longKey("aws.dynamodb.scanned_count"), "ScannedCount")),
UpdateItem(
DYNAMODB,
"dynamodb.model.UpdateItemRequest",
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "ItemCollectionMetrics")),
request(stringArrayKey("aws.dynamodb.table_names"), "TableName"),
response(stringArrayKey("aws.dynamodb.consumed_capacity"), "ConsumedCapacity"),
response(stringKey("aws.dynamodb.item_collection_metrics"), "ItemCollectionMetrics")),
UpdateTable(
DYNAMODB,
"dynamodb.model.UpdateTableRequest",
request("aws.dynamodb.attribute_definitions", "AttributeDefinitions"),
request("aws.dynamodb.global_secondary_index_updates", "GlobalSecondaryIndexUpdates"),
request(stringArrayKey("aws.dynamodb.table_names"), "TableName"),
request(stringArrayKey("aws.dynamodb.attribute_definitions"), "AttributeDefinitions"),
request(
stringArrayKey("aws.dynamodb.global_secondary_index_updates"),
"GlobalSecondaryIndexUpdates"),
request(
"aws.dynamodb.provisioned_throughput.read_capacity_units",
doubleKey("aws.dynamodb.provisioned_read_capacity"),
"ProvisionedThroughput.ReadCapacityUnits"),
request(
"aws.dynamodb.provisioned_throughput.write_capacity_units",
"ProvisionedThroughput.WriteCapacityUnits")),
doubleKey("aws.dynamodb.provisioned_write_capacity"),
"ProvisionedThroughput.WriteCapacityUnits"),
response(stringArrayKey("aws.dynamodb.consumed_capacity"), "ConsumedCapacity")),
ConverseRequest(
BEDROCK_RUNTIME,
"bedrockruntime.model.ConverseRequest",
request("gen_ai.request.model", "modelId"));
request(stringKey("gen_ai.request.model"), "modelId"));

private final AwsSdkRequestType type;
private final String requestClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,40 @@
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.AwsExperimentalAttributes.AWS_LAMBDA_ARN;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.AwsExperimentalAttributes.AWS_LAMBDA_NAME;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.FieldMapping.request;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.FieldMapping.requestExperimental;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.FieldMapping.response;
import static io.opentelemetry.instrumentation.awssdk.v2_2.internal.FieldMapping.responseExperimental;

import io.opentelemetry.api.common.AttributeKey;
import java.util.Collections;
import java.util.List;
import java.util.Map;

enum AwsSdkRequestType {
S3(request("aws.bucket.name", "Bucket")),
SQS(request("aws.queue.url", "QueueUrl"), request("aws.queue.name", "QueueName")),
KINESIS(request("aws.stream.name", "StreamName")),
DYNAMODB(request("aws.table.name", "TableName")),
S3(request(AttributeKeys.AWS_S3_BUCKET, "Bucket")),
SQS(
request(AttributeKeys.AWS_SQS_QUEUE_URL, "QueueUrl"),
requestExperimental(AttributeKey.stringKey("aws.queue.name"), "QueueName")),
KINESIS(request(AttributeKeys.AWS_KINESIS_STREAM_NAME, "StreamName")),
DYNAMODB(),
BEDROCK_RUNTIME(),
LAMBDA(
request(AWS_LAMBDA_NAME.getKey(), "FunctionName"),
request(AttributeKeys.AWS_LAMBDA_RESOURCE_MAPPING_ID.getKey(), "UUID"),
response(AWS_LAMBDA_ARN.getKey(), "Configuration.FunctionArn"),
response(AttributeKeys.AWS_LAMBDA_RESOURCE_MAPPING_ID.getKey(), "UUID")),
SECRETSMANAGER(response(AttributeKeys.AWS_SECRETSMANAGER_SECRET_ARN.getKey(), "ARN")),
requestExperimental(AWS_LAMBDA_NAME, "FunctionName"),
request(AttributeKeys.AWS_LAMBDA_RESOURCE_MAPPING_ID, "UUID"),
responseExperimental(AWS_LAMBDA_ARN, "Configuration.FunctionArn"),
response(AttributeKeys.AWS_LAMBDA_RESOURCE_MAPPING_ID, "UUID")),
SECRETSMANAGER(response(AttributeKeys.AWS_SECRETSMANAGER_SECRET_ARN, "ARN")),
SNS(
/*
* Only one of TopicArn and TargetArn are permitted on an SNS request.
*/
request(AttributeKeys.MESSAGING_DESTINATION_NAME.getKey(), "TargetArn"),
request(AttributeKeys.MESSAGING_DESTINATION_NAME.getKey(), "TopicArn"),
request(AttributeKeys.AWS_SNS_TOPIC_ARN.getKey(), "TopicArn"),
response(AttributeKeys.AWS_SNS_TOPIC_ARN.getKey(), "TopicArn")),
STEPFUNCTIONS(
request(AttributeKeys.AWS_STEP_FUNCTIONS_STATE_MACHINE_ARN.getKey(), "stateMachineArn"),
request(AttributeKeys.AWS_STEP_FUNCTIONS_ACTIVITY_ARN.getKey(), "activityArn"));
request(AttributeKeys.MESSAGING_DESTINATION_NAME, "TargetArn"),
request(AttributeKeys.MESSAGING_DESTINATION_NAME, "TopicArn"),
request(AttributeKeys.AWS_SNS_TOPIC_ARN, "TopicArn"),
response(AttributeKeys.AWS_SNS_TOPIC_ARN, "TopicArn")),
STEP_FUNCTIONS(
request(AttributeKeys.AWS_STEP_FUNCTIONS_STATE_MACHINE_ARN, "stateMachineArn"),
request(AttributeKeys.AWS_STEP_FUNCTIONS_ACTIVITY_ARN, "activityArn"));

// Wrapping in unmodifiableMap
@SuppressWarnings("ImmutableEnumChecker")
Expand All @@ -63,6 +67,10 @@ private static class AttributeKeys {
stringKey("aws.step_functions.activity.arn");
static final AttributeKey<String> AWS_STEP_FUNCTIONS_STATE_MACHINE_ARN =
stringKey("aws.step_functions.state_machine.arn");
static final AttributeKey<String> AWS_S3_BUCKET = stringKey("aws.s3.bucket");
static final AttributeKey<String> AWS_SQS_QUEUE_URL = stringKey("aws.sqs.queue.url");
static final AttributeKey<String> AWS_KINESIS_STREAM_NAME =
stringKey("aws.kinesis.stream_name");

// copied from MessagingIncubatingAttributes
static final AttributeKey<String> MESSAGING_DESTINATION_NAME =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
package io.opentelemetry.instrumentation.awssdk.v2_2.internal;

import io.opentelemetry.api.trace.Span;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkRequest;
Expand All @@ -17,15 +20,24 @@ class FieldMapper {

private final Serializer serializer;
private final MethodHandleFactory methodHandleFactory;
private final boolean captureExperimentalSpanAttributes;

FieldMapper() {
serializer = new Serializer();
methodHandleFactory = new MethodHandleFactory();
FieldMapper(boolean captureExperimentalSpanAttributes) {
this(new Serializer(), new MethodHandleFactory(), captureExperimentalSpanAttributes);
}

// visible for testing
FieldMapper(Serializer serializer, MethodHandleFactory methodHandleFactory) {
this(serializer, methodHandleFactory, true);
}

private FieldMapper(
Serializer serializer,
MethodHandleFactory methodHandleFactory,
boolean captureExperimentalSpanAttributes) {
this.methodHandleFactory = methodHandleFactory;
this.serializer = serializer;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
}

void mapToAttributes(SdkRequest sdkRequest, AwsSdkRequest request, Span span) {
Expand Down Expand Up @@ -59,16 +71,63 @@ private void mapToAttributes(

private void mapToAttributes(
Function<String, Object> fieldValueProvider, FieldMapping fieldMapping, Span span) {
if (!captureExperimentalSpanAttributes && fieldMapping.isExperimental()) {
return;
}

// traverse path
List<String> path = fieldMapping.getFields();
Object target = fieldValueProvider.apply(path.get(0));
for (int i = 1; i < path.size() && target != null; i++) {
target = next(target, path.get(i));
}
if (target != null) {
String value = serializer.serialize(target);
if (!StringUtils.isEmpty(value)) {
span.setAttribute(fieldMapping.getAttribute(), value);
switch (fieldMapping.getAttributeType()) {
case STRING:
String stringValue = serializer.serialize(target);
if (!StringUtils.isEmpty(stringValue)) {
span.setAttribute(fieldMapping.getAttributeKey(), stringValue);
}
break;
case DOUBLE:
if (target instanceof Number) {
span.setAttribute(fieldMapping.getAttributeKey(), ((Number) target).doubleValue());
}
break;
case LONG:
if (target instanceof Number) {
span.setAttribute(fieldMapping.getAttributeKey(), ((Number) target).longValue());
} else if (target instanceof Collection) {
// map to collection size
span.setAttribute(fieldMapping.getAttributeKey(), ((Collection<?>) target).size());
}
break;
case BOOLEAN:
if (target instanceof Boolean) {
span.setAttribute(fieldMapping.getAttributeKey(), (Boolean) target);
}
break;
case STRING_ARRAY:
if (target instanceof Map) {
target = ((Map<?, ?>) target).keySet();
}
if (target instanceof Collection) {
List<String> value = serializer.serializeCollection((Collection<?>) target);
if (!value.isEmpty()) {
span.setAttribute(fieldMapping.getAttributeKey(), value);
}
} else {
String value = serializer.serialize(target);
if (!StringUtils.isEmpty(value)) {
span.setAttribute(fieldMapping.getAttributeKey(), Collections.singletonList(value));
}
}
break;
default:
// shouldn't reach here because FieldMapping constructor already rejects other attribute
// types
throw new IllegalStateException(
"Unsupported attribute type: " + fieldMapping.getAttributeType());
}
}
}
Expand Down
Loading
Loading