Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions elasticapm/instrumentation/packages/asyncio/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import elasticapm
from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule
from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin, ElasticsearchInstrumentation
from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin


class ElasticSearchAsyncConnection(ElasticSearchConnectionMixin, AsyncAbstractInstrumentedModule):
Expand All @@ -55,18 +55,3 @@ async def call(self, module, method, wrapped, instance, args, kwargs):
leaf=True,
):
return await wrapped(*args, **kwargs)


class AsyncElasticsearchInstrumentation(ElasticsearchInstrumentation, AsyncAbstractInstrumentedModule):
name = "elasticsearch"

instrument_list = [
("elasticsearch._async.client", "AsyncElasticsearch.delete_by_query"),
("elasticsearch._async.client", "AsyncElasticsearch.search"),
("elasticsearch._async.client", "AsyncElasticsearch.count"),
("elasticsearch._async.client", "AsyncElasticsearch.update"),
]

async def call(self, module, method, wrapped, instance, args, kwargs):
kwargs = self.inject_apm_params(method, kwargs)
return await wrapped(*args, **kwargs)
69 changes: 6 additions & 63 deletions elasticapm/instrumentation/packages/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,15 @@

from __future__ import absolute_import

import json
import re

import elasticapm
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
from elasticapm.utils import compat
from elasticapm.utils.logging import get_logger

logger = get_logger("elasticapm.instrument")


API_METHOD_KEY_NAME = "__elastic_apm_api_method_name"
BODY_REF_NAME = "__elastic_apm_body_ref"
should_capture_body_re = re.compile("_((search|msearch)(/template)?|count)$")


class ElasticSearchConnectionMixin(object):
Expand All @@ -56,13 +53,14 @@ def get_signature(self, args, kwargs):

def get_context(self, instance, args, kwargs):
args_len = len(args)
url = args[1] if args_len > 1 else kwargs.get("url")
params = args[2] if args_len > 2 else kwargs.get("params")
body = params.pop(BODY_REF_NAME, None) if params else None
body_serialized = args[3] if args_len > 3 else kwargs.get("body")

api_method = params.pop(API_METHOD_KEY_NAME, None) if params else None
should_capture_body = bool(should_capture_body_re.search(url))

context = {"db": {"type": "elasticsearch"}}
if api_method in self.query_methods:
if should_capture_body:
query = []
# using both q AND body is allowed in some API endpoints / ES versions,
# but not in others. We simply capture both if they are there so the
Expand All @@ -76,17 +74,8 @@ def get_context(self, instance, args, kwargs):
query.append(body_serialized.decode("utf-8", errors="replace"))
else:
query.append(body_serialized)
elif body and isinstance(body, dict):
try:
query.append(json.dumps(body, default=compat.text_type))
except TypeError:
pass
if query:
context["db"]["statement"] = "\n\n".join(query)
elif api_method == "update":
if isinstance(body, dict) and "script" in body:
# only get the `script` field from the body
context["db"]["statement"] = json.dumps({"script": body["script"]})
context["destination"] = {
"address": instance.host,
"service": {"name": "elasticsearch", "resource": "elasticsearch", "type": "db"},
Expand Down Expand Up @@ -116,49 +105,3 @@ def call(self, module, method, wrapped, instance, args, kwargs):
leaf=True,
):
return wrapped(*args, **kwargs)


class ElasticsearchInstrumentation(AbstractInstrumentedModule):
name = "elasticsearch"

instrument_list = [
("elasticsearch.client", "Elasticsearch.delete_by_query"),
("elasticsearch.client", "Elasticsearch.search"),
("elasticsearch.client", "Elasticsearch.count"),
("elasticsearch.client", "Elasticsearch.update"),
]

def __init__(self):
super(ElasticsearchInstrumentation, self).__init__()
try:
from elasticsearch import VERSION

self.version = VERSION[0]
except ImportError:
self.version = None

def instrument(self):
if self.version and not 2 <= self.version < 8:
logger.debug("Instrumenting version %s of Elasticsearch is not supported by Elastic APM", self.version)
return
super(ElasticsearchInstrumentation, self).instrument()

def call(self, module, method, wrapped, instance, args, kwargs):
kwargs = self.inject_apm_params(method, kwargs)
return wrapped(*args, **kwargs)

def inject_apm_params(self, method, kwargs):
params = kwargs.pop("params", {})

# make a copy of params in case the caller reuses them for some reason
params = params.copy() if params is not None else {}

method_name = method.partition(".")[-1]

# store a reference to the non-serialized body so we can use it in the connection layer
body = kwargs.get("body")
params[BODY_REF_NAME] = body
params[API_METHOD_KEY_NAME] = method_name

kwargs["params"] = params
return kwargs
2 changes: 0 additions & 2 deletions elasticapm/instrumentation/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
"elasticapm.instrumentation.packages.sqlite.SQLiteInstrumentation",
"elasticapm.instrumentation.packages.urllib3.Urllib3Instrumentation",
"elasticapm.instrumentation.packages.elasticsearch.ElasticsearchConnectionInstrumentation",
"elasticapm.instrumentation.packages.elasticsearch.ElasticsearchInstrumentation",
"elasticapm.instrumentation.packages.cassandra.CassandraInstrumentation",
"elasticapm.instrumentation.packages.pymssql.PyMSSQLInstrumentation",
"elasticapm.instrumentation.packages.pyodbc.PyODBCInstrumentation",
Expand All @@ -73,7 +72,6 @@
"elasticapm.instrumentation.packages.asyncio.aiohttp_client.AioHttpClientInstrumentation",
"elasticapm.instrumentation.packages.asyncio.httpx.HttpxAsyncClientInstrumentation",
"elasticapm.instrumentation.packages.asyncio.elasticsearch.ElasticSearchAsyncConnection",
"elasticapm.instrumentation.packages.asyncio.elasticsearch.AsyncElasticsearchInstrumentation",
"elasticapm.instrumentation.packages.asyncio.aiopg.AioPGInstrumentation",
"elasticapm.instrumentation.packages.asyncio.asyncpg.AsyncPGInstrumentation",
"elasticapm.instrumentation.packages.tornado.TornadoRequestExecuteInstrumentation",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,24 +175,3 @@ async def test_count_body(instrument, elasticapm_client, async_elasticsearch):
assert span["context"]["db"]["type"] == "elasticsearch"
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query": {"term": {"user": "kimchy"}}}')
assert span["sync"] is False


