@@ -61,28 +61,41 @@ class TraceInjectionDecodingError(Exception):
6161 pass
6262
6363
64- def inject_trace_data_to_message_attributes (trace_data , entry ):
65- # type: (Dict[str, str], Dict[str, Any]) -> None
64+ def inject_trace_data_to_message_attributes (trace_data , entry , endpoint = None ):
65+ # type: (Dict[str, str], Dict[str, Any], Optional[str] ) -> None
6666 """
6767 :trace_data: trace headers to be stored in the entry's MessageAttributes
6868 :entry: an SQS or SNS record
69+ :endpoint: endpoint of message, "sqs" or "sns"
6970
7071 Inject trace headers into the an SQS or SNS record's MessageAttributes
7172 """
7273 if "MessageAttributes" not in entry :
7374 entry ["MessageAttributes" ] = {}
74- # An Amazon SQS message can contain up to 10 metadata attributes.
75+ # Max of 10 message attributes.
7576 if len (entry ["MessageAttributes" ]) < 10 :
76- entry ["MessageAttributes" ]["_datadog" ] = {"DataType" : "String" , "StringValue" : json .dumps (trace_data )}
77+ if endpoint == "sqs" :
78+ # Use String since changing this to Binary would be a breaking
79+ # change as other tracers expect this to be a String.
80+ entry ["MessageAttributes" ]["_datadog" ] = {"DataType" : "String" , "StringValue" : json .dumps (trace_data )}
81+ elif endpoint == "sns" :
82+ # Use Binary since SNS subscription filter policies fail silently
83+ # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269
84+ # AWS will encode our value if it sees "Binary"
85+ entry ["MessageAttributes" ]["_datadog" ] = {"DataType" : "Binary" , "BinaryValue" : json .dumps (trace_data )}
86+ else :
87+ log .warning ("skipping trace injection, endpoint is not SNS or SQS" )
7788 else :
89+ # In the event a record has 10 or more msg attributes we cannot add our _datadog msg attribute
7890 log .warning ("skipping trace injection, max number (10) of MessageAttributes exceeded" )
7991
8092
81- def inject_trace_to_sqs_or_sns_batch_message (params , span ):
82- # type: (Any, Span) -> None
93+ def inject_trace_to_sqs_or_sns_batch_message (params , span , endpoint = None ):
94+ # type: (Any, Span, Optional[str] ) -> None
8395 """
8496 :params: contains the params for the current botocore action
8597 :span: the span which provides the trace context to be propagated
98+ :endpoint: endpoint of message, "sqs" or "sns"
8699
87100 Inject trace headers into MessageAttributes for all SQS or SNS records inside a batch
88101 """
@@ -94,21 +107,22 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span):
94107 # or PublishBatchRequestEntries (in case of PublishBatch).
95108 entries = params .get ("Entries" , params .get ("PublishBatchRequestEntries" , []))
96109 for entry in entries :
97- inject_trace_data_to_message_attributes (trace_data , entry )
110+ inject_trace_data_to_message_attributes (trace_data , entry , endpoint )
98111
99112
100- def inject_trace_to_sqs_or_sns_message (params , span ):
101- # type: (Any, Span) -> None
113+ def inject_trace_to_sqs_or_sns_message (params , span , endpoint = None ):
114+ # type: (Any, Span, Optional[str] ) -> None
102115 """
103116 :params: contains the params for the current botocore action
104117 :span: the span which provides the trace context to be propagated
118+ :endpoint: endpoint of message, "sqs" or "sns"
105119
106120 Inject trace headers into MessageAttributes for the SQS or SNS record
107121 """
108122 trace_data = {}
109123 HTTPPropagator .inject (span .context , trace_data )
110124
111- inject_trace_data_to_message_attributes (trace_data , params )
125+ inject_trace_data_to_message_attributes (trace_data , params , endpoint )
112126
113127
114128def inject_trace_to_eventbridge_detail (params , span ):
@@ -293,17 +307,17 @@ def patched_api_call(original_func, instance, args, kwargs):
293307 if endpoint_name == "lambda" and operation == "Invoke" :
294308 inject_trace_to_client_context (params , span )
295309 if endpoint_name == "sqs" and operation == "SendMessage" :
296- inject_trace_to_sqs_or_sns_message (params , span )
310+ inject_trace_to_sqs_or_sns_message (params , span , endpoint_name )
297311 if endpoint_name == "sqs" and operation == "SendMessageBatch" :
298- inject_trace_to_sqs_or_sns_batch_message (params , span )
312+ inject_trace_to_sqs_or_sns_batch_message (params , span , endpoint_name )
299313 if endpoint_name == "events" and operation == "PutEvents" :
300314 inject_trace_to_eventbridge_detail (params , span )
301315 if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords" ):
302316 inject_trace_to_kinesis_stream (params , span )
303317 if endpoint_name == "sns" and operation == "Publish" :
304- inject_trace_to_sqs_or_sns_message (params , span )
318+ inject_trace_to_sqs_or_sns_message (params , span , endpoint_name )
305319 if endpoint_name == "sns" and operation == "PublishBatch" :
306- inject_trace_to_sqs_or_sns_batch_message (params , span )
320+ inject_trace_to_sqs_or_sns_batch_message (params , span , endpoint_name )
307321 except Exception :
308322 log .warning ("Unable to inject trace context" , exc_info = True )
309323
0 commit comments