Skip to content

Commit ee11ad0

Browse files
hghotraKyle-Verhoogagocsmajorgreys
authored
feat(botocore): inject trace context into more aws managed services (#3178)
Inject trace context into different AWS managed services: SNS (Publish & PublishBatch), EventBridge (custom events), and Kinesis Streams (PutRecord & PutRecords). Note: Unit tests for SNS PublishBatch require a newer localstack version, as well as a fix for this issue: localstack/localstack#5395. Co-authored-by: Kyle Verhoog <[email protected]> Co-authored-by: Christopher Agocs <[email protected]> Co-authored-by: Tahir H. Butt <[email protected]>
1 parent 306247e commit ee11ad0

File tree

6 files changed

+1150
-175
lines changed

6 files changed

+1150
-175
lines changed

ddtrace/contrib/botocore/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
1919
.. py:data:: ddtrace.config.botocore['distributed_tracing']
2020
21-
Whether to inject distributed tracing data to requests in SQS and Lambda.
21+
Whether to inject distributed tracing data to requests in SQS, SNS, EventBridge, Kinesis Streams and Lambda.
2222
2323
Can also be enabled with the ``DD_BOTOCORE_DISTRIBUTED_TRACING`` environment variable.
2424

ddtrace/contrib/botocore/patch.py

Lines changed: 178 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
"""
44
import base64
55
import json
6+
import typing
7+
from typing import Any
8+
from typing import Dict
9+
from typing import List
10+
from typing import Optional
611

712
import botocore.client
813

@@ -24,12 +29,18 @@
2429
from ..trace_utils import unwrap
2530

2631

32+
if typing.TYPE_CHECKING:
33+
from ddtrace import Span
34+
2735
# Original botocore client class
2836
_Botocore_client = botocore.client.BaseClient
2937

3038
ARGS_NAME = ("action", "params", "path", "verb")
3139
TRACED_ARGS = {"params", "path", "verb"}
3240

41+
MAX_KINESIS_DATA_SIZE = 1 << 20 # 1MB
42+
MAX_EVENTBRIDGE_DETAIL_SIZE = 1 << 18 # 256KB
43+
3344
log = get_logger(__name__)
3445

3546
# Botocore default settings
@@ -42,33 +53,175 @@
4253
)
4354

4455

56+
class TraceInjectionSizeExceed(Exception):
    """Signals that a payload with the trace context added would reach its size limit."""
58+
59+
60+
class TraceInjectionDecodingError(Exception):
    """Signals that a payload could not be decoded for trace context injection."""
62+
63+
4564
def inject_trace_data_to_message_attributes(trace_data, entry):
65+
# type: (Dict[str, str], Dict[str, Any]) -> None
66+
"""
67+
:trace_data: trace headers to be stored in the entry's MessageAttributes
68+
:entry: an SQS or SNS record
69+
70+
Inject trace headers into an SQS or SNS record's MessageAttributes
71+
"""
4672
if "MessageAttributes" not in entry:
4773
entry["MessageAttributes"] = {}
4874
# An Amazon SQS message can contain up to 10 metadata attributes.
4975
if len(entry["MessageAttributes"]) < 10:
5076
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)}
5177
else:
52-
log.debug("skipping trace injection, max number (10) of MessageAttributes exceeded")
78+
log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded")
5379

5480

55-
def inject_trace_to_sqs_batch_message(args, span):
81+
def inject_trace_to_sqs_or_sns_batch_message(params, span):
82+
# type: (Any, Span) -> None
83+
"""
84+
:params: contains the params for the current botocore action
85+
:span: the span which provides the trace context to be propagated
86+
87+
Inject trace headers into MessageAttributes for all SQS or SNS records inside a batch
88+
"""
5689
trace_data = {}
5790
HTTPPropagator.inject(span.context, trace_data)
58-
params = args[1]
5991

60-
for entry in params["Entries"]:
92+
# An entry here is an SNS or SQS record, and depending on how it was published,
93+
# it could either show up under Entries (in case of SendMessageBatch),
94+
# or PublishBatchRequestEntries (in case of PublishBatch).
95+
entries = params.get("Entries", params.get("PublishBatchRequestEntries", []))
96+
for entry in entries:
6197
inject_trace_data_to_message_attributes(trace_data, entry)
6298

6399

64-
def inject_trace_to_sqs_message(args, span):
100+
def inject_trace_to_sqs_or_sns_message(params, span):
101+
# type: (Any, Span) -> None
102+
"""
103+
:params: contains the params for the current botocore action
104+
:span: the span which provides the trace context to be propagated
105+
106+
Inject trace headers into MessageAttributes for the SQS or SNS record
107+
"""
65108
trace_data = {}
66109
HTTPPropagator.inject(span.context, trace_data)
67-
params = args[1]
68110

69111
inject_trace_data_to_message_attributes(trace_data, params)
70112

71113

114+
def inject_trace_to_eventbridge_detail(params, span):
    # type: (Any, Span) -> None
    """
    Inject trace headers into the ``Detail`` JSON of every EventBridge entry.

    :params: contains the params for the current botocore action
    :span: the span which provides the trace context to be propagated

    The headers are stored under the ``_datadog`` key of the Detail object. An entry
    without a ``Detail`` key gets a Detail holding only the trace headers. An entry is
    left untouched, with a warning, when its Detail is not a JSON object or when the
    Detail including the headers would reach the size limit; the remaining entries
    are still processed.
    Max size per event is 256KB (https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-putevent-size.html)
    """
    if "Entries" not in params:
        log.warning("Unable to inject context. The Event Bridge event had no Entries.")
        return

    for entry in params["Entries"]:
        detail = {}
        if "Detail" in entry:
            try:
                detail = json.loads(entry["Detail"])
            except (TypeError, ValueError):
                # ValueError: malformed JSON; TypeError: Detail is not a string at all
                log.warning("Detail is not a valid JSON string")
                continue

            # Valid JSON is not necessarily an object (e.g. "[1, 2]" or "3"); a key can
            # only be added to an object, so skip this entry instead of raising and
            # aborting the injection for every remaining entry.
            if not isinstance(detail, dict):
                log.warning("Detail is not a JSON object")
                continue

        detail["_datadog"] = {}
        HTTPPropagator.inject(span.context, detail["_datadog"])
        detail_json = json.dumps(detail)

        # check if detail size will exceed max size with headers
        # (json.dumps escapes non-ASCII characters by default, so the character
        # count equals the encoded byte count)
        detail_size = len(detail_json)
        if detail_size >= MAX_EVENTBRIDGE_DETAIL_SIZE:
            log.warning("Detail with trace injection (%s) exceeds limit (%s)", detail_size, MAX_EVENTBRIDGE_DETAIL_SIZE)
            continue

        entry["Detail"] = detail_json
147+
148+
149+
def get_kinesis_data_object(data):
    # type: (str) -> Dict[str, Any]
    """
    Decode the data of a kinesis record into a JSON object.

    :data: the data from a kinesis stream

    The data from a kinesis stream comes as a string (could be json, base64 encoded, etc.)
    We support injecting our trace context in the following two cases:
    - json string
    - base64 encoded json string
    In both cases the JSON document must be an object, since the trace context is
    added to it as a key.

    :returns: the decoded JSON object
    :raises TraceInjectionDecodingError: if the data is neither of the supported forms
    """

    # check if data is a json string
    try:
        data_obj = json.loads(data)
    except (TypeError, ValueError):
        # ValueError: not JSON; TypeError: not a string-like value at all
        data_obj = None
    if isinstance(data_obj, dict):
        return data_obj

    # check if data is a base64 encoded json string
    try:
        # JSON text is UTF-8 (of which ASCII is a subset)
        data_obj = json.loads(base64.b64decode(data).decode("utf-8"))
    except (TypeError, ValueError):
        # base64 and unicode decoding errors are ValueError subclasses on Python 3;
        # Python 2's b64decode reports bad input with TypeError
        data_obj = None
    if isinstance(data_obj, dict):
        return data_obj

    raise TraceInjectionDecodingError("Unable to parse kinesis streams data string")
173+
174+
175+
def inject_trace_to_kinesis_stream_data(record, span):
    # type: (Dict[str, Any], Span) -> None
    """
    Inject trace headers into a single Kinesis record's Data field.

    :record: the Kinesis record whose Data field is rewritten in place
    :span: the span which provides the trace context to be propagated

    The headers are added under the ``_datadog`` key in addition to the existing
    data. Only possible if the existing data is JSON string or base64 encoded JSON string
    Max data size per record is 1MB (https://aws.amazon.com/kinesis/data-streams/faqs/)

    :raises TraceInjectionDecodingError: if the existing data cannot be decoded into a JSON object
    :raises TraceInjectionSizeExceed: if the data including the headers would reach the size limit
    """
    if "Data" not in record:
        log.warning("Unable to inject context. The kinesis stream has no data")
        return

    data_obj = get_kinesis_data_object(record["Data"])
    # A key can only be added to a JSON object; fail with the decoding error rather
    # than an unrelated TypeError when the payload is e.g. a JSON array or number.
    if not isinstance(data_obj, dict):
        raise TraceInjectionDecodingError("Kinesis streams data is not a JSON object")

    data_obj["_datadog"] = {}
    HTTPPropagator.inject(span.context, data_obj["_datadog"])
    data_json = json.dumps(data_obj)

    # check if data size will exceed max size with headers
    # (json.dumps escapes non-ASCII characters by default, so the character
    # count equals the encoded byte count)
    data_size = len(data_json)
    if data_size >= MAX_KINESIS_DATA_SIZE:
        raise TraceInjectionSizeExceed(
            "Data including trace injection ({}) exceeds ({})".format(data_size, MAX_KINESIS_DATA_SIZE)
        )

    # NOTE(review): the value written back is always plain JSON, even when the
    # original Data was base64 encoded JSON - confirm consumers expect this.
    record["Data"] = data_json
203+
204+
205+
def inject_trace_to_kinesis_stream(params, span):
    # type: (Dict[str, Any], Span) -> None
    """
    Inject trace headers into the data sent to a Kinesis stream.

    :params: contains the params for the current botocore action
    :span: the span which provides the trace context to be propagated

    When the params hold a batch under ``Records``, only the batch's first record's
    Data field receives the headers (nothing is injected for an empty batch).
    Otherwise, when the params carry ``Data`` directly, the params themselves are
    treated as the record.
    Only possible if the existing data is JSON string or base64 encoded JSON string
    Max data size per record is 1MB (https://aws.amazon.com/kinesis/data-streams/faqs/)
    """
    if "Records" not in params:
        # single record: the params themselves carry the Data field
        if "Data" in params:
            inject_trace_to_kinesis_stream_data(params, span)
        return

    batch = params["Records"]
    if batch:
        inject_trace_to_kinesis_stream_data(batch[0], span)
223+
224+
72225
def modify_client_context(client_context_object, trace_headers):
73226
if config.botocore["invoke_with_legacy_context"]:
74227
trace_headers = {"_datadog": trace_headers}
@@ -79,11 +232,10 @@ def modify_client_context(client_context_object, trace_headers):
79232
client_context_object["custom"] = trace_headers
80233

81234

82-
def inject_trace_to_client_context(args, span):
235+
def inject_trace_to_client_context(params, span):
83236
trace_headers = {}
84237
HTTPPropagator.inject(span.context, trace_headers)
85238
client_context_object = {}
86-
params = args[1]
87239
if "ClientContext" in params:
88240
try:
89241
client_context_json = base64.b64decode(params["ClientContext"]).decode("utf-8")
@@ -130,18 +282,30 @@ def patched_api_call(original_func, instance, args, kwargs):
130282
operation = None
131283
if args:
132284
operation = get_argument_value(args, kwargs, 0, "operation_name")
285+
params = get_argument_value(args, kwargs, 1, "api_params")
133286
# DEV: join is the fastest way of concatenating strings that is compatible
134287
# across Python versions (see
135288
# https://stackoverflow.com/questions/1316887/what-is-the-most-efficient-string-concatenation-method-in-python)
136289
span.resource = ".".join((endpoint_name, operation.lower()))
137290

138291
if config.botocore["distributed_tracing"]:
139-
if endpoint_name == "lambda" and operation == "Invoke":
140-
inject_trace_to_client_context(args, span)
141-
if endpoint_name == "sqs" and operation == "SendMessage":
142-
inject_trace_to_sqs_message(args, span)
143-
if endpoint_name == "sqs" and operation == "SendMessageBatch":
144-
inject_trace_to_sqs_batch_message(args, span)
292+
try:
293+
if endpoint_name == "lambda" and operation == "Invoke":
294+
inject_trace_to_client_context(params, span)
295+
if endpoint_name == "sqs" and operation == "SendMessage":
296+
inject_trace_to_sqs_or_sns_message(params, span)
297+
if endpoint_name == "sqs" and operation == "SendMessageBatch":
298+
inject_trace_to_sqs_or_sns_batch_message(params, span)
299+
if endpoint_name == "events" and operation == "PutEvents":
300+
inject_trace_to_eventbridge_detail(params, span)
301+
if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords"):
302+
inject_trace_to_kinesis_stream(params, span)
303+
if endpoint_name == "sns" and operation == "Publish":
304+
inject_trace_to_sqs_or_sns_message(params, span)
305+
if endpoint_name == "sns" and operation == "PublishBatch":
306+
inject_trace_to_sqs_or_sns_batch_message(params, span)
307+
except Exception:
308+
log.warning("Unable to inject trace context", exc_info=True)
145309

146310
else:
147311
span.resource = endpoint_name

ddtrace/ext/aws.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from ddtrace.span import Span
1212

1313

14-
EXCLUDED_ENDPOINT = frozenset({"kms", "sts"})
14+
EXCLUDED_ENDPOINT = frozenset({"kms", "sts", "sns", "kinesis", "events"})
1515
EXCLUDED_ENDPOINT_TAGS = {
1616
"firehose": frozenset({"params.Records"}),
1717
}

docs/spelling_wordlist.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,4 @@ workflow
161161
wsgi
162162
xfail
163163
yaaredis
164+
Kinesis
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
features:
3+
- |
4+
botocore: add distributed tracing support for AWS EventBridge, AWS SNS & AWS Kinesis.

0 commit comments

Comments
 (0)