Skip to content

Commit 54213dc

Browse files
mabdinurduncanistaemmettbutler
authored
fix(botocore): fix kinesis typeerror [backports #5317 to 1.8] (#5334)
Backports: #5317 ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines) are followed. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Author is aware of the performance implications of this PR as reported in the benchmarks PR comment. ## Reviewer Checklist - [ ] Title is accurate. - [ ] No unnecessary changes are introduced. - [ ] Description motivates each change. - [ ] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [ ] Testing strategy adequately addresses listed risk(s). - [ ] Change is maintainable (easy to change, telemetry, documentation). - [ ] Release note makes sense to a user of the library. - [ ] Reviewer is aware of, and discussed the performance implications of this PR as reported in the benchmarks PR comment. Co-authored-by: Jordan Gonzalez <[email protected]> Co-authored-by: Emmett Butler <[email protected]>
1 parent faa3ba5 commit 54213dc

File tree

3 files changed

+184
-306
lines changed

3 files changed

+184
-306
lines changed

ddtrace/contrib/botocore/patch.py

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ def get_json_from_str(data_str):
185185

186186
if data_str.endswith(LINE_BREAK):
187187
return LINE_BREAK, data_obj
188-
return "", data_obj
188+
return None, data_obj
189189

190190

191191
def get_kinesis_data_object(data):
@@ -194,24 +194,34 @@ def get_kinesis_data_object(data):
194194
:data: the data from a kinesis stream
195195
196196
The data from a kinesis stream comes as a string (could be json, base64 encoded, etc.)
197-
We support injecting our trace context in the following two cases:
197+
We support injecting our trace context in the following three cases:
198198
- json string
199+
- byte encoded json string
199200
- base64 encoded json string
200201
If it's neither of these, then we leave the message as it is.
201202
"""
202203

203204
# check if data is a json string
204205
try:
205206
return get_json_from_str(data)
206-
except ValueError:
207-
pass
207+
except Exception:
208+
log.debug("Kinesis data is not a JSON string. Trying Byte encoded JSON string.")
209+
210+
# check if data is an encoded json string
211+
try:
212+
data_str = data.decode("ascii")
213+
return get_json_from_str(data_str)
214+
except Exception:
215+
log.debug("Kinesis data is not a JSON string encoded. Trying Base64 encoded JSON string.")
208216

209217
# check if data is a base64 encoded json string
210218
try:
211219
data_str = base64.b64decode(data).decode("ascii")
212220
return get_json_from_str(data_str)
213-
except ValueError:
214-
raise TraceInjectionDecodingError("Unable to parse kinesis streams data string")
221+
except Exception:
222+
log.error("Unable to parse payload, unable to inject trace context.")
223+
224+
return None, None
215225

216226

217227
def inject_trace_to_kinesis_stream_data(record, span):
@@ -230,22 +240,23 @@ def inject_trace_to_kinesis_stream_data(record, span):
230240

231241
data = record["Data"]
232242
line_break, data_obj = get_kinesis_data_object(data)
233-
data_obj["_datadog"] = {}
234-
HTTPPropagator.inject(span.context, data_obj["_datadog"])
235-
data_json = json.dumps(data_obj)
236-
237-
# if original string had a line break, add it back
238-
if line_break:
239-
data_json += line_break
240-
241-
# check if data size will exceed max size with headers
242-
data_size = len(data_json)
243-
if data_size >= MAX_KINESIS_DATA_SIZE:
244-
raise TraceInjectionSizeExceed(
245-
"Data including trace injection ({}) exceeds ({})".format(data_size, MAX_KINESIS_DATA_SIZE)
246-
)
247-
248-
record["Data"] = data_json
243+
if data_obj is not None:
244+
data_obj["_datadog"] = {}
245+
HTTPPropagator.inject(span.context, data_obj["_datadog"])
246+
data_json = json.dumps(data_obj)
247+
248+
# if original string had a line break, add it back
249+
if line_break is not None:
250+
data_json += line_break
251+
252+
# check if data size will exceed max size with headers
253+
data_size = len(data_json)
254+
if data_size >= MAX_KINESIS_DATA_SIZE:
255+
raise TraceInjectionSizeExceed(
256+
"Data including trace injection ({}) exceeds ({})".format(data_size, MAX_KINESIS_DATA_SIZE)
257+
)
258+
259+
record["Data"] = data_json
249260

250261

251262
def inject_trace_to_kinesis_stream(params, span):
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
botocore: Fix TypeError raised by injecting trace context into Kinesis messages.

0 commit comments

Comments
 (0)