Skip to content

Commit 4d125e6

Browse files
authored
implement specific instrumentation for AWS SQS (#1123)
* implement specific instrumentation for AWS SQS closes #1026 * added docs
1 parent 2edeed2 commit 4d125e6

File tree

4 files changed

+153
-5
lines changed

4 files changed

+153
-5
lines changed

docs/supported-technologies.asciidoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,12 @@ Additionally, some services collect more specific data
543543

544544
* Topic name
545545

546+
[float]
547+
[[automatic-instrumentation-sqs]]
548+
===== SQS
549+
550+
* Queue name
551+
546552
[float]
547553
[[automatic-instrumentation-template-engines]]
548554
=== Template Engines

elasticapm/instrumentation/packages/botocore.py

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,14 @@
3030

3131
from collections import namedtuple
3232

33+
from elasticapm.conf import constants
3334
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
3435
from elasticapm.traces import capture_span
3536
from elasticapm.utils.compat import urlparse
37+
from elasticapm.utils.logging import get_logger
38+
39+
logger = get_logger("elasticapm.instrument")
40+
3641

3742
HandlerInfo = namedtuple("HandlerInfo", ("signature", "span_type", "span_subtype", "span_action", "context"))
3843

@@ -81,7 +86,9 @@ def call(self, module, method, wrapped, instance, args, kwargs):
8186
span_subtype=handler_info.span_subtype,
8287
span_action=handler_info.span_action,
8388
extra=handler_info.context,
84-
):
89+
) as span:
90+
if service in span_modifiers:
91+
span_modifiers[service](span, args, kwargs)
8592
return wrapped(*args, **kwargs)
8693

8794

@@ -141,8 +148,46 @@ def handle_sns(operation_name, service, instance, args, kwargs, context):
141148
return HandlerInfo(signature, span_type, span_subtype, span_action, context)
142149

143150

144-
def handle_sqs(operation_name, service, instance, args, kwargs, destination):
145-
pass
151+
def handle_sqs(operation_name, service, instance, args, kwargs, context):
152+
if operation_name not in ("SendMessage", "SendMessageBatch", "ReceiveMessage"):
153+
# only "publish" is handled specifically, other endpoints get the default treatment
154+
return False
155+
span_type = "messaging"
156+
span_subtype = "sqs"
157+
span_action = "send" if operation_name in ("SendMessage", "SendMessageBatch") else "receive"
158+
topic_name = ""
159+
batch = "_BATCH" if operation_name == "SendMessageBatch" else ""
160+
signature_type = "RECEIVE from" if span_action == "receive" else f"SEND{batch} to"
161+
162+
if len(args) > 1:
163+
topic_name = args[1]["QueueUrl"].rsplit("/", maxsplit=1)[-1]
164+
signature = f"SQS {signature_type} {topic_name}".rstrip() if topic_name else f"SQS {signature_type}"
165+
context["destination"]["service"] = {
166+
"name": span_subtype,
167+
"resource": f"{span_subtype}/{topic_name}" if topic_name else span_subtype,
168+
"type": span_type,
169+
}
170+
return HandlerInfo(signature, span_type, span_subtype, span_action, context)
171+
172+
173+
def modify_span_sqs(span, args, kwargs):
174+
trace_parent = span.transaction.trace_parent.copy_from(span_id=span.id)
175+
attributes = {constants.TRACEPARENT_HEADER_NAME: {"DataType": "String", "StringValue": trace_parent.to_string()}}
176+
if trace_parent.tracestate:
177+
attributes[constants.TRACESTATE_HEADER_NAME] = {"DataType": "String", "StringValue": trace_parent.tracestate}
178+
if len(args) > 1:
179+
attributes_count = len(attributes)
180+
if "MessageAttributes" in args[1]:
181+
messages = [args[1]]
182+
# elif "Entries" in args[1]:
183+
# messages = args[1]["Entries"]
184+
else:
185+
messages = []
186+
for message in messages:
187+
if len(message["MessageAttributes"]) + attributes_count <= 10:
188+
message["MessageAttributes"].update(attributes)
189+
else:
190+
logger.info("Not adding disttracing headers to message due to attribute limit reached")
146191

147192

148193
def handle_default(operation_name, service, instance, args, kwargs, destination):
@@ -160,5 +205,10 @@ def handle_default(operation_name, service, instance, args, kwargs, destination)
160205
"S3": handle_s3,
161206
"DynamoDB": handle_dynamodb,
162207
"SNS": handle_sns,
208+
"SQS": handle_sqs,
163209
"default": handle_default,
164210
}
211+
212+
span_modifiers = {
213+
"SQS": modify_span_sqs,
214+
}

elasticapm/traces.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,16 @@ def set_success(self):
120120
def set_failure(self):
121121
self.outcome = "failure"
122122

123+
@staticmethod
124+
def get_dist_tracing_id():
125+
return "%016x" % random.getrandbits(64)
126+
123127

