Skip to content

Commit ca17bbe

Browse files
mabdinurduncanistaemmettbutler
authored
fix(botocore): fix kinesis typeerror [backports #5317 to 1.9] (#5333)
Backports: #5317 ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines) are followed. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Author is aware of the performance implications of this PR as reported in the benchmarks PR comment. ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer is aware of, and discussed the performance implications of this PR as reported in the benchmarks PR comment. Co-authored-by: Jordan Gonzalez <[email protected]> Co-authored-by: Emmett Butler <[email protected]>
1 parent 63d99e6 commit ca17bbe

File tree

3 files changed

+184
-306
lines changed

3 files changed

+184
-306
lines changed

ddtrace/contrib/botocore/patch.py

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def get_json_from_str(data_str):
186186

187187
if data_str.endswith(LINE_BREAK):
188188
return LINE_BREAK, data_obj
189-
return "", data_obj
189+
return None, data_obj
190190

191191

192192
def get_kinesis_data_object(data):
@@ -195,24 +195,34 @@ def get_kinesis_data_object(data):
195195
:data: the data from a kinesis stream
196196
197197
The data from a kinesis stream comes as a string (could be json, base64 encoded, etc.)
198-
We support injecting our trace context in the following two cases:
198+
We support injecting our trace context in the following three cases:
199199
- json string
200+
- byte encoded json string
200201
- base64 encoded json string
201202
If it's neither of these, then we leave the message as it is.
202203
"""
203204

204205
# check if data is a json string
205206
try:
206207
return get_json_from_str(data)
207-
except ValueError:
208-
pass
208+
except Exception:
209+
log.debug("Kinesis data is not a JSON string. Trying Byte encoded JSON string.")
210+
211+
# check if data is an encoded json string
212+
try:
213+
data_str = data.decode("ascii")
214+
return get_json_from_str(data_str)
215+
except Exception:
216+
log.debug("Kinesis data is not a JSON string encoded. Trying Base64 encoded JSON string.")
209217

210218
# check if data is a base64 encoded json string
211219
try:
212220
data_str = base64.b64decode(data).decode("ascii")
213221
return get_json_from_str(data_str)
214-
except ValueError:
215-
raise TraceInjectionDecodingError("Unable to parse kinesis streams data string")
222+
except Exception:
223+
log.error("Unable to parse payload, unable to inject trace context.")
224+
225+
return None, None
216226

217227

218228
def inject_trace_to_kinesis_stream_data(record, span):
@@ -231,22 +241,23 @@ def inject_trace_to_kinesis_stream_data(record, span):
231241

232242
data = record["Data"]
233243
line_break, data_obj = get_kinesis_data_object(data)
234-
data_obj["_datadog"] = {}
235-
HTTPPropagator.inject(span.context, data_obj["_datadog"])
236-
data_json = json.dumps(data_obj)
237-
238-
# if original string had a line break, add it back
239-
if line_break:
240-
data_json += line_break
241-
242-
# check if data size will exceed max size with headers
243-
data_size = len(data_json)
244-
if data_size >= MAX_KINESIS_DATA_SIZE:
245-
raise TraceInjectionSizeExceed(
246-
"Data including trace injection ({}) exceeds ({})".format(data_size, MAX_KINESIS_DATA_SIZE)
247-
)
248-
249-
record["Data"] = data_json
244+
if data_obj is not None:
245+
data_obj["_datadog"] = {}
246+
HTTPPropagator.inject(span.context, data_obj["_datadog"])
247+
data_json = json.dumps(data_obj)
248+
249+
# if original string had a line break, add it back
250+
if line_break is not None:
251+
data_json += line_break
252+
253+
# check if data size will exceed max size with headers
254+
data_size = len(data_json)
255+
if data_size >= MAX_KINESIS_DATA_SIZE:
256+
raise TraceInjectionSizeExceed(
257+
"Data including trace injection ({}) exceeds ({})".format(data_size, MAX_KINESIS_DATA_SIZE)
258+
)
259+
260+
record["Data"] = data_json
250261

251262

252263
def inject_trace_to_kinesis_stream(params, span):
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
botocore: Fix TypeError raised by injecting trace context into Kinesis messages.

0 commit comments

Comments
 (0)