Skip to content

Commit 197117a

Browse files
authored
fix issue with adding disttracing to SQS messages when dropping spans (#1170)
also, closing some gaps in SQS testing closes #1169
1 parent 465fbc2 commit 197117a

File tree

2 files changed

+93
-13
lines changed

2 files changed

+93
-13
lines changed

elasticapm/instrumentation/packages/botocore.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@
3232

3333
from elasticapm.conf import constants
3434
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
35-
from elasticapm.traces import capture_span
35+
from elasticapm.traces import capture_span, execution_context
3636
from elasticapm.utils.compat import urlparse
3737
from elasticapm.utils.logging import get_logger
3838

3939
logger = get_logger("elasticapm.instrument")
4040

41+
SQS_MAX_ATTRIBUTES = 10
42+
4143

4244
HandlerInfo = namedtuple("HandlerInfo", ("signature", "span_type", "span_subtype", "span_action", "context"))
4345

@@ -171,20 +173,25 @@ def handle_sqs(operation_name, service, instance, args, kwargs, context):
171173

172174

173175
def modify_span_sqs(span, args, kwargs):
174-
trace_parent = span.transaction.trace_parent.copy_from(span_id=span.id)
176+
if span.id:
177+
trace_parent = span.transaction.trace_parent.copy_from(span_id=span.id)
178+
else:
179+
# this is a dropped span, use transaction id instead
180+
transaction = execution_context.get_transaction()
181+
trace_parent = transaction.trace_parent.copy_from(span_id=transaction.id)
175182
attributes = {constants.TRACEPARENT_HEADER_NAME: {"DataType": "String", "StringValue": trace_parent.to_string()}}
176183
if trace_parent.tracestate:
177184
attributes[constants.TRACESTATE_HEADER_NAME] = {"DataType": "String", "StringValue": trace_parent.tracestate}
178185
if len(args) > 1:
179186
attributes_count = len(attributes)
180187
if "MessageAttributes" in args[1]:
181188
messages = [args[1]]
182-
# elif "Entries" in args[1]:
183-
# messages = args[1]["Entries"]
189+
elif "Entries" in args[1]:
190+
messages = args[1]["Entries"]
184191
else:
185192
messages = []
186193
for message in messages:
187-
if len(message["MessageAttributes"]) + attributes_count <= 10:
194+
if len(message["MessageAttributes"]) + attributes_count <= SQS_MAX_ATTRIBUTES:
188195
message["MessageAttributes"].update(attributes)
189196
else:
190197
logger.info("Not adding disttracing headers to message due to attribute limit reached")

tests/instrumentation/botocore_tests.py

Lines changed: 81 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@
2929
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3030
import os
3131

32-
import mock
3332
import pytest
3433

34+
import elasticapm
3535
from elasticapm.conf import constants
36-
from elasticapm.instrumentation.packages.botocore import BotocoreInstrumentation
36+
from elasticapm.instrumentation.packages.botocore import SQS_MAX_ATTRIBUTES
3737
from elasticapm.utils.compat import urlparse
38+
from tests.utils import assert_any_record_contains
3839

3940
boto3 = pytest.importorskip("boto3")
4041

@@ -79,9 +80,7 @@ def dynamodb():
7980
@pytest.fixture()
8081
def sqs_client_and_queue():
8182
sqs = boto3.client("sqs", endpoint_url=LOCALSTACK_ENDPOINT)
82-
response = sqs.create_queue(
83-
QueueName="myqueue", Attributes={"DelaySeconds": "60", "MessageRetentionPeriod": "86400"}
84-
)
83+
response = sqs.create_queue(QueueName="myqueue", Attributes={"MessageRetentionPeriod": "86400"})
8584
queue_url = response["QueueUrl"]
8685
yield sqs, queue_url
8786
sqs.delete_queue(QueueUrl=queue_url)
@@ -213,7 +212,7 @@ def test_sqs_send(instrument, elasticapm_client, sqs_client_and_queue):
213212
},
214213
MessageBody=("bar"),
215214
)
216-
elasticapm_client.end_transaction("test", "test")
215+
transaction = elasticapm_client.end_transaction("test", "test")
217216
span = elasticapm_client.events[constants.SPAN][0]
218217
assert span["name"] == "SQS SEND to myqueue"
219218
assert span["type"] == "messaging"
@@ -224,6 +223,19 @@ def test_sqs_send(instrument, elasticapm_client, sqs_client_and_queue):
224223
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
225224
assert span["context"]["destination"]["service"]["type"] == "messaging"
226225