124128
class Transaction(BaseSpan):
125129
def __init__(
126130
self, tracer, transaction_type="custom", trace_parent=None, is_sampled=True, start=None, sample_rate=None
127131
):
128-
self.id = "%016x" % random.getrandbits(64)
132+
self.id = self.get_dist_tracing_id()
129133
self.trace_parent = trace_parent
130134
if start:
131135
self.timestamp = self.start_time = start
@@ -402,7 +406,7 @@ def __init__(
402406
:param start: timestamp, mostly useful for testing
403407
"""
404408
self.start_time = start or _time_func()
405-
self.id = "%016x" % random.getrandbits(64)
409+
self.id = self.get_dist_tracing_id()
406410
self.transaction = transaction
407411
self.name = name
408412
self.context = context if context is not None else {}

tests/instrumentation/botocore_tests.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,17 @@ def dynamodb():
7676
db.delete_table(TableName="Movies")
7777

7878

79+
@pytest.fixture()
80+
def sqs_client_and_queue():
81+
sqs = boto3.client("sqs", endpoint_url=LOCALSTACK_ENDPOINT)
82+
response = sqs.create_queue(
83+
QueueName="myqueue", Attributes={"DelaySeconds": "60", "MessageRetentionPeriod": "86400"}
84+
)
85+
queue_url = response["QueueUrl"]
86+
yield sqs, queue_url
87+
sqs.delete_queue(QueueUrl=queue_url)
88+
89+
7990
def test_botocore_instrumentation(instrument, elasticapm_client):
8091
elasticapm_client.begin_transaction("transaction.test")
8192
ec2 = boto3.client("ec2", endpoint_url=LOCALSTACK_ENDPOINT)
@@ -190,3 +201,80 @@ def test_sns(instrument, elasticapm_client):
190201
assert spans[2]["context"]["destination"]["service"]["name"] == "sns"
191202
assert spans[2]["context"]["destination"]["service"]["resource"] == "sns/mytopic"
192203
assert spans[2]["context"]["destination"]["service"]["type"] == "messaging"
204+
205+
206+
def test_sqs_send(instrument, elasticapm_client, sqs_client_and_queue):
207+
sqs, queue_url = sqs_client_and_queue
208+
elasticapm_client.begin_transaction("test")
209+
sqs.send_message(
210+
QueueUrl=queue_url,
211+
MessageAttributes={
212+
"Title": {"DataType": "String", "StringValue": "foo"},
213+
},
214+
MessageBody=("bar"),
215+
)
216+
elasticapm_client.end_transaction("test", "test")
217+
span = elasticapm_client.events[constants.SPAN][0]
218+
assert span["name"] == "SQS SEND to myqueue"
219+
assert span["type"] == "messaging"
220+
assert span["subtype"] == "sqs"
221+
assert span["action"] == "send"
222+
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
223+
assert span["context"]["destination"]["service"]["name"] == "sqs"
224+
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
225+
assert span["context"]["destination"]["service"]["type"] == "messaging"
226+
227+
228+
def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queue):
229+
sqs, queue_url = sqs_client_and_queue
230+
elasticapm_client.begin_transaction("test")
231+
response = sqs.send_message_batch(
232+
QueueUrl=queue_url,
233+
Entries=[
234+
{
235+
"Id": "foo",
236+
"MessageBody": "foo",
237+
"DelaySeconds": 123,
238+
"MessageAttributes": {"string": {"StringValue": "foo", "DataType": "String"}},
239+
},
240+
],
241+
)
242+
elasticapm_client.end_transaction("test", "test")
243+
span = elasticapm_client.events[constants.SPAN][0]
244+
assert span["name"] == "SQS SEND_BATCH to myqueue"
245+
assert span["type"] == "messaging"
246+
assert span["subtype"] == "sqs"
247+
assert span["action"] == "send"
248+
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
249+
assert span["context"]["destination"]["service"]["name"] == "sqs"
250+
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
251+
assert span["context"]["destination"]["service"]["type"] == "messaging"
252+
253+
254+
def test_sqs_receive(instrument, elasticapm_client, sqs_client_and_queue):
255+
sqs, queue_url = sqs_client_and_queue
256+
sqs.send_message(
257+
QueueUrl=queue_url,
258+
MessageAttributes={
259+
"Title": {"DataType": "String", "StringValue": "foo"},
260+
},
261+
MessageBody=("bar"),
262+
)
263+
elasticapm_client.begin_transaction("test")
264+
response = sqs.receive_message(
265+
QueueUrl=queue_url,
266+
AttributeNames=["All"],
267+
MessageAttributeNames=[
268+
"All",
269+
],
270+
)
271+
elasticapm_client.end_transaction("test", "test")
272+
span = elasticapm_client.events[constants.SPAN][0]
273+
assert span["name"] == "SQS RECEIVE from myqueue"
274+
assert span["type"] == "messaging"
275+
assert span["subtype"] == "sqs"
276+
assert span["action"] == "receive"
277+
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
278+
assert span["context"]["destination"]["service"]["name"] == "sqs"
279+
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
280+
assert span["context"]["destination"]["service"]["type"] == "messaging"

0 commit comments

Comments
 (0)