Skip to content

Commit ade3559

Browse files
github-actions[bot]tabgokmabdinur
authored
fix(datastreams): support datastream propagation with raw message delivery enabled [backport 1.18] (#6800)
Backport b2bd70f from #6735 to 1.18. When a subscription is set up between SNS and a consumer (such as SQS), there is an option to enable "Raw Message Delivery" which changes the format of delivered data, primarily the MessageAttributes and MessageBody sections. Prior to this change, datastream context was dropped/ignored on the receive because the code assumed the format of delivered messages when Raw Delivery was disabled. After this change, the patch handles both raw message delivery and non-raw delivery. ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) Co-authored-by: Teague Bick <[email protected]> Co-authored-by: Munir Abdinur <[email protected]>
1 parent 2e75621 commit ade3559

File tree

3 files changed

+57
-5
lines changed

3 files changed

+57
-5
lines changed

ddtrace/contrib/botocore/patch.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,14 @@ class TraceInjectionDecodingError(Exception):
9494
pass
9595

9696

97+
def _encode_data(trace_data):
98+
"""
99+
This method exists solely to enable us to patch the value in tests, since
100+
moto doesn't support auto-encoded SNS -> SQS as binary with RawDelivery enabled
101+
"""
102+
return json.dumps(trace_data)
103+
104+
97105
def inject_trace_data_to_message_attributes(trace_data, entry, endpoint_service=None):
98106
# type: (Dict[str, str], Dict[str, Any], Optional[str]) -> None
99107
"""
@@ -110,12 +118,12 @@ def inject_trace_data_to_message_attributes(trace_data, entry, endpoint_service=
110118
if endpoint_service == "sqs":
111119
# Use String since changing this to Binary would be a breaking
112120
# change as other tracers expect this to be a String.
113-
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)}
121+
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": _encode_data(trace_data)}
114122
elif endpoint_service == "sns":
115123
# Use Binary since SNS subscription filter policies fail silently
116124
# with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269
117125
# AWS will encode our value if it sees "Binary"
118-
entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)}
126+
entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": _encode_data(trace_data)}
119127
else:
120128
log.warning("skipping trace injection, endpoint is not SNS or SQS")
121129
else:
@@ -640,6 +648,18 @@ def patched_api_call(original_func, instance, args, kwargs):
640648
# The message originated from SQS
641649
message_body = message
642650
context_json = json.loads(message_body["MessageAttributes"]["_datadog"]["StringValue"])
651+
elif (
652+
"MessageAttributes" in message
653+
and "_datadog" in message["MessageAttributes"]
654+
and "BinaryValue" in message["MessageAttributes"]["_datadog"]
655+
):
656+
# Raw message delivery
657+
message_body = message
658+
context_json = json.loads(
659+
message_body["MessageAttributes"]["_datadog"]["BinaryValue"].decode()
660+
)
661+
else:
662+
log.debug("DataStreams did not handle message: %r", message)
643663

644664
pathway = context_json.get(PROPAGATION_KEY_BASE_64, None) if context_json else None
645665
ctx = pin.tracer.data_streams_processor.decode_pathway_b64(pathway)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
fixes:
3+
- |
4+
data_streams: This fix resolves an issue where data stream context propagation would not propagate via SNS if raw
5+
message delivery was enabled.

tests/contrib/botocore/test.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import datetime
33
import io
44
import json
5+
import sys
56
import unittest
67
import zipfile
78

@@ -1087,10 +1088,29 @@ def test_double_patch(self):
10871088
assert spans
10881089
assert len(spans) == 1
10891090

1090-
@mock_sns
1091-
@mock_sqs
10921091
@TracerTestCase.run_in_subprocess(env_overrides=dict(DD_DATA_STREAMS_ENABLED="True"))
10931092
def test_data_streams_sns_to_sqs(self):
1093+
self._test_data_streams_sns_to_sqs(False)
1094+
1095+
@mock.patch.object(sys.modules["ddtrace.contrib.botocore.patch"], "_encode_data")
1096+
@TracerTestCase.run_in_subprocess(env_overrides=dict(DD_DATA_STREAMS_ENABLED="True"))
1097+
def test_data_streams_sns_to_sqs_raw_delivery(self, mock_encode):
1098+
"""
1099+
Moto doesn't currently handle raw delivery message handling quite correctly.
1100+
In the real world, AWS will encode data for us. Moto does not.
1101+
1102+
So, we patch our code here to encode the data
1103+
"""
1104+
1105+
def _moto_compatible_encode(trace_data):
1106+
return base64.b64encode(json.dumps(trace_data).encode("utf-8"))
1107+
1108+
mock_encode.side_effect = _moto_compatible_encode
1109+
self._test_data_streams_sns_to_sqs(True)
1110+
1111+
@mock_sns
1112+
@mock_sqs
1113+
def _test_data_streams_sns_to_sqs(self, use_raw_delivery):
10941114
# DEV: We want to mock time to ensure we only create a single bucket
10951115
with mock.patch("time.time") as mt:
10961116
mt.return_value = 1642544540
@@ -1104,7 +1124,14 @@ def test_data_streams_sns_to_sqs(self):
11041124
sqs_url = self.sqs_test_queue["QueueUrl"]
11051125
url_parts = sqs_url.split("/")
11061126
sqs_arn = "arn:aws:sqs:{}:{}:{}".format("us-east-1", url_parts[-2], url_parts[-1])
1107-
sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=sqs_arn)
1127+
subscription = sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=sqs_arn)
1128+
1129+
if use_raw_delivery:
1130+
sns.set_subscription_attributes(
1131+
SubscriptionArn=subscription["SubscriptionArn"],
1132+
AttributeName="RawMessageDelivery",
1133+
AttributeValue="true",
1134+
)
11081135

11091136
Pin.get_from(sns).clone(tracer=self.tracer).onto(sns)
11101137
Pin.get_from(self.sqs_client).clone(tracer=self.tracer).onto(self.sqs_client)

0 commit comments

Comments
 (0)