Skip to content

Commit 617b958

Browse files
fix(datastreams): support datastream propagation with raw message delivery enabled [backport 1.17] (#6799)
Backport b2bd70f from #6735 to 1.17. When a subscription is set up between SNS and a consumer (such as SQS), there is an option to enable "Raw Message Delivery", which changes the format of delivered data, primarily the MessageAttributes and MessageBody sections. Prior to this change, data stream context was dropped on the receive side when raw delivery was enabled, because the code assumed the message format that is delivered when Raw Message Delivery is disabled. After this change, the patch handles both raw and non-raw message delivery. ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. 
- [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) Co-authored-by: Teague Bick <[email protected]>
1 parent 2904bbb commit 617b958

File tree

3 files changed

+57
-5
lines changed

3 files changed

+57
-5
lines changed

ddtrace/contrib/botocore/patch.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,14 @@ class TraceInjectionDecodingError(Exception):
9393
pass
9494

9595

96+
def _encode_data(trace_data):
97+
"""
98+
This function exists solely so that tests can patch the encoded value, since
99+
moto doesn't support auto-encoded SNS -> SQS as binary with RawDelivery enabled
100+
"""
101+
return json.dumps(trace_data)
102+
103+
96104
def inject_trace_data_to_message_attributes(trace_data, entry, endpoint_service=None):
97105
# type: (Dict[str, str], Dict[str, Any], Optional[str]) -> None
98106
"""
@@ -109,12 +117,12 @@ def inject_trace_data_to_message_attributes(trace_data, entry, endpoint_service=
109117
if endpoint_service == "sqs":
110118
# Use String since changing this to Binary would be a breaking
111119
# change as other tracers expect this to be a String.
112-
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)}
120+
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": _encode_data(trace_data)}
113121
elif endpoint_service == "sns":
114122
# Use Binary since SNS subscription filter policies fail silently
115123
# with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269
116124
# AWS will encode our value if it sees "Binary"
117-
entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)}
125+
entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": _encode_data(trace_data)}
118126
else:
119127
log.warning("skipping trace injection, endpoint is not SNS or SQS")
120128
else:
@@ -589,6 +597,18 @@ def patched_api_call(original_func, instance, args, kwargs):
589597
# The message originated from SQS
590598
message_body = message
591599
context_json = json.loads(message_body["MessageAttributes"]["_datadog"]["StringValue"])
600+
elif (
601+
"MessageAttributes" in message
602+
and "_datadog" in message["MessageAttributes"]
603+
and "BinaryValue" in message["MessageAttributes"]["_datadog"]
604+
):
605+
# Raw message delivery
606+
message_body = message
607+
context_json = json.loads(
608+
message_body["MessageAttributes"]["_datadog"]["BinaryValue"].decode()
609+
)
610+
else:
611+
log.debug("DataStreams did not handle message: %r", message)
592612

593613
pathway = context_json.get(PROPAGATION_KEY_BASE_64, None) if context_json else None
594614
ctx = pin.tracer.data_streams_processor.decode_pathway_b64(pathway)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
fixes:
3+
- |
4+
data_streams: This fix resolves an issue where data stream context would not propagate via SNS if raw
5+
message delivery was enabled.

tests/contrib/botocore/test.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import datetime
33
import io
44
import json
5+
import sys
56
import unittest
67
import zipfile
78

@@ -1087,10 +1088,29 @@ def test_double_patch(self):
10871088
assert spans
10881089
assert len(spans) == 1
10891090

1090-
@mock_sns
1091-
@mock_sqs
10921091
@TracerTestCase.run_in_subprocess(env_overrides=dict(DD_DATA_STREAMS_ENABLED="True"))
10931092
def test_data_streams_sns_to_sqs(self):
1093+
self._test_data_streams_sns_to_sqs(False)
1094+
1095+
@mock.patch.object(sys.modules["ddtrace.contrib.botocore.patch"], "_encode_data")
1096+
@TracerTestCase.run_in_subprocess(env_overrides=dict(DD_DATA_STREAMS_ENABLED="True"))
1097+
def test_data_streams_sns_to_sqs_raw_delivery(self, mock_encode):
1098+
"""
1099+
Moto doesn't currently handle raw message delivery quite correctly.
1100+
In the real world, AWS will encode data for us. Moto does not.
1101+
1102+
So, we patch our code here to base64-encode the data ourselves.
1103+
"""
1104+
1105+
def _moto_compatible_encode(trace_data):
1106+
return base64.b64encode(json.dumps(trace_data).encode("utf-8"))
1107+
1108+
mock_encode.side_effect = _moto_compatible_encode
1109+
self._test_data_streams_sns_to_sqs(True)
1110+
1111+
@mock_sns
1112+
@mock_sqs
1113+
def _test_data_streams_sns_to_sqs(self, use_raw_delivery):
10941114
# DEV: We want to mock time to ensure we only create a single bucket
10951115
with mock.patch("time.time") as mt:
10961116
mt.return_value = 1642544540
@@ -1104,7 +1124,14 @@ def test_data_streams_sns_to_sqs(self):
11041124
sqs_url = self.sqs_test_queue["QueueUrl"]
11051125
url_parts = sqs_url.split("/")
11061126
sqs_arn = "arn:aws:sqs:{}:{}:{}".format("us-east-1", url_parts[-2], url_parts[-1])
1107-
sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=sqs_arn)
1127+
subscription = sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=sqs_arn)
1128+
1129+
if use_raw_delivery:
1130+
sns.set_subscription_attributes(
1131+
SubscriptionArn=subscription["SubscriptionArn"],
1132+
AttributeName="RawMessageDelivery",
1133+
AttributeValue="true",
1134+
)
11081135

11091136
Pin.get_from(sns).clone(tracer=self.tracer).onto(sns)
11101137
Pin.get_from(self.sqs_client).clone(tracer=self.tracer).onto(self.sqs_client)

0 commit comments

Comments
 (0)