Skip to content

Commit 5a7debf

Browse files
Yun-Kimhghotramergify[bot]brettlangdonmajorgreys
authored
fix(botocore): keep newlines in json data for kinesis records [backport #4700 to 1.6] (#4819)
## Description Backports #4700 to 1.6. During our serialization & de-serialization of kinesis records, any `\n` at the end of the record is stripped by the json library. Customer pipelines rely on this `\n` delimiter to distinguish between different records. This change appends the line break to the end of the record string if it was originally there. ## Checklist - [ ] Followed the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines) when writing a release note. - [ ] Add additional sections for `feat` and `fix` pull requests. - [ ] [Library documentation](https://github.com/DataDog/dd-trace-py/tree/1.x/docs) and/or [Datadog's documentation site](https://github.com/DataDog/documentation/) is updated. Link to doc PR in description. <!-- Copy and paste the relevant snippet based on the type of pull request --> <!-- START feat --> ## Motivation <!-- Expand on why the change is required, include relevant context for reviewers --> Add support for Databricks which relies on the `\n` delimiter to be present at the end of the record. Look at the related issue for more details. ## Design <!-- Include benefits from the change as well as possible drawbacks and trade-offs --> If the incoming record has a `\n` at the end of the string, we add the `\n` into the string post serialization since it was stripped during the de-serialization process by the json library. ## Testing strategy <!-- Describe the automated tests and/or the steps for manual testing. <!-- END feat --> - two unit tests have been added ``` pytest -k test_kinesis_put_records_newline_base64_trace_injection tests/contrib/botocore/test.py pytest -k test_kinesis_put_records_newline_json_trace_injection tests/contrib/botocore/test.py ``` <!-- START fix --> ## Relevant issue(s) <!-- Link the pull request to any issues related to the fix. Use keywords for links to automate closing the issues once the pull request is merged. --> Fixes #4317 ## Testing strategy <!-- Describe any added regression tests and/or the manual testing performed. --> <!-- END fix --> ## Reviewer Checklist - [x] Title is accurate. - [x] Description motivates each change. - [x] No unnecessary changes were introduced in this PR. - [x] Avoid breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Tests provided or description of manual testing performed is included in the code or PR. - [x] Release note has been added and follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines), or else `changelog/no-changelog` label added. - [x] All relevant GitHub issues are correctly linked. - [x] Backports are identified and tagged with Mergifyio. Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Brett Langdon <[email protected]> Co-authored-by: Tahir H. Butt <[email protected]> Co-authored-by: Yun Kim <[email protected]> Co-authored-by: Munir Abdinur <[email protected]> ## Description <!-- Briefly describe the change and why it was required. --> <!-- If this is a breaking change, explain why it is necessary. Breaking changes must append `!` after the type/scope. See https://ddtrace.readthedocs.io/en/stable/contributing.html for more details. --> ## Checklist - [ ] Followed the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines) when writing a release note. - [ ] Add additional sections for `feat` and `fix` pull requests. - [ ] [Library documentation](https://github.com/DataDog/dd-trace-py/tree/1.x/docs) and/or [Datadog's documentation site](https://github.com/DataDog/documentation/) is updated. Link to doc PR in description. <!-- Copy and paste the relevant snippet based on the type of pull request --> <!-- START feat --> ## Motivation <!-- Expand on why the change is required, include relevant context for reviewers --> ## Design <!-- Include benefits from the change as well as possible drawbacks and trade-offs --> ## Testing strategy <!-- Describe the automated tests and/or the steps for manual testing. <!-- END feat --> <!-- START fix --> ## Relevant issue(s) <!-- Link the pull request to any issues related to the fix. Use keywords for links to automate closing the issues once the pull request is merged. --> ## Testing strategy <!-- Describe any added regression tests and/or the manual testing performed. --> <!-- END fix --> ## Reviewer Checklist - [ ] Title is accurate. - [ ] Description motivates each change. - [ ] No unnecessary changes were introduced in this PR. - [ ] Avoid breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [ ] Tests provided or description of manual testing performed is included in the code or PR. - [ ] Release note has been added and follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines), or else `changelog/no-changelog` label added. - [ ] All relevant GitHub issues are correctly linked. - [ ] Backports are identified and tagged with Mergifyio. Co-authored-by: Harvinder Ghotra <[email protected]> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Brett Langdon <[email protected]> Co-authored-by: Tahir H. Butt <[email protected]> Co-authored-by: Munir Abdinur <[email protected]>
1 parent 1c9d59d commit 5a7debf

File tree

3 files changed

+143
-4
lines changed

3 files changed

+143
-4
lines changed

ddtrace/contrib/botocore/patch.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from typing import Dict
1111
from typing import List
1212
from typing import Optional
13+
from typing import Tuple
1314

1415
import botocore.client
1516
import botocore.exceptions
@@ -45,6 +46,8 @@
4546
MAX_KINESIS_DATA_SIZE = 1 << 20 # 1MB
4647
MAX_EVENTBRIDGE_DETAIL_SIZE = 1 << 18 # 256KB
4748

49+
LINE_BREAK = "\n"
50+
4851
log = get_logger(__name__)
4952

5053

@@ -176,8 +179,17 @@ def inject_trace_to_eventbridge_detail(params, span):
176179
entry["Detail"] = detail_json
177180

178181

182+
def get_json_from_str(data_str):
183+
# type: (str) -> Tuple[str, Optional[Dict[str, Any]]]
184+
data_obj = json.loads(data_str)
185+
186+
if data_str.endswith(LINE_BREAK):
187+
return LINE_BREAK, data_obj
188+
return "", data_obj
189+
190+
179191
def get_kinesis_data_object(data):
180-
# type: (str) -> Optional[Dict[str, Any]]
192+
# type: (str) -> Tuple[str, Optional[Dict[str, Any]]]
181193
"""
182194
:data: the data from a kinesis stream
183195
@@ -190,13 +202,14 @@ def get_kinesis_data_object(data):
190202

191203
# check if data is a json string
192204
try:
193-
return json.loads(data)
205+
return get_json_from_str(data)
194206
except ValueError:
195207
pass
196208

197209
# check if data is a base64 encoded json string
198210
try:
199-
return json.loads(base64.b64decode(data).decode("ascii"))
211+
data_str = base64.b64decode(data).decode("ascii")
212+
return get_json_from_str(data_str)
200213
except ValueError:
201214
raise TraceInjectionDecodingError("Unable to parse kinesis streams data string")
202215

@@ -216,11 +229,15 @@ def inject_trace_to_kinesis_stream_data(record, span):
216229
return
217230

218231
data = record["Data"]
219-
data_obj = get_kinesis_data_object(data)
232+
line_break, data_obj = get_kinesis_data_object(data)
220233
data_obj["_datadog"] = {}
221234
HTTPPropagator.inject(span.context, data_obj["_datadog"])
222235
data_json = json.dumps(data_obj)
223236

237+
# if original string had a line break, add it back
238+
if line_break:
239+
data_json += line_break
240+
224241
# check if data size will exceed max size with headers
225242
data_size = len(data_json)
226243
if data_size >= MAX_KINESIS_DATA_SIZE:
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
botocore: Before this change, the botocore integration stripped newlines from the JSON string encoded in the data blob of Amazon Kinesis records. This change includes a terminating newline if it is present in the decoded data.

tests/contrib/botocore/test.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1795,6 +1795,124 @@ def test_kinesis_put_records_base64_trace_injection(self):
17951795

17961796
client.delete_stream(StreamName=stream_name)
17971797

1798+
@mock_kinesis
1799+
def test_kinesis_put_records_newline_json_trace_injection(self):
1800+
client = self.session.create_client("kinesis", region_name="us-east-1")
1801+
1802+
stream_name = "test"
1803+
client.create_stream(StreamName=stream_name, ShardCount=1)
1804+
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
1805+
shard_id = stream["Shards"][0]["ShardId"]
1806+
1807+
partition_key = "1234"
1808+
data = [
1809+
{"Data": json.dumps({"Hello": "World"}) + "\n", "PartitionKey": partition_key},
1810+
{"Data": json.dumps({"foo": "bar"}) + "\n", "PartitionKey": partition_key},
1811+
]
1812+
1813+
Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(client)
1814+
client.put_records(StreamName=stream_name, Records=data)
1815+
1816+
# check if the appropriate span was generated
1817+
spans = self.get_spans()
1818+
assert spans
1819+
span = spans[0]
1820+
assert len(spans) == 1
1821+
assert span.get_tag("aws.region") == "us-east-1"
1822+
assert span.get_tag("aws.operation") == "PutRecords"
1823+
assert span.get_tag("params.MessageBody") is None
1824+
assert_is_measured(span)
1825+
assert_span_http_status_code(span, 200)
1826+
assert span.service == "test-botocore-tracing.kinesis"
1827+
assert span.resource == "kinesis.putrecords"
1828+
records = span.get_tag("params.Records")
1829+
assert records is None
1830+
1831+
resp = client.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON")
1832+
shard_iterator = resp["ShardIterator"]
1833+
1834+
# ensure headers are present in received message
1835+
resp = client.get_records(ShardIterator=shard_iterator)
1836+
assert len(resp["Records"]) == 2
1837+
records = resp["Records"]
1838+
record = records[0]
1839+
data_str = record["Data"].decode("ascii")
1840+
assert data_str.endswith("\n")
1841+
data = json.loads(data_str)
1842+
headers = data["_datadog"]
1843+
assert headers is not None
1844+
assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
1845+
assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)
1846+
1847+
record = records[1]
1848+
data_str = record["Data"].decode("ascii")
1849+
assert data_str.endswith("\n")
1850+
data = json.loads(data_str)
1851+
assert "_datadog" not in data
1852+
1853+
client.delete_stream(StreamName=stream_name)
1854+
1855+
@mock_kinesis
1856+
def test_kinesis_put_records_newline_base64_trace_injection(self):
1857+
client = self.session.create_client("kinesis", region_name="us-east-1")
1858+
1859+
stream_name = "test"
1860+
client.create_stream(StreamName=stream_name, ShardCount=1)
1861+
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
1862+
shard_id = stream["Shards"][0]["ShardId"]
1863+
1864+
partition_key = "1234"
1865+
sample_string = json.dumps({"Hello": "World"}) + "\n"
1866+
sample_string_bytes = sample_string.encode("ascii")
1867+
base64_bytes = base64.b64encode(sample_string_bytes)
1868+
data_str = base64_bytes.decode("ascii")
1869+
data = [
1870+
{"Data": data_str, "PartitionKey": partition_key},
1871+
{"Data": data_str, "PartitionKey": partition_key},
1872+
]
1873+
1874+
Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(client)
1875+
client.put_records(StreamName=stream_name, Records=data)
1876+
1877+
# check if the appropriate span was generated
1878+
spans = self.get_spans()
1879+
assert spans
1880+
span = spans[0]
1881+
assert len(spans) == 1
1882+
assert span.get_tag("aws.region") == "us-east-1"
1883+
assert span.get_tag("aws.operation") == "PutRecords"
1884+
assert span.get_tag("params.MessageBody") is None
1885+
assert_is_measured(span)
1886+
assert_span_http_status_code(span, 200)
1887+
assert span.service == "test-botocore-tracing.kinesis"
1888+
assert span.resource == "kinesis.putrecords"
1889+
records = span.get_tag("params.Records")
1890+
assert records is None
1891+
1892+
resp = client.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON")
1893+
shard_iterator = resp["ShardIterator"]
1894+
1895+
# ensure headers are present in received message
1896+
resp = client.get_records(ShardIterator=shard_iterator)
1897+
assert len(resp["Records"]) == 2
1898+
records = resp["Records"]
1899+
record = records[0]
1900+
data_str = record["Data"].decode("ascii")
1901+
assert data_str.endswith("\n")
1902+
data = json.loads(data_str)
1903+
headers = data["_datadog"]
1904+
assert headers is not None
1905+
assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
1906+
assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)
1907+
1908+
record = records[1]
1909+
data_str = base64.b64decode(record["Data"]).decode("ascii")
1910+
assert data_str.endswith("\n")
1911+
data = json.loads(data_str)
1912+
assert "_datadog" not in data
1913+
1914+
client.delete_stream(StreamName=stream_name)
1915+
17981916
@unittest.skipIf(PY2, "Skipping for Python 2.7 since older moto doesn't support secretsmanager")
17991917
def test_secretsmanager(self):
18001918
from moto import mock_secretsmanager

0 commit comments

Comments
 (0)