Skip to content

Commit f8ed932

Browse files
authored
feat: add asyncpg integration (backport #3410) (#3428) (#3434)
* feat: add asyncpg integration (#3410) # Trace Integration This PR adds support for [`asyncpg`](https://magicstack.github.io/asyncpg/) starting with `v0.18.0`. [Connections](https://magicstack.github.io/asyncpg/current/api/index.html#connection), [cursors](https://magicstack.github.io/asyncpg/current/api/index.html#cursors) are traced. Note that this integration cannot use the `dbapi` patching that we commonly do due to asyncpg not following the api (dbapi in general is not compatible with async usage). Work for this was started in #394, thanks again to @thehesiod!! As a follow up, support can be added for [connection pools](https://magicstack.github.io/asyncpg/current/api/index.html#connection-pools). ## Links - Integration docs: https://magicstack.github.io/asyncpg/ - Corp docs PR: TODO - https://github.com/MagicStack/asyncpg - https://cs.github.com/MagicStack/asyncpg/ - https://pypi.org/project/asyncpg - https://github.com/encode/databases - [Connection source code](https://cs.github.com/MagicStack/asyncpg/blob/a2f093df6aceec7842709eaf92c5ff9df093efae/asyncpg/connection.py) - [Cursor source code](https://cs.github.com/MagicStack/asyncpg/blob/a2f093df6aceec7842709eaf92c5ff9df093efae/asyncpg/cursor.py ) ## Checklist - [x] Usage and configuration documentation added in `__init__.py`, `docs/index.rst` and `docs/integrations.rst`. - [ ] [Corp docs](https://github.com/Datadog/documentation) PR to add new integration to documentation. - [x] Span metadata - [x] Service (use [`int_service`](https://github.com/DataDog/dd-trace-py/blob/90d1d5981c72ea312c21ac04e5be47521d0f0f2e/ddtrace/contrib/trace_utils.py#L55) or [`ext_service`](https://github.com/DataDog/dd-trace-py/blob/90d1d5981c72ea312c21ac04e5be47521d0f0f2e/ddtrace/contrib/trace_utils.py#L87)). - [x] Span type should be one of [these](https://github.com/DataDog/dd-trace-py/blob/90d1d5981c72ea312c21ac04e5be47521d0f0f2e/ddtrace/ext/__init__.py#L7). - [x] Resource - [x] Measured tag - [x] ~Sample analytics~ - [x] Global configuration - [x] `ddtrace.config` entry is specified. - [x] Environment variables are provided for config options. - [x] Instance configuration - [x] Pin overriding. - [x] Service name override (if applicable). - [x] Async - [x] Span parenting behaves as expected. - [x] Context propagation across async contexts. - [x] Tests - [x] Use `pytest` fixtures found in `tests/conftest.py` or the test helpers on `TracerTestCase` if writing `unittest` style test cases. - [x] Tests are provided for all the above. - [x] Tests are added to CI (`.circleci/config.yml`). - [x] Functionality is maintained from original library by adding the library's test suite with the tracer enabled to our CI. - [x] Patch test cases are added (see `test_django_patch.py` for an example). - [x] All Python versions that the library supports are tested. - [x] All significant library versions (including the latest) are tested. This typically includes every minor release going back a few years. (cherry picked from commit a0515ec) * Update ddtrace/contrib/asyncpg/__init__.py Co-authored-by: Kyle Verhoog <[email protected]> Co-authored-by: Brett Langdon <[email protected]> (cherry picked from commit 4a18ed0) Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent 99bc267 commit f8ed932

22 files changed

+1372
-0
lines changed

.circleci/config.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,14 @@ jobs:
461461
- run_tox_scenario:
462462
pattern: '^asyncio_contrib-'
463463

464+
asyncpg:
465+
<<: *machine_executor
466+
steps:
467+
- run_test:
468+
pattern: 'asyncpg'
469+
snapshot: true
470+
docker_services: 'postgres'
471+
464472
pylons:
465473
<<: *contrib_job
466474
steps:
@@ -975,6 +983,7 @@ requires_tests: &requires_tests
975983
- aiopg
976984
- aioredis
977985
- asyncio
986+
- asyncpg
978987
- algoliasearch
979988
- asgi
980989
- benchmarks
@@ -1062,6 +1071,7 @@ workflows:
10621071
- aiopg: *requires_base_venvs
10631072
- aioredis: *requires_base_venvs
10641073
- asyncio: *requires_base_venvs
1074+
- asyncpg: *requires_base_venvs
10651075
- algoliasearch: *requires_base_venvs
10661076
- asgi: *requires_base_venvs
10671077
- benchmarks: *requires_base_venvs

.github/workflows/test_frameworks.yml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,3 +278,35 @@ jobs:
278278
run: "make init"
279279
- name: Run tests
280280
run: ddtrace-run pytest -p no:warnings tests
281+
282+
asyncpg-testsuite-0_25_0:
283+
# https://github.com/MagicStack/asyncpg/blob/v0.25.0/.github/workflows/tests.yml#L125
284+
runs-on: "ubuntu-latest"
285+
env:
286+
DD_TESTING_RAISE: true
287+
DD_PROFILING_ENABLED: true
288+
defaults:
289+
run:
290+
working-directory: asyncpg
291+
steps:
292+
- uses: actions/setup-python@v3
293+
with:
294+
python-version: '3.9'
295+
- uses: actions/checkout@v3
296+
with:
297+
path: ddtrace
298+
- uses: actions/checkout@v3
299+
with:
300+
repository: magicstack/asyncpg
301+
ref: v0.25.0
302+
path: asyncpg
303+
fetch-depth: 50
304+
submodules: true
305+
- name: Install ddtrace
306+
run: pip install ../ddtrace
307+
- name: Install dependencies
308+
run: |
309+
python -m pip install -U pip setuptools wheel
310+
python -m pip install -e .[test]
311+
- name: Run tests
312+
run: ddtrace-run python setup.py test

ddtrace/_monkey.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
"dogpile_cache": True,
7878
"yaaredis": True,
7979
"aiohttp_jinja2": False, # disabled as this is handled by aiohttp for now.
80+
"asyncpg": True,
8081
}
8182

8283
_LOCK = threading.Lock()
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""
2+
The ``asyncpg`` integration traces database requests made using connection
3+
and cursor objects.
4+
5+
6+
Enabling
7+
~~~~~~~~
8+
9+
The integration is enabled automatically when using
10+
:ref:`ddtrace-run<ddtracerun>` or :ref:`patch_all()<patch_all>`.
11+
12+
Or use :ref:`patch()<patch>` to manually enable the integration::
13+
14+
from ddtrace import patch
15+
patch(asyncpg=True)
16+
17+
18+
Global Configuration
19+
~~~~~~~~~~~~~~~~~~~~
20+
21+
.. py:data:: ddtrace.config.asyncpg['service']
22+
23+
The service name reported by default for asyncpg connections.
24+
25+
This option can also be set with the ``DD_ASYNCPG_SERVICE``
26+
environment variable.
27+
28+
Default: ``postgres``
29+
30+
31+
Instance Configuration
32+
~~~~~~~~~~~~~~~~~~~~~~
33+
34+
Service
35+
^^^^^^^
36+
37+
To configure the service name used by the asyncpg integration on a per-instance
38+
basis use the ``Pin`` API::
39+
40+
import asyncpg
41+
from ddtrace import Pin
42+
43+
conn = asyncpg.connect("postgres://localhost:5432")
44+
Pin.override(conn, service="custom-service")
45+
"""
46+
from ...internal.utils.importlib import require_modules
47+
48+
49+
required_modules = ["asyncpg"]
50+
51+
with require_modules(required_modules) as missing_modules:
52+
if not missing_modules:
53+
from .patch import patch
54+
from .patch import unpatch
55+
56+
__all__ = [
57+
"patch",
58+
"unpatch",
59+
]

ddtrace/contrib/asyncpg/patch.py

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
from typing import TYPE_CHECKING
2+
3+
from ddtrace import Pin
4+
from ddtrace import config
5+
from ddtrace.vendor import wrapt
6+
7+
from ...constants import SPAN_MEASURED_KEY
8+
from ...ext import SpanTypes
9+
from ...ext import db
10+
from ...ext import net
11+
from ...internal.logger import get_logger
12+
from ...internal.utils import get_argument_value
13+
from ..trace_utils import ext_service
14+
from ..trace_utils import unwrap
15+
from ..trace_utils import wrap
16+
from ..trace_utils_async import with_traced_module
17+
18+
19+
if TYPE_CHECKING:
20+
from types import ModuleType
21+
from typing import Dict
22+
from typing import Union
23+
24+
import asyncpg
25+
from asyncpg.prepared_stmt import PreparedStatement
26+
27+
28+
config._add(
29+
"asyncpg",
30+
dict(
31+
_default_service="postgres",
32+
),
33+
)
34+
35+
36+
log = get_logger(__name__)
37+
38+
39+
def _get_connection_tags(conn):
40+
# type: (asyncpg.Connection) -> Dict[str, str]
41+
addr = conn._addr
42+
params = conn._params
43+
host = port = ""
44+
if isinstance(addr, tuple) and len(addr) == 2:
45+
host, port = addr
46+
return {
47+
net.TARGET_HOST: host,
48+
net.TARGET_PORT: port,
49+
db.USER: params.user,
50+
db.NAME: params.database,
51+
}
52+
53+
54+
class _TracedConnection(wrapt.ObjectProxy):
55+
def __init__(self, conn, pin):
56+
super(_TracedConnection, self).__init__(conn)
57+
conn_pin = pin.clone(tags=_get_connection_tags(conn))
58+
# Keep the pin on the protocol
59+
conn_pin.onto(self._protocol)
60+
61+
def __setddpin__(self, pin):
62+
pin.onto(self._protocol)
63+
64+
def __getddpin__(self):
65+
return Pin.get_from(self._protocol)
66+
67+
68+
@with_traced_module
69+
async def _traced_connect(asyncpg, pin, func, instance, args, kwargs):
70+
"""Traced asyncpg.connect().
71+
72+
connect() is instrumented and patched to return a connection proxy.
73+
"""
74+
with pin.tracer.trace(
75+
"postgres.connect", span_type=SpanTypes.SQL, service=ext_service(pin, config.asyncpg)
76+
) as span:
77+
# Need an ObjectProxy since Connection uses slots
78+
conn = _TracedConnection(await func(*args, **kwargs), pin)
79+
span.set_tags(_get_connection_tags(conn))
80+
return conn
81+
82+
83+
async def _traced_query(pin, method, query, args, kwargs):
84+
with pin.tracer.trace(
85+
"postgres.query", resource=query, service=ext_service(pin, config.asyncpg), span_type=SpanTypes.SQL
86+
) as span:
87+
span.set_tag(SPAN_MEASURED_KEY)
88+
span.set_tags(pin.tags)
89+
return await method(*args, **kwargs)
90+
91+
92+
@with_traced_module
93+
async def _traced_protocol_execute(asyncpg, pin, func, instance, args, kwargs):
94+
state = get_argument_value(args, kwargs, 0, "state") # type: Union[str, PreparedStatement]
95+
query = state if isinstance(state, str) else state.query
96+
return await _traced_query(pin, func, query, args, kwargs)
97+
98+
99+
def _patch(asyncpg):
100+
# type: (ModuleType) -> None
101+
wrap(asyncpg, "connect", _traced_connect(asyncpg))
102+
for method in ("execute", "bind_execute", "query", "bind_execute_many"):
103+
wrap(asyncpg.protocol, "Protocol.%s" % method, _traced_protocol_execute(asyncpg))
104+
105+
106+
def patch():
107+
# type: () -> None
108+
import asyncpg
109+
110+
if getattr(asyncpg, "_datadog_patch", False):
111+
return
112+
113+
Pin().onto(asyncpg)
114+
_patch(asyncpg)
115+
116+
setattr(asyncpg, "_datadog_patch", True)
117+
118+
119+
def _unpatch(asyncpg):
120+
# type: (ModuleType) -> None
121+
unwrap(asyncpg, "connect")
122+
for method in ("execute", "bind_execute", "query", "bind_execute_many"):
123+
unwrap(asyncpg.protocol.Protocol, method)
124+
125+
126+
def unpatch():
127+
# type: () -> None
128+
import asyncpg
129+
130+
if not getattr(asyncpg, "_datadog_patch", False):
131+
return
132+
133+
_unpatch(asyncpg)
134+
135+
setattr(asyncpg, "_datadog_patch", False)

docs/index.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ contacting support.
5959
+--------------------------------------------------+---------------+----------------+
6060
| :ref:`asyncio` | \* | Yes [4]_ |
6161
+--------------------------------------------------+---------------+----------------+
62+
| :ref:`asyncpg` | >= 0.18.0 | Yes |
63+
+--------------------------------------------------+---------------+----------------+
6264
| :ref:`boto2` | >= 2.29.0 | Yes |
6365
+--------------------------------------------------+---------------+----------------+
6466
| :ref:`botocore` | >= 1.4.51 | Yes |

docs/integrations.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ asyncio
6464
.. automodule:: ddtrace.contrib.asyncio
6565

6666

67+
.. _asyncpg:
68+
69+
asyncpg
70+
^^^^^^^
71+
.. automodule:: ddtrace.contrib.asyncpg
72+
73+
6774
.. _botocore:
6875

6976
botocore

docs/spelling_wordlist.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ aredis
2020
args
2121
asgi
2222
asyncio
23+
asyncpg
2324
attrs
2425
autodetected
2526
autopatching
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
features:
3+
- |
4+
asyncpg: add integration supporting v0.18.0 and above. See :ref:`the docs<asyncpg>` for more information.

riotfile.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1616,5 +1616,46 @@ def select_pys(min_version=MIN_PYTHON_VERSION, max_version=MAX_PYTHON_VERSION):
16161616
],
16171617
},
16181618
),
1619+
Venv(
1620+
name="asyncpg",
1621+
command="pytest {cmdargs} tests/contrib/asyncpg",
1622+
pkgs={
1623+
"pytest-asyncio": latest,
1624+
},
1625+
venvs=[
1626+
Venv(
1627+
pys=select_pys(min_version="3.6", max_version="3.8"),
1628+
pkgs={
1629+
"asyncpg": [
1630+
"~=0.18.0",
1631+
"~=0.20.0",
1632+
"~=0.22.0",
1633+
"~=0.24.0",
1634+
latest,
1635+
],
1636+
},
1637+
),
1638+
Venv(
1639+
pys=["3.9"],
1640+
pkgs={
1641+
"asyncpg": [
1642+
"~=0.20.0",
1643+
"~=0.22.0",
1644+
"~=0.24.0",
1645+
latest,
1646+
],
1647+
},
1648+
),
1649+
Venv(
1650+
pys=select_pys(min_version="3.10"),
1651+
pkgs={
1652+
"asyncpg": [
1653+
"~=0.24.0",
1654+
latest,
1655+
],
1656+
},
1657+
),
1658+
],
1659+
),
16191660
],
16201661
)

0 commit comments

Comments
 (0)