Skip to content

Commit 7671f4e

Browse files
authored
feat: All AWS services emit traces (#1150)
* feat: All V3 services emit traces * refactor: Clean up Handler and MessageHelper * Revise based on feedback * Fix CI * Correctly handle V2 service clients * Add comments * Fix loaded_service check * Fix loaded_service check to be compatible with V3
1 parent a174d03 commit 7671f4e

File tree

5 files changed

+116
-189
lines changed

5 files changed

+116
-189
lines changed

instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/handler.rb

Lines changed: 36 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -9,36 +9,32 @@ module Instrumentation
99
module AwsSdk
1010
# Generates Spans for all interactions with AwsSdk
1111
class Handler < Seahorse::Client::Handler
12-
SQS_SEND_MESSAGE = 'SQS.SendMessage'
13-
SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch'
14-
SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage'
15-
SNS_PUBLISH = 'SNS.Publish'
16-
1712
def call(context)
1813
return super unless context
1914

20-
service_name = service_name(context)
15+
service_id = service_name(context)
2116
operation = context.operation&.name
22-
client_method = "#{service_name}.#{operation}"
23-
attributes = {
24-
'aws.region' => context.config.region,
25-
OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api',
26-
OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation,
27-
OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_name
28-
}
29-
attributes[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_name == 'DynamoDB'
30-
MessagingHelper.apply_sqs_attributes(attributes, context, client_method) if service_name == 'SQS'
31-
MessagingHelper.apply_sns_attributes(attributes, context, client_method) if service_name == 'SNS'
17+
client_method = "#{service_id}.#{operation}"
18+
19+
tracer.in_span(
20+
span_name(context, client_method, service_id),
21+
attributes: attributes(context, client_method, service_id, operation),
22+
kind: span_kind(client_method, service_id)
23+
) do |span|
24+
if instrumentation_config[:inject_messaging_context] &&
25+
%w[SQS SNS].include?(service_id)
26+
MessagingHelper.inject_context(context, client_method)
27+
end
3228

33-
tracer.in_span(span_name(context, client_method), attributes: attributes, kind: span_kind(client_method)) do |span|
34-
inject_context(context, client_method)
3529
if instrumentation_config[:suppress_internal_instrumentation]
3630
OpenTelemetry::Common::Utilities.untraced { super }
3731
else
3832
super
3933
end.tap do |response|
40-
span.set_attribute(OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE,
41-
context.http_response.status_code)
34+
span.set_attribute(
35+
OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE,
36+
context.http_response.status_code
37+
)
4238

4339
if (err = response.error)
4440
span.record_exception(err)
@@ -65,48 +61,40 @@ def service_name(context)
6561
context.client.class.api.metadata['serviceId'] || context.client.class.to_s.split('::')[1]
6662
end
6763

68-
SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze
69-
def inject_context(context, client_method)
70-
return unless SEND_MESSAGE_CLIENT_METHODS.include? client_method
71-
return unless instrumentation_config[:inject_messaging_context]
72-
73-
if client_method == SQS_SEND_MESSAGE_BATCH
74-
context.params[:entries].each do |entry|
75-
entry[:message_attributes] ||= {}
76-
OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter)
77-
end
64+
def span_kind(client_method, service_id)
65+
case service_id
66+
when 'SQS', 'SNS'
67+
MessagingHelper.span_kind(client_method)
7868
else
79-
context.params[:message_attributes] ||= {}
80-
OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter)
69+
OpenTelemetry::Trace::SpanKind::CLIENT
8170
end
8271
end
8372

84-
def span_kind(client_method)
85-
case client_method
86-
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
87-
OpenTelemetry::Trace::SpanKind::PRODUCER
88-
when SQS_RECEIVE_MESSAGE
89-
OpenTelemetry::Trace::SpanKind::CONSUMER
73+
def span_name(context, client_method, service_id)
74+
case service_id
75+
when 'SQS', 'SNS'
76+
MessagingHelper.legacy_span_name(context, client_method)
9077
else
91-
OpenTelemetry::Trace::SpanKind::CLIENT
78+
client_method
9279
end
9380
end
9481

