Skip to content

Commit 9be21a8

Browse files
authored
Fix for json-incompatible elasticsearch bodies (#998)
* Use the elasticsearch serializer for better compat * Use the `body` provided with perform_request * Check for bytes body * CHANGELOG
1 parent 3b1bff0 commit 9be21a8

File tree

4 files changed

+102
-25
lines changed

4 files changed

+102
-25
lines changed

CHANGELOG.asciidoc

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,22 @@ endif::[]
3030
//===== Bug fixes
3131
//
3232
33+
=== Unreleased
34+
35+
// Unreleased changes go here
36+
// When the next release happens, nest these changes under the "Python Agent version 5.x" heading
37+
[float]
38+
===== Features
39+
40+
41+
[float]
42+
===== Bug fixes
43+
44+
* Fix for custom serializers in elasticsearch-py {pull}998[#998]
45+
* Fix large query truncation in psycopg2 {pull}994[#994]
46+
* Fix memory metrics reporting when `memory.usage_in_bytes` is unavailable {pull}987[#987]
47+
48+
3349
[[release-notes-5.x]]
3450
=== Python Agent version 5.x
3551
@@ -38,14 +54,14 @@ endif::[]
3854
[float]
3955
===== Features
4056
41-
* Implement "sample_rate" property for transactions and spans, and propagate through tracestate {pull}891[#891]
42-
* Add support for callbacks on config changes {pull}912[#912]
43-
* Override `sys.excepthook` to catch all exceptions {pull}943[#943]
44-
* Implement `log_level` config (supports central config) {pull}946[#946]
45-
* Implement `transaction_ignore_urls` config (supports central config) {pull}923[#923]
46-
* Add public API to retrieve trace parent header {pull}956[#956]
47-
* Added support for cgroup memory metrics {pull}846[#846]
48-
57+
* Implement "sample_rate" property for transactions and spans, and propagate through tracestate {pull}891[#891]
58+
* Add support for callbacks on config changes {pull}912[#912]
59+
* Override `sys.excepthook` to catch all exceptions {pull}943[#943]
60+
* Implement `log_level` config (supports central config) {pull}946[#946]
61+
* Implement `transaction_ignore_urls` config (supports central config) {pull}923[#923]
62+
* Add public API to retrieve trace parent header {pull}956[#956]
63+
* Added support for cgroup memory metrics {pull}846[#846]
64+
4965
5066
[float]
5167
===== Bug fixes
@@ -62,9 +78,9 @@ endif::[]
6278
[float]
6379
===== Features
6480
65-
* Add instrumentation support for https://github.com/encode/httpx[`httpx`] and https://github.com/encode/httpcore[`httpcore`] {pull}898[#898]
66-
* Implement "outcome" property for transactions and spans {pull}899[#899]
67-
* Add support for `asyncpg` {pull}889[#889]
81+
* Add instrumentation support for https://github.com/encode/httpx[`httpx`] and https://github.com/encode/httpcore[`httpcore`] {pull}898[#898]
82+
* Implement "outcome" property for transactions and spans {pull}899[#899]
83+
* Add support for `asyncpg` {pull}889[#889]
6884
6985
[float]
7086
===== Bug fixes

elasticapm/instrumentation/packages/elasticsearch.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def get_context(self, instance, args, kwargs):
5858
args_len = len(args)
5959
params = args[2] if args_len > 2 else kwargs.get("params")
6060
body = params.pop(BODY_REF_NAME, None) if params else None
61+
body_serialized = args[3] if args_len > 3 else kwargs.get("body")
6162

6263
api_method = params.pop(API_METHOD_KEY_NAME, None) if params else None
6364
context = {"db": {"type": "elasticsearch"}}
@@ -70,8 +71,16 @@ def get_context(self, instance, args, kwargs):
7071
# 'q' is already encoded to a byte string at this point
7172
# we assume utf8, which is the default
7273
query.append("q=" + params["q"].decode("utf-8", errors="replace"))
73-
if body and isinstance(body, dict):
74-
query.append(json.dumps(body, default=compat.text_type))
74+
if body_serialized:
75+
if isinstance(body_serialized, bytes):
76+
query.append(body_serialized.decode("utf-8", errors="replace"))
77+
else:
78+
query.append(body_serialized)
79+
elif body and isinstance(body, dict):
80+
try:
81+
query.append(json.dumps(body, default=compat.text_type))
82+
except TypeError:
83+
pass
7584
if query:
7685
context["db"]["statement"] = "\n\n".join(query)
7786
elif api_method == "update":

tests/instrumentation/asyncio_tests/async_elasticsearch_client_tests.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
pytest.importorskip("elasticsearch._async") # isort:skip
3434

35+
import json
3536
import os
3637

3738
from elasticsearch import VERSION as ES_VERSION
@@ -145,9 +146,10 @@ async def test_search_body(instrument, elasticapm_client, async_elasticsearch):
145146
assert span["subtype"] == "elasticsearch"
146147
assert span["action"] == "query"
147148
assert span["context"]["db"]["type"] == "elasticsearch"
148-
assert (
149-
span["context"]["db"]["statement"] == '{"sort": ["userid"], "query": {"term": {"user": "kimchy"}}}'
150-
or span["context"]["db"]["statement"] == '{"query": {"term": {"user": "kimchy"}}, "sort": ["userid"]}'
149+
assert json.loads(span["context"]["db"]["statement"]) == json.loads(
150+
'{"sort": ["userid"], "query": {"term": {"user": "kimchy"}}}'
151+
) or json.loads(span["context"]["db"]["statement"]) == json.loads(
152+
'{"query": {"term": {"user": "kimchy"}}, "sort": ["userid"]}'
151153
)
152154
assert span["sync"] is False
153155

@@ -171,7 +173,7 @@ async def test_count_body(instrument, elasticapm_client, async_elasticsearch):
171173
assert span["subtype"] == "elasticsearch"
172174
assert span["action"] == "query"
173175
assert span["context"]["db"]["type"] == "elasticsearch"
174-
assert span["context"]["db"]["statement"] == '{"query": {"term": {"user": "kimchy"}}}'
176+
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query": {"term": {"user": "kimchy"}}}')
175177
assert span["sync"] is False
176178

177179

@@ -192,5 +194,5 @@ async def test_delete_by_query_body(instrument, elasticapm_client, async_elastic
192194
assert span["subtype"] == "elasticsearch"
193195
assert span["action"] == "query"
194196
assert span["context"]["db"]["type"] == "elasticsearch"
195-
assert span["context"]["db"]["statement"] == '{"query": {"term": {"user": "kimchy"}}}'
197+
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query":{"term":{"user":"kimchy"}}}')
196198
assert span["sync"] is False

tests/instrumentation/elasticsearch_tests.py

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,15 @@
3232

3333
pytest.importorskip("elasticsearch") # isort:skip
3434

35+
import json
3536
import os
3637

3738
from elasticsearch import VERSION as ES_VERSION
3839
from elasticsearch import Elasticsearch
40+
from elasticsearch.serializer import JSONSerializer
3941

4042
from elasticapm.conf.constants import TRANSACTION
43+
from elasticapm.utils import compat
4144

4245
pytestmark = [pytest.mark.elasticsearch]
4346

@@ -48,10 +51,39 @@
4851
document_type = "_doc" if ES_VERSION[0] >= 6 else "doc"
4952

5053

54+
class NumberObj:
55+
def __init__(self, value):
56+
self.value = value
57+
58+
59+
class SpecialEncoder(JSONSerializer):
60+
def default(self, obj):
61+
if isinstance(obj, NumberObj):
62+
return obj.value
63+
return JSONSerializer.default(self, obj)
64+
65+
def force_key_encoding(self, obj):
66+
if isinstance(obj, dict):
67+
68+
def yield_key_value(d):
69+
for key, value in compat.iteritems(d):
70+
try:
71+
yield self.default(key), self.force_key_encoding(value)
72+
except TypeError:
73+
yield key, self.force_key_encoding(value)
74+
75+
return dict(yield_key_value(obj))
76+
else:
77+
return obj
78+
79+
def dumps(self, obj):
80+
return super(SpecialEncoder, self).dumps(self.force_key_encoding(obj))
81+
82+
5183
@pytest.fixture
5284
def elasticsearch(request):
5385
"""Elasticsearch client fixture."""
54-
client = Elasticsearch(hosts=os.environ["ES_URL"])
86+
client = Elasticsearch(hosts=os.environ["ES_URL"], serializer=SpecialEncoder())
5587
try:
5688
yield client
5789
finally:
@@ -338,9 +370,10 @@ def test_search_body(instrument, elasticapm_client, elasticsearch):
338370
assert span["subtype"] == "elasticsearch"
339371
assert span["action"] == "query"
340372
assert span["context"]["db"]["type"] == "elasticsearch"
341-
assert (
342-
span["context"]["db"]["statement"] == '{"sort": ["userid"], "query": {"term": {"user": "kimchy"}}}'
343-
or span["context"]["db"]["statement"] == '{"query": {"term": {"user": "kimchy"}}, "sort": ["userid"]}'
373+
assert json.loads(span["context"]["db"]["statement"]) == json.loads(
374+
'{"sort": ["userid"], "query": {"term": {"user": "kimchy"}}}'
375+
) or json.loads(span["context"]["db"]["statement"]) == json.loads(
376+
'{"query": {"term": {"user": "kimchy"}}, "sort": ["userid"]}'
344377
)
345378

346379

@@ -393,7 +426,7 @@ def test_search_both(instrument, elasticapm_client, elasticsearch):
393426
assert span["subtype"] == "elasticsearch"
394427
assert span["action"] == "query"
395428
assert span["context"]["db"]["type"] == "elasticsearch"
396-
assert span["context"]["db"]["statement"] == 'q=text:hola\n\n{"query": {"term": {"user": "kimchy"}}}'
429+
assert span["context"]["db"]["statement"].startswith('q=text:hola\n\n{"query":')
397430

398431

399432
@pytest.mark.integrationtest
@@ -419,7 +452,7 @@ def test_count_body(instrument, elasticapm_client, elasticsearch):
419452
assert span["subtype"] == "elasticsearch"
420453
assert span["action"] == "query"
421454
assert span["context"]["db"]["type"] == "elasticsearch"
422-
assert span["context"]["db"]["statement"] == '{"query": {"term": {"user": "kimchy"}}}'
455+
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query": {"term": {"user": "kimchy"}}}')
423456

424457

425458
@pytest.mark.integrationtest
@@ -486,7 +519,7 @@ def test_delete_by_query_body(instrument, elasticapm_client, elasticsearch):
486519
assert span["subtype"] == "elasticsearch"
487520
assert span["action"] == "query"
488521
assert span["context"]["db"]["type"] == "elasticsearch"
489-
assert span["context"]["db"]["statement"] == '{"query": {"term": {"user": "kimchy"}}}'
522+
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query":{"term":{"user":"kimchy"}}}')
490523

491524

492525
@pytest.mark.integrationtest
@@ -528,3 +561,20 @@ def test_multiple_indexes_doctypes(instrument, elasticapm_client, elasticsearch)
528561
assert span["subtype"] == "elasticsearch"
529562
assert span["action"] == "query"
530563
assert span["context"]["db"]["type"] == "elasticsearch"
564+
565+
566+
@pytest.mark.integrationtest
567+
def test_custom_serializer(instrument, elasticapm_client, elasticsearch):
568+
if ES_VERSION[0] < 7:
569+
elasticsearch.index("test-index", document_type, {"2": 1})
570+
else:
571+
elasticsearch.index(index="test-index", body={"2": 1})
572+
elasticapm_client.begin_transaction("test")
573+
search_query = {"query": {"term": {NumberObj(2): {"value": 1}}}}
574+
result = elasticsearch.search(index="test-index", body=search_query, params=None)
575+
elasticapm_client.end_transaction("test", "OK")
576+
577+
transaction = elasticapm_client.events[TRANSACTION][0]
578+
spans = elasticapm_client.spans_for_transaction(transaction)
579+
span = spans[0]
580+
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query":{"term":{"2":{"value":1}}}}')

0 commit comments

Comments
 (0)