async def test_delete_by_query_body(instrument, elasticapm_client, async_elasticsearch):
await async_elasticsearch.create(
index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
)
elasticapm_client.begin_transaction("test")
result = await async_elasticsearch.delete_by_query(index="tweets", body={"query": {"term": {"user": "kimchy"}}})
elasticapm_client.end_transaction("test", "OK")

transaction = elasticapm_client.events[TRANSACTION][0]
spans = elasticapm_client.spans_for_transaction(transaction)

span = spans[0]
assert span["name"] == "ES POST /tweets/_delete_by_query"
assert span["type"] == "db"
assert span["subtype"] == "elasticsearch"
assert span["action"] == "query"
assert span["context"]["db"]["type"] == "elasticsearch"
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query":{"term":{"user":"kimchy"}}}')
assert span["sync"] is False
54 changes: 2 additions & 52 deletions tests/instrumentation/elasticsearch_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,34 +295,6 @@ def test_get_source(instrument, elasticapm_client, elasticsearch):
assert "statement" not in span["context"]["db"]


@pytest.mark.skipif(ES_VERSION[0] < 5, reason="unsupported method")
@pytest.mark.integrationtest
def test_update_script(instrument, elasticapm_client, elasticsearch):
elasticsearch.create(
index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
)
elasticapm_client.begin_transaction("test")
r1 = elasticsearch.update(
index="tweets", id=1, doc_type=document_type, body={"script": "ctx._source.text = 'adios'"}, refresh=True
)
elasticapm_client.end_transaction("test", "OK")

transaction = elasticapm_client.events[TRANSACTION][0]
r2 = elasticsearch.get(index="tweets", doc_type=document_type, id=1)
assert r1["result"] == "updated"
assert r2["_source"] == {"user": "kimchy", "text": "adios"}
spans = elasticapm_client.spans_for_transaction(transaction)
assert len(spans) == 1

span = spans[0]
assert span["name"] == "ES POST /tweets/%s/1/_update" % document_type
assert span["type"] == "db"
assert span["subtype"] == "elasticsearch"
assert span["action"] == "query"
assert span["context"]["db"]["type"] == "elasticsearch"
assert span["context"]["db"]["statement"] == '{"script": "ctx._source.text = \'adios\'"}'


@pytest.mark.integrationtest
def test_update_document(instrument, elasticapm_client, elasticsearch):
elasticsearch.create(
Expand Down Expand Up @@ -356,7 +328,7 @@ def test_search_body(instrument, elasticapm_client, elasticsearch):
)
elasticapm_client.begin_transaction("test")
search_query = {"query": {"term": {"user": "kimchy"}}, "sort": ["userid"]}
result = elasticsearch.search(body=search_query, params=None)
result = elasticsearch.search(body=search_query)
elasticapm_client.end_transaction("test", "OK")

transaction = elasticapm_client.events[TRANSACTION][0]
Expand Down Expand Up @@ -500,28 +472,6 @@ def test_delete(instrument, elasticapm_client, elasticsearch):
assert span["context"]["db"]["type"] == "elasticsearch"


@pytest.mark.skipif(ES_VERSION[0] < 5, reason="unsupported method")
@pytest.mark.integrationtest
def test_delete_by_query_body(instrument, elasticapm_client, elasticsearch):
elasticsearch.create(
index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
)
elasticapm_client.begin_transaction("test")
result = elasticsearch.delete_by_query(index="tweets", body={"query": {"term": {"user": "kimchy"}}})
elasticapm_client.end_transaction("test", "OK")

transaction = elasticapm_client.events[TRANSACTION][0]
spans = elasticapm_client.spans_for_transaction(transaction)

span = spans[0]
assert span["name"] == "ES POST /tweets/_delete_by_query"
assert span["type"] == "db"
assert span["subtype"] == "elasticsearch"
assert span["action"] == "query"
assert span["context"]["db"]["type"] == "elasticsearch"
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query":{"term":{"user":"kimchy"}}}')


@pytest.mark.integrationtest
def test_multiple_indexes(instrument, elasticapm_client, elasticsearch):
elasticsearch.create(index="tweets", doc_type="users", id=1, body={"user": "kimchy", "text": "hola"}, refresh=True)
Expand Down Expand Up @@ -571,7 +521,7 @@ def test_custom_serializer(instrument, elasticapm_client, elasticsearch):
elasticsearch.index(index="test-index", body={"2": 1})
elasticapm_client.begin_transaction("test")
search_query = {"query": {"term": {NumberObj(2): {"value": 1}}}}
result = elasticsearch.search(index="test-index", body=search_query, params=None)
result = elasticsearch.search(index="test-index", body=search_query)
elasticapm_client.end_transaction("test", "OK")

transaction = elasticapm_client.events[TRANSACTION][0]
Expand Down