95-
def span_name(context, client_method)
96-
case client_method
97-
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
98-
"#{MessagingHelper.queue_name(context)} publish"
99-
when SQS_RECEIVE_MESSAGE
100-
"#{MessagingHelper.queue_name(context)} receive"
101-
else
102-
client_method
82+
def attributes(context, client_method, service_id, operation)
83+
{
84+
'aws.region' => context.config.region,
85+
OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api',
86+
OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation,
87+
OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_id
88+
}.tap do |attrs|
89+
attrs[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_id == 'DynamoDB'
90+
MessagingHelper.apply_span_attributes(context, attrs, client_method, service_id) if %w[SQS SNS].include?(service_id)
10391
end
10492
end
10593
end
10694

10795
# A Seahorse::Client::Plugin that enables instrumentation for all AWS services
10896
class Plugin < Seahorse::Client::Plugin
109-
def add_handlers(handlers, config)
97+
def add_handlers(handlers, _config)
11098
# run before Seahorse::Client::Plugin::ParamValidator (priority 50)
11199
handlers.add Handler, step: :validate, priority: 49
112100
end

instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/instrumentation.rb

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base
1313

1414
install do |_config|
1515
require_dependencies
16-
add_plugin(Seahorse::Client::Base, *loaded_constants)
16+
add_plugins(Seahorse::Client::Base, *loaded_service_clients)
1717
end
1818

1919
present do
@@ -41,31 +41,39 @@ def gem_version
4141

4242
def require_dependencies
4343
require_relative 'handler'
44-
require_relative 'services'
4544
require_relative 'message_attributes'
4645
require_relative 'messaging_helper'
4746
end
4847

49-
def add_plugin(*targets)
48+
def add_plugins(*targets)
5049
targets.each { |klass| klass.add_plugin(AwsSdk::Plugin) }
5150
end
5251

53-
def loaded_constants
54-
# Cross-check services against loaded AWS constants
55-
# Module#const_get can return a constant from ancestors when there's a miss.
56-
# If this conincidentally matches another constant, it will attempt to patch
57-
# the wrong constant, resulting in patch failure.
58-
available_services = ::Aws.constants & SERVICES.map(&:to_sym)
59-
available_services.each_with_object([]) do |service, constants|
60-
next if ::Aws.autoload?(service)
52+
def loaded_service_clients
53+
::Aws.constants.each_with_object([]) do |c, constants|
54+
m = ::Aws.const_get(c)
55+
next unless loaded_service?(c, m)
6156

6257
begin
63-
constants << ::Aws.const_get(service, false).const_get(:Client, false)
58+
constants << m.const_get(:Client)
6459
rescue StandardError => e
6560
OpenTelemetry.logger.warn("Constant could not be loaded: #{e}")
6661
end
6762
end
6863
end
64+
65+
# This check does the following:
66+
# 1 - Checks if the service client is autoload or not
67+
# 2 - Validates whether if is a service client
68+
# note that Seahorse::Client::Base is a superclass for V3 clients
69+
# but for V2, it is Aws::Client
70+
def loaded_service?(constant, service_module)
71+
!::Aws.autoload?(constant) &&
72+
service_module.is_a?(Module) &&
73+
service_module.const_defined?(:Client) &&
74+
(service_module.const_get(:Client).superclass == Seahorse::Client::Base ||
75+
service_module.const_get(:Client).superclass == Aws::Client)
76+
end
6977
end
7078
end
7179
end

instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/messaging_helper.rb

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,15 @@
77
module OpenTelemetry
88
module Instrumentation
99
module AwsSdk
10-
# MessagingHelper class provides methods for calculating messaging span attributes
10+
# An utility class to help SQS/SNS-related span attributes/context injection
1111
class MessagingHelper
1212
class << self
13+
SQS_SEND_MESSAGE = 'SQS.SendMessage'
14+
SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch'
15+
SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage'
16+
SNS_PUBLISH = 'SNS.Publish'
17+
SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze
18+
1319
def queue_name(context)
1420
topic_arn = context.params[:topic_arn]
1521
target_arn = context.params[:target_arn]
@@ -28,19 +34,64 @@ def queue_name(context)
2834
'unknown'
2935
end
3036

37+
def legacy_span_name(context, client_method)
38+
case client_method
39+
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
40+
"#{MessagingHelper.queue_name(context)} publish"
41+
when SQS_RECEIVE_MESSAGE
42+
"#{MessagingHelper.queue_name(context)} receive"
43+
else
44+
client_method
45+
end
46+
end
47+
48+
def apply_span_attributes(context, attrs, client_method, service_id)
49+
case service_id
50+
when 'SQS'
51+
apply_sqs_attributes(attrs, context, client_method)
52+
when 'SNS'
53+
apply_sns_attributes(attrs, context, client_method)
54+
end
55+
end
56+
57+
def span_kind(client_method)
58+
case client_method
59+
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
60+
OpenTelemetry::Trace::SpanKind::PRODUCER
61+
when SQS_RECEIVE_MESSAGE
62+
OpenTelemetry::Trace::SpanKind::CONSUMER
63+
else
64+
OpenTelemetry::Trace::SpanKind::CLIENT
65+
end
66+
end
67+
68+
def inject_context(context, client_method)
69+
return unless SEND_MESSAGE_CLIENT_METHODS.include?(client_method)
70+
71+
if client_method == SQS_SEND_MESSAGE_BATCH
72+
context.params[:entries].each do |entry|
73+
entry[:message_attributes] ||= {}
74+
OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter)
75+
end
76+
else
77+
context.params[:message_attributes] ||= {}
78+
OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter)
79+
end
80+
end
81+
82+
private
83+
3184
def apply_sqs_attributes(attributes, context, client_method)
3285
attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sqs'
3386
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'queue'
3487
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context)
3588
attributes[SemanticConventions::Trace::MESSAGING_URL] = context.params[:queue_url] if context.params[:queue_url]
36-
37-
attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == 'SQS.ReceiveMessage'
89+
attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == SQS_RECEIVE_MESSAGE
3890
end
3991

4092
def apply_sns_attributes(attributes, context, client_method)
4193
attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sns'
42-
43-
return unless client_method == 'SNS.Publish'
94+
return unless client_method == SNS_PUBLISH
4495

4596
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'topic'
4697
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context)

instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/services.rb

Lines changed: 0 additions & 121 deletions
This file was deleted.

0 commit comments

Comments
 (0)