Skip to content

Commit 8ee719b

Browse files
authored
Added Async support for DBAPI2 w/ implementation for psycopg (#1944)
* Introducing AsyncIO DBAPIv2 + Psycopg async support * Introducing AsyncIO DBAPIv2 + Psycopg async support * Fixed incorrect import
1 parent 0adfefc commit 8ee719b

File tree

3 files changed

+231
-0
lines changed

3 files changed

+231
-0
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2019, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
31+
"""Provides classes to instrument dbapi2 providers
32+
33+
https://www.python.org/dev/peps/pep-0249/
34+
"""
35+
36+
import wrapt
37+
38+
from elasticapm.contrib.asyncio.traces import async_capture_span
39+
from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule
40+
from elasticapm.instrumentation.packages.dbapi2 import EXEC_ACTION, QUERY_ACTION
41+
from elasticapm.utils.encoding import shorten
42+
43+
44+
class AsyncCursorProxy(wrapt.ObjectProxy):
45+
provider_name = None
46+
DML_QUERIES = ("INSERT", "DELETE", "UPDATE")
47+
48+
def __init__(self, wrapped, destination_info=None):
49+
super(AsyncCursorProxy, self).__init__(wrapped)
50+
self._self_destination_info = destination_info or {}
51+
52+
async def callproc(self, procname, params=None):
53+
return await self._trace_sql(self.__wrapped__.callproc, procname, params, action=EXEC_ACTION)
54+
55+
async def execute(self, sql, params=None):
56+
return await self._trace_sql(self.__wrapped__.execute, sql, params)
57+
58+
async def executemany(self, sql, param_list):
59+
return await self._trace_sql(self.__wrapped__.executemany, sql, param_list)
60+
61+
def _bake_sql(self, sql):
62+
"""
63+
Method to turn the "sql" argument into a string. Most database backends simply return
64+
the given object, as it is already a string
65+
"""
66+
return sql
67+
68+
async def _trace_sql(self, method, sql, params, action=QUERY_ACTION):
69+
sql_string = self._bake_sql(sql)
70+
if action == EXEC_ACTION:
71+
signature = sql_string + "()"
72+
else:
73+
signature = self.extract_signature(sql_string)
74+
75+
# Truncate sql_string to 10000 characters to prevent large queries from
76+
# causing an error to APM server.
77+
sql_string = shorten(sql_string, string_length=10000)
78+
79+
async with async_capture_span(
80+
signature,
81+
span_type="db",
82+
span_subtype=self.provider_name,
83+
span_action=action,
84+
extra={
85+
"db": {"type": "sql", "statement": sql_string, "instance": getattr(self, "_self_database", None)},
86+
"destination": self._self_destination_info,
87+
},
88+
skip_frames=1,
89+
leaf=True,
90+
) as span:
91+
if params is None:
92+
result = await method(sql)
93+
else:
94+
result = await method(sql, params)
95+
# store "rows affected", but only for DML queries like insert/update/delete
96+
if span and self.rowcount not in (-1, None) and signature.startswith(self.DML_QUERIES):
97+
span.update_context("db", {"rows_affected": self.rowcount})
98+
return result
99+
100+
def extract_signature(self, sql):
101+
raise NotImplementedError()
102+
103+
104+
class AsyncConnectionProxy(wrapt.ObjectProxy):
105+
cursor_proxy = AsyncCursorProxy
106+
107+
def __init__(self, wrapped, destination_info=None):
108+
super(AsyncConnectionProxy, self).__init__(wrapped)
109+
self._self_destination_info = destination_info
110+
111+
def cursor(self, *args, **kwargs):
112+
return self.cursor_proxy(self.__wrapped__.cursor(*args, **kwargs), self._self_destination_info)
113+
114+
115+
class AsyncDbApi2Instrumentation(AsyncAbstractInstrumentedModule):
116+
connect_method = None
117+
118+
async def call(self, module, method, wrapped, instance, args, kwargs):
119+
return AsyncConnectionProxy(await wrapped(*args, **kwargs))
120+
121+
async def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
122+
# Contrasting to the superclass implementation, we *always* want to
123+
# return a proxied connection, even if there is no ongoing elasticapm
124+
# transaction yet. This ensures that we instrument the cursor once
125+
# the transaction started.
126+
return await self.call(module, method, wrapped, instance, args, kwargs)
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2019, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
from __future__ import absolute_import
31+
32+
from elasticapm.contrib.asyncio.traces import async_capture_span
33+
from elasticapm.instrumentation.packages.asyncio.dbapi2_asyncio import (
34+
AsyncConnectionProxy,
35+
AsyncCursorProxy,
36+
AsyncDbApi2Instrumentation,
37+
)
38+
from elasticapm.instrumentation.packages.dbapi2 import extract_signature
39+
from elasticapm.instrumentation.packages.psycopg2 import get_destination_info
40+
41+
42+
class PGAsyncCursorProxy(AsyncCursorProxy):
43+
provider_name = "postgresql"
44+
45+
def _bake_sql(self, sql):
46+
# If this is a Composable object, use its `as_string` method.
47+
# See https://www.psycopg.org/psycopg3/docs/api/sql.html
48+
if hasattr(sql, "as_string"):
49+
sql = sql.as_string(self.__wrapped__)
50+
# If the sql string is already a byte string, we need to decode it using the connection encoding
51+
if isinstance(sql, bytes):
52+
sql = sql.decode(self.connection.info.encoding)
53+
return sql
54+
55+
def extract_signature(self, sql):
56+
return extract_signature(sql)
57+
58+
async def __aenter__(self):
59+
return PGAsyncCursorProxy(await self.__wrapped__.__aenter__(), destination_info=self._self_destination_info)
60+
61+
async def __aexit__(self, *args):
62+
return PGAsyncCursorProxy(await self.__wrapped__.__aexit__(*args), destination_info=self._self_destination_info)
63+
64+
@property
65+
def _self_database(self):
66+
return self.connection.info.dbname or ""
67+
68+
69+
class PGAsyncConnectionProxy(AsyncConnectionProxy):
70+
cursor_proxy = PGAsyncCursorProxy
71+
72+
async def __aenter__(self):
73+
return PGAsyncConnectionProxy(await self.__wrapped__.__aenter__(), destination_info=self._self_destination_info)
74+
75+
async def __aexit__(self, *args):
76+
return PGAsyncConnectionProxy(
77+
await self.__wrapped__.__aexit__(*args), destination_info=self._self_destination_info
78+
)
79+
80+
81+
class AsyncPsycopgInstrumentation(AsyncDbApi2Instrumentation):
82+
name = "psycopg_async"
83+
84+
instrument_list = [("psycopg.connection_async", "AsyncConnection.connect")]
85+
86+
async def call(self, module, method, wrapped, instance, args, kwargs):
87+
signature = "psycopg.connect_async"
88+
89+
host, port = get_destination_info(kwargs.get("host"), kwargs.get("port"))
90+
database = kwargs.get("dbname")
91+
signature = f"{signature} {host}:{port}" # noqa: E231
92+
destination_info = {
93+
"address": host,
94+
"port": port,
95+
}
96+
async with async_capture_span(
97+
signature,
98+
span_type="db",
99+
span_subtype="postgresql",
100+
span_action="connect",
101+
leaf=True,
102+
extra={"destination": destination_info, "db": {"type": "sql", "instance": database}},
103+
):
104+
return PGAsyncConnectionProxy(await wrapped(*args, **kwargs), destination_info=destination_info)

elasticapm/instrumentation/register.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
"elasticapm.instrumentation.packages.asyncio.starlette.StarletteServerErrorMiddlewareInstrumentation",
9595
"elasticapm.instrumentation.packages.asyncio.redis_asyncio.RedisAsyncioInstrumentation",
9696
"elasticapm.instrumentation.packages.asyncio.redis_asyncio.RedisPipelineInstrumentation",
97+
"elasticapm.instrumentation.packages.asyncio.psycopg_async.AsyncPsycopgInstrumentation",
9798
"elasticapm.instrumentation.packages.grpc.GRPCAsyncServerInstrumentation",
9899
]
99100
)

0 commit comments

Comments
 (0)