Skip to content

Commit 50d4651

Browse files
authored
capture destination host for instrumentations of services (#618)
Implementation of span destination information as per elastic/apm#180
1 parent 4e17b24 commit 50d4651

40 files changed

+485
-55
lines changed

.ci/.jenkins_framework.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ FRAMEWORK:
3333
- zerorpc-0.4
3434
- mysql_connector-newest
3535
- pymysql-newest
36+
- mysqlclient-newest
3637
- aiohttp-newest
3738
- aiopg-newest
3839
- tornado-newest

CHANGELOG.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ endif::[]
2222
2323
// Unreleased changes go here
2424
// When the next release happens, nest these changes under the "Python Agent version 5.x" heading
25+
[float]
26+
===== Features
27+
* Added destination information to database/HTTP spans, used for service maps {pull}618[#618]
2528
2629
[float]
2730
===== Deprecations

elasticapm/instrumentation/packages/asyncio/aiohttp_client.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from elasticapm.conf import constants
3333
from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule
3434
from elasticapm.traces import DroppedSpan, execution_context
35-
from elasticapm.utils import get_host_from_url, sanitize_url
35+
from elasticapm.utils import get_host_from_url, sanitize_url, url_to_destination
3636
from elasticapm.utils.disttracing import TracingOptions
3737

3838

@@ -45,13 +45,18 @@ async def call(self, module, method, wrapped, instance, args, kwargs):
4545
method = kwargs["method"] if "method" in kwargs else args[0]
4646
url = kwargs["url"] if "url" in kwargs else args[1]
4747
url = str(url)
48+
destination = url_to_destination(url)
4849

4950
signature = " ".join([method.upper(), get_host_from_url(url)])
5051
url = sanitize_url(url)
5152
transaction = execution_context.get_transaction()
5253

5354
async with async_capture_span(
54-
signature, span_type="external", span_subtype="http", extra={"http": {"url": url}}, leaf=True
55+
signature,
56+
span_type="external",
57+
span_subtype="http",
58+
extra={"http": {"url": url}, "destination": destination},
59+
leaf=True,
5560
) as span:
5661
leaf_span = span
5762
while isinstance(leaf_span, DroppedSpan):

elasticapm/instrumentation/packages/asyncio/elasticsearch.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class ElasticSearchAsyncConnection(ElasticSearchConnectionMixin, AsyncAbstractIn
4040

4141
async def call(self, module, method, wrapped, instance, args, kwargs):
4242
signature = self.get_signature(args, kwargs)
43-
context = self.get_context(args, kwargs)
43+
context = self.get_context(instance, args, kwargs)
4444

4545
async with elasticapm.async_capture_span(
4646
signature,

elasticapm/instrumentation/packages/cassandra.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,14 @@ class CassandraInstrumentation(AbstractInstrumentedModule):
4141

4242
def call(self, module, method, wrapped, instance, args, kwargs):
4343
name = self.get_wrapped_name(wrapped, instance, method)
44-
context = None
44+
context = {}
4545
if method == "Cluster.connect":
4646
span_action = "connect"
47+
host = instance.endpoints_resolved[0].address
48+
port = instance.endpoints_resolved[0].port
4749
else:
50+
host = instance.hosts[0].endpoint.address
51+
port = instance.hosts[0].endpoint.port
4852
span_action = "query"
4953
query = args[0] if args else kwargs.get("query")
5054
if hasattr(query, "query_string"):
@@ -57,7 +61,12 @@ def call(self, module, method, wrapped, instance, args, kwargs):
5761
query_str = None
5862
if query_str:
5963
name = extract_signature(query_str)
60-
context = {"db": {"type": "sql", "statement": query_str}}
64+
context["db"] = {"type": "sql", "statement": query_str}
65+
context["destination"] = {
66+
"address": host,
67+
"port": port,
68+
"service": {"name": "cassandra", "resource": "cassandra", "type": "db"},
69+
}
6170

6271
with capture_span(name, span_type="db", span_subtype="cassandra", span_action=span_action, extra=context):
6372
return wrapped(*args, **kwargs)

elasticapm/instrumentation/packages/dbapi2.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
"""Provides classes to instrument dbapi2 providers
2-
3-
https://www.python.org/dev/peps/pep-0249/
4-
"""
51
# BSD 3-Clause License
62
#
73
# Copyright (c) 2019, Elasticsearch BV
@@ -32,6 +28,11 @@
3228
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
3329
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3430

31+
"""Provides classes to instrument dbapi2 providers
32+
33+
https://www.python.org/dev/peps/pep-0249/
34+
"""
35+
3536
import re
3637

3738
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
@@ -197,6 +198,10 @@ def extract_signature(sql):
197198
class CursorProxy(wrapt.ObjectProxy):
198199
provider_name = None
199200

201+
def __init__(self, wrapped, destination_info=None):
202+
super(CursorProxy, self).__init__(wrapped)
203+
self._self_destination_info = destination_info
204+
200205
def callproc(self, procname, params=None):
201206
return self._trace_sql(self.__wrapped__.callproc, procname, params, action=EXEC_ACTION)
202207

@@ -224,7 +229,7 @@ def _trace_sql(self, method, sql, params, action=QUERY_ACTION):
224229
span_type="db",
225230
span_subtype=self.provider_name,
226231
span_action=action,
227-
extra={"db": {"type": "sql", "statement": sql_string}},
232+
extra={"db": {"type": "sql", "statement": sql_string}, "destination": self._self_destination_info},
228233
skip_frames=1,
229234
):
230235
if params is None:
@@ -239,8 +244,12 @@ def extract_signature(self, sql):
239244
class ConnectionProxy(wrapt.ObjectProxy):
240245
cursor_proxy = CursorProxy
241246

247+
def __init__(self, wrapped, destination_info=None):
248+
super(ConnectionProxy, self).__init__(wrapped)
249+
self._self_destination_info = destination_info
250+
242251
def cursor(self, *args, **kwargs):
243-
return self.cursor_proxy(self.__wrapped__.cursor(*args, **kwargs))
252+
return self.cursor_proxy(self.__wrapped__.cursor(*args, **kwargs), self._self_destination_info)
244253

245254

246255
class DbApi2Instrumentation(AbstractInstrumentedModule):

elasticapm/instrumentation/packages/elasticsearch.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def get_signature(self, args, kwargs):
5454

5555
return "ES %s %s" % (http_method, http_path)
5656

57-
def get_context(self, args, kwargs):
57+
def get_context(self, instance, args, kwargs):
5858
args_len = len(args)
5959
params = args[2] if args_len > 2 else kwargs.get("params")
6060
body = params.pop(BODY_REF_NAME, None) if params else None
@@ -78,7 +78,10 @@ def get_context(self, args, kwargs):
7878
if isinstance(body, dict) and "script" in body:
7979
# only get the `script` field from the body
8080
context["db"]["statement"] = json.dumps({"script": body["script"]})
81-
# TODO: add instance.base_url to context once we agreed on a format
81+
context["destination"] = {
82+
"address": instance.host,
83+
"service": {"name": "elasticsearch", "resource": "elasticsearch", "type": "db"},
84+
}
8285
return context
8386

8487

@@ -92,7 +95,7 @@ class ElasticsearchConnectionInstrumentation(ElasticSearchConnectionMixin, Abstr
9295

9396
def call(self, module, method, wrapped, instance, args, kwargs):
9497
signature = self.get_signature(args, kwargs)
95-
context = self.get_context(args, kwargs)
98+
context = self.get_context(instance, args, kwargs)
9699

97100
with elasticapm.capture_span(
98101
signature,

elasticapm/instrumentation/packages/mysql.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
DbApi2Instrumentation,
3535
extract_signature,
3636
)
37+
from elasticapm.utils import default_ports
3738

3839

3940
class MySQLCursorProxy(CursorProxy):
@@ -53,4 +54,9 @@ class MySQLInstrumentation(DbApi2Instrumentation):
5354
instrument_list = [("MySQLdb", "connect")]
5455

5556
def call(self, module, method, wrapped, instance, args, kwargs):
56-
return MySQLConnectionProxy(wrapped(*args, **kwargs))
57+
destination_info = {
58+
"address": args[0] if len(args) else kwargs.get("host", "localhost"),
59+
"port": args[4] if len(args) > 4 else int(kwargs.get("port", default_ports.get("mysql"))),
60+
"service": {"name": "mysql", "resource": "mysql", "type": "db"},
61+
}
62+
return MySQLConnectionProxy(wrapped(*args, **kwargs), destination_info=destination_info)

elasticapm/instrumentation/packages/mysql_connector.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
DbApi2Instrumentation,
3636
extract_signature,
3737
)
38+
from elasticapm.utils import default_ports
3839

3940

4041
class MySQLCursorProxy(CursorProxy):
@@ -54,4 +55,9 @@ class MySQLConnectorInstrumentation(DbApi2Instrumentation):
5455
instrument_list = [("mysql.connector", "connect")]
5556

5657
def call(self, module, method, wrapped, instance, args, kwargs):
57-
return MySQLConnectionProxy(wrapped(*args, **kwargs))
58+
destination_info = {
59+
"address": kwargs.get("host", "localhost"),
60+
"port": int(kwargs.get("port", default_ports.get("mysql"))),
61+
"service": {"name": "mysql", "resource": "mysql", "type": "db"},
62+
}
63+
return MySQLConnectionProxy(wrapped(*args, **kwargs), destination_info=destination_info)

elasticapm/instrumentation/packages/psycopg2.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
extract_signature,
3636
)
3737
from elasticapm.traces import capture_span
38-
from elasticapm.utils import default_ports
38+
from elasticapm.utils import compat, default_ports
3939

4040

4141
class PGCursorProxy(CursorProxy):
@@ -52,14 +52,14 @@ def extract_signature(self, sql):
5252
return extract_signature(sql)
5353

5454
def __enter__(self):
55-
return PGCursorProxy(self.__wrapped__.__enter__())
55+
return PGCursorProxy(self.__wrapped__.__enter__(), destination_info=self._self_destination_info)
5656

5757

5858
class PGConnectionProxy(ConnectionProxy):
5959
cursor_proxy = PGCursorProxy
6060

6161
def __enter__(self):
62-
return PGConnectionProxy(self.__wrapped__.__enter__())
62+
return PGConnectionProxy(self.__wrapped__.__enter__(), destination_info=self._self_destination_info)
6363

6464

6565
class Psycopg2Instrumentation(DbApi2Instrumentation):
@@ -72,19 +72,30 @@ def call(self, module, method, wrapped, instance, args, kwargs):
7272

7373
host = kwargs.get("host")
7474
if host:
75-
signature += " " + str(host)
75+
signature += " " + compat.text_type(host)
7676

7777
port = kwargs.get("port")
7878
if port:
7979
port = str(port)
8080
if int(port) != default_ports.get("postgresql"):
81-
signature += ":" + port
81+
host += ":" + port
82+
signature += " " + compat.text_type(host)
8283
else:
8384
# Parse connection string and extract host/port
8485
pass
85-
86-
with capture_span(signature, span_type="db", span_subtype="postgresql", span_action="connect"):
87-
return PGConnectionProxy(wrapped(*args, **kwargs))
86+
destination_info = {
87+
"address": kwargs.get("host", "localhost"),
88+
"port": int(kwargs.get("port", default_ports.get("postgresql"))),
89+
"service": {"name": "postgresql", "resource": "postgresql", "type": "db"},
90+
}
91+
with capture_span(
92+
signature,
93+
span_type="db",
94+
span_subtype="postgresql",
95+
span_action="connect",
96+
extra={"destination": destination_info},
97+
):
98+
return PGConnectionProxy(wrapped(*args, **kwargs), destination_info=destination_info)
8899

89100

90101
class Psycopg2ExtensionsInstrumentation(DbApi2Instrumentation):

0 commit comments

Comments
 (0)