diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f0debf7dd..cea2344f8 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -37,7 +37,8 @@ endif::[] [float] ===== Breaking changes -* Python 2.7 and 3.5 support has been deprecated. The Python agent now requires Python 3.6+ +* Python 2.7 and 3.5 support has been deprecated. The Python agent now requires Python 3.6+ {pull}1021[#1021] +* No longer collecting body for `elasticsearch-py` `update` and `delete_by_query` {pull}1013[#1013] [float] diff --git a/elasticapm/instrumentation/packages/asyncio/elasticsearch.py b/elasticapm/instrumentation/packages/asyncio/elasticsearch.py index 993f90a77..e7fcec282 100644 --- a/elasticapm/instrumentation/packages/asyncio/elasticsearch.py +++ b/elasticapm/instrumentation/packages/asyncio/elasticsearch.py @@ -30,7 +30,7 @@ import elasticapm from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule -from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin, ElasticsearchInstrumentation +from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin class ElasticSearchAsyncConnection(ElasticSearchConnectionMixin, AsyncAbstractInstrumentedModule): @@ -55,18 +55,3 @@ async def call(self, module, method, wrapped, instance, args, kwargs): leaf=True, ): return await wrapped(*args, **kwargs) - - -class AsyncElasticsearchInstrumentation(ElasticsearchInstrumentation, AsyncAbstractInstrumentedModule): - name = "elasticsearch" - - instrument_list = [ - ("elasticsearch._async.client", "AsyncElasticsearch.delete_by_query"), - ("elasticsearch._async.client", "AsyncElasticsearch.search"), - ("elasticsearch._async.client", "AsyncElasticsearch.count"), - ("elasticsearch._async.client", "AsyncElasticsearch.update"), - ] - - async def call(self, module, method, wrapped, instance, args, kwargs): - kwargs = self.inject_apm_params(method, kwargs) - return await wrapped(*args, **kwargs) diff --git a/elasticapm/instrumentation/packages/elasticsearch.py b/elasticapm/instrumentation/packages/elasticsearch.py index b627b1991..a42c17eac 100644 --- a/elasticapm/instrumentation/packages/elasticsearch.py +++ b/elasticapm/instrumentation/packages/elasticsearch.py @@ -30,18 +30,15 @@ from __future__ import absolute_import -import json +import re import elasticapm from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule -from elasticapm.utils import compat from elasticapm.utils.logging import get_logger logger = get_logger("elasticapm.instrument") - -API_METHOD_KEY_NAME = "__elastic_apm_api_method_name" -BODY_REF_NAME = "__elastic_apm_body_ref" +should_capture_body_re = re.compile("/(_search|_msearch|_count|_async_search|_sql|_eql)(/|$)") class ElasticSearchConnectionMixin(object): @@ -56,13 +53,14 @@ def get_signature(self, args, kwargs): def get_context(self, instance, args, kwargs): args_len = len(args) + url = args[1] if args_len > 1 else kwargs.get("url") params = args[2] if args_len > 2 else kwargs.get("params") - body = params.pop(BODY_REF_NAME, None) if params else None body_serialized = args[3] if args_len > 3 else kwargs.get("body") - api_method = params.pop(API_METHOD_KEY_NAME, None) if params else None + should_capture_body = bool(should_capture_body_re.search(url)) + context = {"db": {"type": "elasticsearch"}} - if api_method in self.query_methods: + if should_capture_body: query = [] # using both q AND body is allowed in some API endpoints / ES versions, # but not in others. We simply capture both if they are there so the @@ -76,17 +74,8 @@ def get_context(self, instance, args, kwargs): query.append(body_serialized.decode("utf-8", errors="replace")) else: query.append(body_serialized) - elif body and isinstance(body, dict): - try: - query.append(json.dumps(body, default=compat.text_type)) - except TypeError: - pass if query: context["db"]["statement"] = "\n\n".join(query) - elif api_method == "update": - if isinstance(body, dict) and "script" in body: - # only get the `script` field from the body - context["db"]["statement"] = json.dumps({"script": body["script"]}) context["destination"] = { "address": instance.host, "service": {"name": "elasticsearch", "resource": "elasticsearch", "type": "db"}, @@ -116,49 +105,3 @@ def call(self, module, method, wrapped, instance, args, kwargs): leaf=True, ): return wrapped(*args, **kwargs) - - -class ElasticsearchInstrumentation(AbstractInstrumentedModule): - name = "elasticsearch" - - instrument_list = [ - ("elasticsearch.client", "Elasticsearch.delete_by_query"), - ("elasticsearch.client", "Elasticsearch.search"), - ("elasticsearch.client", "Elasticsearch.count"), - ("elasticsearch.client", "Elasticsearch.update"), - ] - - def __init__(self): - super(ElasticsearchInstrumentation, self).__init__() - try: - from elasticsearch import VERSION - - self.version = VERSION[0] - except ImportError: - self.version = None - - def instrument(self): - if self.version and not 2 <= self.version < 8: - logger.debug("Instrumenting version %s of Elasticsearch is not supported by Elastic APM", self.version) - return - super(ElasticsearchInstrumentation, self).instrument() - - def call(self, module, method, wrapped, instance, args, kwargs): - kwargs = self.inject_apm_params(method, kwargs) - return wrapped(*args, **kwargs) - - def inject_apm_params(self, method, kwargs): - params = kwargs.pop("params", {}) - - # make a copy of params in case the caller reuses them for some reason - params = params.copy() if params is not None else {} - - method_name = method.partition(".")[-1] - - # store a reference to the non-serialized body so we can use it in the connection layer - body = kwargs.get("body") - params[BODY_REF_NAME] = body - params[API_METHOD_KEY_NAME] = method_name - - kwargs["params"] = params - return kwargs diff --git a/elasticapm/instrumentation/register.py b/elasticapm/instrumentation/register.py index 8223201dd..6d7e5bf73 100644 --- a/elasticapm/instrumentation/register.py +++ b/elasticapm/instrumentation/register.py @@ -54,7 +54,6 @@ "elasticapm.instrumentation.packages.sqlite.SQLiteInstrumentation", "elasticapm.instrumentation.packages.urllib3.Urllib3Instrumentation", "elasticapm.instrumentation.packages.elasticsearch.ElasticsearchConnectionInstrumentation", - "elasticapm.instrumentation.packages.elasticsearch.ElasticsearchInstrumentation", "elasticapm.instrumentation.packages.cassandra.CassandraInstrumentation", "elasticapm.instrumentation.packages.pymssql.PyMSSQLInstrumentation", "elasticapm.instrumentation.packages.pyodbc.PyODBCInstrumentation", @@ -73,7 +72,6 @@ "elasticapm.instrumentation.packages.asyncio.aiohttp_client.AioHttpClientInstrumentation", "elasticapm.instrumentation.packages.asyncio.httpx.HttpxAsyncClientInstrumentation", "elasticapm.instrumentation.packages.asyncio.elasticsearch.ElasticSearchAsyncConnection", - "elasticapm.instrumentation.packages.asyncio.elasticsearch.AsyncElasticsearchInstrumentation", "elasticapm.instrumentation.packages.asyncio.aiopg.AioPGInstrumentation", "elasticapm.instrumentation.packages.asyncio.asyncpg.AsyncPGInstrumentation", "elasticapm.instrumentation.packages.tornado.TornadoRequestExecuteInstrumentation", diff --git a/tests/instrumentation/asyncio_tests/async_elasticsearch_client_tests.py b/tests/instrumentation/asyncio_tests/async_elasticsearch_client_tests.py index 95a30ce0e..9f7d7dc3d 100644 --- a/tests/instrumentation/asyncio_tests/async_elasticsearch_client_tests.py +++ b/tests/instrumentation/asyncio_tests/async_elasticsearch_client_tests.py @@ -175,24 +175,3 @@ async def test_count_body(instrument, elasticapm_client, async_elasticsearch): assert span["context"]["db"]["type"] == "elasticsearch" assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query": {"term": {"user": "kimchy"}}}') assert span["sync"] is False - - -async def test_delete_by_query_body(instrument, elasticapm_client, async_elasticsearch): - await async_elasticsearch.create( - index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True - ) - elasticapm_client.begin_transaction("test") - result = await async_elasticsearch.delete_by_query(index="tweets", body={"query": {"term": {"user": "kimchy"}}}) - elasticapm_client.end_transaction("test", "OK") - - transaction = elasticapm_client.events[TRANSACTION][0] - spans = elasticapm_client.spans_for_transaction(transaction) - - span = spans[0] - assert span["name"] == "ES POST /tweets/_delete_by_query" - assert span["type"] == "db" - assert span["subtype"] == "elasticsearch" - assert span["action"] == "query" - assert span["context"]["db"]["type"] == "elasticsearch" - assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query":{"term":{"user":"kimchy"}}}') - assert span["sync"] is False diff --git a/tests/instrumentation/elasticsearch_tests.py b/tests/instrumentation/elasticsearch_tests.py index 0c4850f4a..c479a2edc 100644 --- a/tests/instrumentation/elasticsearch_tests.py +++ b/tests/instrumentation/elasticsearch_tests.py @@ -295,34 +295,6 @@ def test_get_source(instrument, elasticapm_client, elasticsearch): assert "statement" not in span["context"]["db"] -@pytest.mark.skipif(ES_VERSION[0] < 5, reason="unsupported method") -@pytest.mark.integrationtest -def test_update_script(instrument, elasticapm_client, elasticsearch): - elasticsearch.create( - index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True - ) - elasticapm_client.begin_transaction("test") - r1 = elasticsearch.update( - index="tweets", id=1, doc_type=document_type, body={"script": "ctx._source.text = 'adios'"}, refresh=True - ) - elasticapm_client.end_transaction("test", "OK") - - transaction = elasticapm_client.events[TRANSACTION][0] - r2 = elasticsearch.get(index="tweets", doc_type=document_type, id=1) - assert r1["result"] == "updated" - assert r2["_source"] == {"user": "kimchy", "text": "adios"} - spans = elasticapm_client.spans_for_transaction(transaction) - assert len(spans) == 1 - - span = spans[0] - assert span["name"] == "ES POST /tweets/%s/1/_update" % document_type - assert span["type"] == "db" - assert span["subtype"] == "elasticsearch" - assert span["action"] == "query" - assert span["context"]["db"]["type"] == "elasticsearch" - assert span["context"]["db"]["statement"] == '{"script": "ctx._source.text = \'adios\'"}' - - @pytest.mark.integrationtest def test_update_document(instrument, elasticapm_client, elasticsearch): elasticsearch.create( @@ -356,7 +328,7 @@ def test_search_body(instrument, elasticapm_client, elasticsearch): ) elasticapm_client.begin_transaction("test") search_query = {"query": {"term": {"user": "kimchy"}}, "sort": ["userid"]} - result = elasticsearch.search(body=search_query, params=None) + result = elasticsearch.search(body=search_query) elasticapm_client.end_transaction("test", "OK") transaction = elasticapm_client.events[TRANSACTION][0] @@ -500,28 +472,6 @@ def test_delete(instrument, elasticapm_client, elasticsearch): assert span["context"]["db"]["type"] == "elasticsearch" -@pytest.mark.skipif(ES_VERSION[0] < 5, reason="unsupported method") -@pytest.mark.integrationtest -def test_delete_by_query_body(instrument, elasticapm_client, elasticsearch): - elasticsearch.create( - index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True - ) - elasticapm_client.begin_transaction("test") - result = elasticsearch.delete_by_query(index="tweets", body={"query": {"term": {"user": "kimchy"}}}) - elasticapm_client.end_transaction("test", "OK") - - transaction = elasticapm_client.events[TRANSACTION][0] - spans = elasticapm_client.spans_for_transaction(transaction) - - span = spans[0] - assert span["name"] == "ES POST /tweets/_delete_by_query" - assert span["type"] == "db" - assert span["subtype"] == "elasticsearch" - assert span["action"] == "query" - assert span["context"]["db"]["type"] == "elasticsearch" - assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query":{"term":{"user":"kimchy"}}}') - - @pytest.mark.integrationtest def test_multiple_indexes(instrument, elasticapm_client, elasticsearch): elasticsearch.create(index="tweets", doc_type="users", id=1, body={"user": "kimchy", "text": "hola"}, refresh=True) @@ -571,7 +521,7 @@ def test_custom_serializer(instrument, elasticapm_client, elasticsearch): elasticsearch.index(index="test-index", body={"2": 1}) elasticapm_client.begin_transaction("test") search_query = {"query": {"term": {NumberObj(2): {"value": 1}}}} - result = elasticsearch.search(index="test-index", body=search_query, params=None) + result = elasticsearch.search(index="test-index", body=search_query) elasticapm_client.end_transaction("test", "OK") transaction = elasticapm_client.events[TRANSACTION][0]