226+
messages = sqs.receive_message(
227+
QueueUrl=queue_url,
228+
AttributeNames=["All"],
229+
MessageAttributeNames=[
230+
"All",
231+
],
232+
)
233+
message = messages["Messages"][0]
234+
assert "traceparent" in message["MessageAttributes"]
235+
traceparent = message["MessageAttributes"]["traceparent"]["StringValue"]
236+
assert transaction.trace_parent.trace_id in traceparent
237+
assert span["id"] in traceparent
238+
227239

228240
def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queue):
229241
sqs, queue_url = sqs_client_and_queue
@@ -234,12 +246,11 @@ def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queue):
234246
{
235247
"Id": "foo",
236248
"MessageBody": "foo",
237-
"DelaySeconds": 123,
238249
"MessageAttributes": {"string": {"StringValue": "foo", "DataType": "String"}},
239250
},
240251
],
241252
)
242-
elasticapm_client.end_transaction("test", "test")
253+
transaction = elasticapm_client.end_transaction("test", "test")
243254
span = elasticapm_client.events[constants.SPAN][0]
244255
assert span["name"] == "SQS SEND_BATCH to myqueue"
245256
assert span["type"] == "messaging"
@@ -249,6 +260,68 @@ def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queue):
249260
assert span["context"]["destination"]["service"]["name"] == "sqs"
250261
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
251262
assert span["context"]["destination"]["service"]["type"] == "messaging"
263+
messages = sqs.receive_message(
264+
QueueUrl=queue_url,
265+
AttributeNames=["All"],
266+
MessageAttributeNames=[
267+
"All",
268+
],
269+
)
270+
message = messages["Messages"][0]
271+
assert "traceparent" in message["MessageAttributes"]
272+
traceparent = message["MessageAttributes"]["traceparent"]["StringValue"]
273+
assert transaction.trace_parent.trace_id in traceparent
274+
assert span["id"] in traceparent
275+
276+
277+
def test_sqs_send_too_many_attributes_for_disttracing(instrument, elasticapm_client, sqs_client_and_queue, caplog):
278+
sqs, queue_url = sqs_client_and_queue
279+
attributes = {str(i): {"DataType": "String", "StringValue": str(i)} for i in range(SQS_MAX_ATTRIBUTES)}
280+
elasticapm_client.begin_transaction("test")
281+
with caplog.at_level("INFO"):
282+
sqs.send_message(
283+
QueueUrl=queue_url,
284+
MessageAttributes=attributes,
285+
MessageBody=("bar"),
286+
)
287+
elasticapm_client.end_transaction("test", "test")
288+
messages = sqs.receive_message(
289+
QueueUrl=queue_url,
290+
AttributeNames=["All"],
291+
MessageAttributeNames=[
292+
"All",
293+
],
294+
)
295+
message = messages["Messages"][0]
296+
assert "traceparent" not in message["MessageAttributes"]
297+
assert_any_record_contains(caplog.records, "Not adding disttracing headers")
298+
299+
300+
def test_sqs_send_disttracing_dropped_span(instrument, elasticapm_client, sqs_client_and_queue):
301+
sqs, queue_url = sqs_client_and_queue
302+
elasticapm_client.begin_transaction("test")
303+
with elasticapm.capture_span("test", leaf=True):
304+
sqs.send_message(
305+
QueueUrl=queue_url,
306+
MessageAttributes={
307+
"Title": {"DataType": "String", "StringValue": "foo"},
308+
},
309+
MessageBody=("bar"),
310+
)
311+
transaction = elasticapm_client.end_transaction("test", "test")
312+
assert len(elasticapm_client.events[constants.SPAN]) == 1
313+
messages = sqs.receive_message(
314+
QueueUrl=queue_url,
315+
AttributeNames=["All"],
316+
MessageAttributeNames=[
317+
"All",
318+
],
319+
)
320+
message = messages["Messages"][0]
321+
assert "traceparent" in message["MessageAttributes"]
322+
traceparent = message["MessageAttributes"]["traceparent"]["StringValue"]
323+
assert transaction.trace_parent.trace_id in traceparent
324+
assert transaction.id in traceparent # due to DroppedSpan, transaction.id is used instead of span.id
252325

253326

254327
def test_sqs_receive(instrument, elasticapm_client, sqs_client_and_queue):

0 commit comments

Comments
 (0)