Skip to content

Commit b9fc5c0

Browse files
mtoffl01Kyle-Verhoogbrettlangdonmabdinurmajorgreys
authored
aioredis integration
* initialized aioredis integration, added aioredis to patch_modules in _monkey.py * added pin instance * changed spacing * commit before switching branches * filled out wrapper functions in aioredis. added aioredis to integrations.rst, and created a release note * removed superfluous import * Pin() takes no arguments (`app` is outdated) Co-authored-by: Kyle Verhoog <[email protected]> * Pin().onto method takes in the class Co-authored-by: Kyle Verhoog <[email protected]> * updated formatting for index.rst to include aioredis * updated aioredis intg. to be compatible with version 2.0 and 1.3. updated aioredis test suite and helper functions to reflect these changes * added more tests and limited support for version 1.3 to basic commands * separated tests out to apply to only v 2.0+ or 1.3 * altered release note to reflect differences btwn support for 1.3 and 2 * updated SUPVER for aioredis integration * updated integration documentation to reflect partial support for verison 1.3 * finalized tests for aioreids * added aioredis to circleci; also moved the config for aredis in the .cricleci/config because it was out of order, alphabetically * Reverted aredis changes made within this branch, as it polluted the github diffs * formatted with riot * added aioredis to spelling list * remove extra print Co-authored-by: Kyle Verhoog <[email protected]> * Remove superfluous comment Co-authored-by: Kyle Verhoog <[email protected]> * update wording on integration documentation Co-authored-by: Kyle Verhoog <[email protected]> * Remove superfluous comment Co-authored-by: Kyle Verhoog <[email protected]> * remove superfluous print Co-authored-by: Kyle Verhoog <[email protected]> * update wording on release note Co-authored-by: Kyle Verhoog <[email protected]> * removed superfluous check on traced_execute_pipeline fn * tried to revert formatting changes to circleci/config * tried to revert formatting changes to circleci/config * Update ddtrace/contrib/aioredis/__init__.py Co-authored-by: Kyle Verhoog <[email protected]> * Update docs/index.rst Co-authored-by: Brett Langdon <[email protected]> * Update tests/contrib/aioredis/test_aioredis.py Co-authored-by: Brett Langdon <[email protected]> * wip: code review changes * remove empty snapshots and mark.asyncio annotations * isort * add pytest.mark.asyncio to all tests * fix snapshots * fix docs ref error and add instance arg back to redis util with an attribute check * clean up aioredis tests, reduce duplication * clean up aioredis tests, reduce duplication * clean up aioredis snapshots, remove duplicate tests and traces * clean up aioredis snapshots, remove duplicate tests and traces * update deprecated import * Update ddtrace/contrib/aredis/patch.py Co-authored-by: Tahir H. Butt <[email protected]> * Update ddtrace/contrib/redis/util.py Co-authored-by: Gabriele N. Tornetta <[email protected]> * refactor redundant check * black * revert stringify change and update conn_tag helper * move byte decode to aioredis patch * isort * format strings in aioredis and not in the generic helper * Update aioredis 1.3 support * add missing import * Update ddtrace/contrib/aioredis/patch.py Co-authored-by: Kyle Verhoog <[email protected]> * use _start_span() instead of trace() to create an inactive span * verify that piplines creates sisters spans and not child spans * Update ddtrace/contrib/aioredis/patch.py Co-authored-by: Kyle Verhoog <[email protected]> * helpful comment about activate=False * create test with parent span and two child spans * add service name to parent span * Update ddtrace/contrib/aioredis/__init__.py Co-authored-by: Kyle Verhoog <[email protected]> * Update releasenotes/notes/aioredis-integration-65c72aefff9d814c.yaml Co-authored-by: Kyle Verhoog <[email protected]> * Update releasenotes/notes/aioredis-integration-65c72aefff9d814c.yaml Co-authored-by: Kyle Verhoog <[email protected]> * Update ddtrace/contrib/aioredis/patch.py Co-authored-by: Brett Langdon <[email protected]> * Update __init__.py * Update patch.py * add connection tags Co-authored-by: Kyle Verhoog <[email protected]> Co-authored-by: Brett Langdon <[email protected]> Co-authored-by: Munir Abdinur <[email protected]> Co-authored-by: Tahir H. Butt <[email protected]> Co-authored-by: Gabriele N. Tornetta <[email protected]>
1 parent 11c5797 commit b9fc5c0

20 files changed

+788
-2
lines changed

.circleci/config.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,15 @@ jobs:
802802
wait: postgres
803803
pattern: 'aiopg'
804804

805+
aioredis:
806+
<<: *machine_executor
807+
parallelism: 4
808+
steps:
809+
- run_test:
810+
docker_services: 'redis'
811+
pattern: 'aioredis$'
812+
snapshot: true
813+
805814
aredis:
806815
<<: *machine_executor
807816
parallelism: 4
@@ -946,6 +955,7 @@ requires_tests: &requires_tests
946955
- aiobotocore
947956
- aiohttp
948957
- aiopg
958+
- aioredis
949959
- asyncio
950960
- algoliasearch
951961
- asgi
@@ -1031,6 +1041,7 @@ workflows:
10311041
- aiobotocore: *requires_base_venvs
10321042
- aiohttp: *requires_base_venvs
10331043
- aiopg: *requires_base_venvs
1044+
- aioredis: *requires_base_venvs
10341045
- asyncio: *requires_base_venvs
10351046
- algoliasearch: *requires_base_venvs
10361047
- asgi: *requires_base_venvs

ddtrace/_monkey.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
# Default set of modules to automatically patch or not
2020
PATCH_MODULES = {
21+
"aioredis": True,
2122
"aredis": True,
2223
"asyncio": True,
2324
"boto": True,
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
"""
2+
The aioredis integration instruments aioredis requests. Version 1.3 and above are fully
3+
supported.
4+
5+
6+
Enabling
7+
~~~~~~~~
8+
9+
The aioredis integration is enabled automatically when using
10+
:ref:`ddtrace-run <ddtracerun>` or :ref:`patch_all() <patch_all>`.
11+
12+
Or use :ref:`patch() <patch>` to manually enable the integration::
13+
14+
from ddtrace import patch
15+
patch(aioredis=True)
16+
17+
18+
Global Configuration
19+
~~~~~~~~~~~~~~~~~~~~
20+
21+
.. py:data:: ddtrace.config.aioredis["service"]
22+
23+
The service name reported by default for aioredis instances.
24+
25+
This option can also be set with the ``DD_AIOREDIS_SERVICE`` environment
26+
variable.
27+
28+
Default: ``"redis"``
29+
30+
31+
Instance Configuration
32+
~~~~~~~~~~~~~~~~~~~~~~
33+
34+
To configure the aioredis integration on an per-instance basis use the
35+
``Pin`` API::
36+
37+
import aioredis
38+
from ddtrace import Pin
39+
40+
myaioredis = aioredis.Aioredis()
41+
Pin.override(myaioredis, service="myaioredis")
42+
"""
43+
from ...internal.utils.importlib import require_modules
44+
45+
46+
required_modules = ["aioredis"]
47+
48+
with require_modules(required_modules) as missing_modules:
49+
if not missing_modules:
50+
from .patch import patch
51+
from .patch import unpatch
52+
53+
__all__ = ["patch", "unpatch"]

ddtrace/contrib/aioredis/patch.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import sys
2+
3+
import aioredis
4+
5+
from ddtrace import config
6+
from ddtrace.internal.utils.wrappers import unwrap as _u
7+
from ddtrace.pin import Pin
8+
from ddtrace.vendor.wrapt import wrap_function_wrapper as _w
9+
10+
from .. import trace_utils
11+
from ...constants import ANALYTICS_SAMPLE_RATE_KEY
12+
from ...constants import SPAN_MEASURED_KEY
13+
from ...ext import SpanTypes
14+
from ...ext import net
15+
from ...ext import redis as redisx
16+
from ..redis.util import _trace_redis_cmd
17+
from ..redis.util import _trace_redis_execute_pipeline
18+
from ..redis.util import format_command_args
19+
20+
21+
try:
22+
from aioredis.commands.transaction import _RedisBuffer
23+
except ImportError:
24+
_RedisBuffer = None
25+
26+
config._add("aioredis", dict(_default_service="redis"))
27+
28+
aioredis_version_str = getattr(aioredis, "__version__", "0.0.0")
29+
aioredis_version = tuple([int(i) for i in aioredis_version_str.split(".")])
30+
31+
32+
def patch():
33+
if getattr(aioredis, "_datadog_patch", False):
34+
return
35+
setattr(aioredis, "_datadog_patch", True)
36+
pin = Pin()
37+
if aioredis_version >= (2, 0):
38+
_w("aioredis.client", "Redis.execute_command", traced_execute_command)
39+
_w("aioredis.client", "Redis.pipeline", traced_pipeline)
40+
_w("aioredis.client", "Pipeline.execute", traced_execute_pipeline)
41+
pin.onto(aioredis.client.Redis)
42+
else:
43+
_w("aioredis", "Redis.execute", traced_13_execute_command)
44+
_w("aioredis", "Redis.pipeline", traced_13_pipeline)
45+
_w("aioredis.commands.transaction", "Pipeline.execute", traced_13_execute_pipeline)
46+
pin.onto(aioredis.Redis)
47+
48+
49+
def unpatch():
50+
if not getattr(aioredis, "_datadog_patch", False):
51+
return
52+
53+
setattr(aioredis, "_datadog_patch", False)
54+
if aioredis_version >= (2, 0):
55+
_u(aioredis.client.Redis, "execute_command")
56+
_u(aioredis.client.Redis, "pipeline")
57+
_u(aioredis.client.Pipeline, "execute")
58+
else:
59+
_u(aioredis.Redis, "execute")
60+
_u(aioredis.Redis, "pipeline")
61+
_u(aioredis.commands.transaction.Pipeline, "execute")
62+
63+
64+
async def traced_execute_command(func, instance, args, kwargs):
65+
pin = Pin.get_from(instance)
66+
if not pin or not pin.enabled():
67+
return await func(*args, **kwargs)
68+
69+
decoded_args = [arg.decode() if isinstance(arg, bytes) else arg for arg in args]
70+
with _trace_redis_cmd(pin, config.aioredis, instance, decoded_args):
71+
return await func(*args, **kwargs)
72+
73+
74+
async def traced_pipeline(func, instance, args, kwargs):
75+
pipeline = await func(*args, **kwargs)
76+
pin = Pin.get_from(instance)
77+
if pin:
78+
pin.onto(pipeline)
79+
return pipeline
80+
81+
82+
async def traced_execute_pipeline(func, instance, args, kwargs):
83+
pin = Pin.get_from(instance)
84+
if not pin or not pin.enabled():
85+
return await func(*args, **kwargs)
86+
87+
cmds = [format_command_args(c) for c, _ in instance.command_stack]
88+
resource = "\n".join(cmds)
89+
with _trace_redis_execute_pipeline(pin, config.aioredis, resource, instance):
90+
return await func(*args, **kwargs)
91+
92+
93+
def traced_13_pipeline(func, instance, args, kwargs):
94+
pipeline = func(*args, **kwargs)
95+
pin = Pin.get_from(instance)
96+
if pin:
97+
pin.onto(pipeline)
98+
return pipeline
99+
100+
101+
def traced_13_execute_command(func, instance, args, kwargs):
102+
# If we have a _RedisBuffer then we are in a pipeline
103+
if isinstance(instance.connection, _RedisBuffer):
104+
return func(*args, **kwargs)
105+
106+
pin = Pin.get_from(instance)
107+
if not pin or not pin.enabled():
108+
return func(*args, **kwargs)
109+
110+
decoded_args = [arg.decode() if isinstance(arg, bytes) else arg for arg in args]
111+
# Don't activate the span since this operation is performed as a future which concludes sometime later on in
112+
# execution so subsequent operations in the stack are not necessarily semantically related
113+
# (we don't want this span to be the parent of all other spans created before the future is resolved)
114+
span = pin.tracer.start_span(
115+
redisx.CMD, service=trace_utils.ext_service(pin, config.aioredis), span_type=SpanTypes.REDIS, activate=False
116+
)
117+
118+
span.set_tag(SPAN_MEASURED_KEY)
119+
query = format_command_args(decoded_args)
120+
span.resource = query
121+
span.set_tag(redisx.RAWCMD, query)
122+
if pin.tags:
123+
span.set_tags(pin.tags)
124+
125+
span.set_tags(
126+
{
127+
net.TARGET_HOST: instance.address[0],
128+
net.TARGET_PORT: instance.address[1],
129+
redisx.DB: instance.db or 0,
130+
}
131+
)
132+
span.set_metric(redisx.ARGS_LEN, len(decoded_args))
133+
# set analytics sample rate if enabled
134+
span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.aioredis.get_analytics_sample_rate())
135+
136+
def _finish_span(future):
137+
try:
138+
# Accessing the result will raise an exception if:
139+
# - The future was cancelled
140+
# - There was an error executing the future (`future.exception()`)
141+
# - The future is in an invalid state
142+
future.result()
143+
except Exception:
144+
span.set_exc_info(*sys.exc_info())
145+
finally:
146+
span.finish()
147+
148+
task = func(*args, **kwargs)
149+
task.add_done_callback(_finish_span)
150+
return task
151+
152+
153+
async def traced_13_execute_pipeline(func, instance, args, kwargs):
154+
pin = Pin.get_from(instance)
155+
if not pin or not pin.enabled():
156+
return await func(*args, **kwargs)
157+
158+
cmds = []
159+
for _, cmd, cmd_args, _ in instance._pipeline:
160+
parts = [cmd.decode() if isinstance(cmd, bytes) else cmd]
161+
parts.extend(cmd_args)
162+
cmds.append(format_command_args(parts))
163+
resource = "\n".join(cmds)
164+
with pin.tracer.trace(
165+
redisx.CMD,
166+
resource=resource,
167+
service=trace_utils.ext_service(pin, config.aioredis),
168+
span_type=SpanTypes.REDIS,
169+
) as span:
170+
171+
span.set_tags(
172+
{
173+
net.TARGET_HOST: instance._pool_or_conn.address[0],
174+
net.TARGET_PORT: instance._pool_or_conn.address[1],
175+
redisx.DB: instance._pool_or_conn.db or 0,
176+
}
177+
)
178+
179+
span.set_tag(SPAN_MEASURED_KEY)
180+
span.set_tag(redisx.RAWCMD, resource)
181+
span.set_metric(redisx.PIPELINE_LEN, len(instance._pipeline))
182+
# set analytics sample rate if enabled
183+
span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.aioredis.get_analytics_sample_rate())
184+
185+
return await func(*args, **kwargs)

ddtrace/contrib/redis/util.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def _extract_conn_tags(conn_kwargs):
2424
return {
2525
net.TARGET_HOST: conn_kwargs["host"],
2626
net.TARGET_PORT: conn_kwargs["port"],
27-
redisx.DB: conn_kwargs["db"] or 0,
27+
redisx.DB: conn_kwargs.get("db") or 0,
2828
}
2929
except Exception:
3030
return {}
@@ -72,7 +72,9 @@ def _trace_redis_cmd(pin, config_integration, instance, args):
7272
span.set_tag(redisx.RAWCMD, query)
7373
if pin.tags:
7474
span.set_tags(pin.tags)
75-
span.set_tags(_extract_conn_tags(instance.connection_pool.connection_kwargs))
75+
# some redis clients do not have a connection_pool attribute (ex. aioredis v1.3)
76+
if hasattr(instance, "connection_pool"):
77+
span.set_tags(_extract_conn_tags(instance.connection_pool.connection_kwargs))
7678
span.set_metric(redisx.ARGS_LEN, len(args))
7779
# set analytics sample rate if enabled
7880
span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config_integration.get_analytics_sample_rate())

docs/index.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ contacting support.
4646
+--------------------------------------------------+---------------+----------------+
4747
| :ref:`aiopg` | >= 0.12.0 | Yes |
4848
+--------------------------------------------------+---------------+----------------+
49+
| :ref:`aioredis` | >= 1.3.0 | Yes |
50+
+--------------------------------------------------+---------------+----------------+
4951
| :ref:`algoliasearch` | >= 1.20.0 | Yes |
5052
+--------------------------------------------------+---------------+----------------+
5153
| :ref:`aredis` | >= 1.1.0 | Yes |

docs/integrations.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@
33
Integrations
44
------------
55

6+
.. _aioredis:
7+
8+
aioredis
9+
^^^^^^^^
10+
.. automodule:: ddtrace.contrib.aioredis
11+
12+
613
.. _aiobotocore:
714

815
aiobotocore

docs/spelling_wordlist.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ OpenTracing
99
aiobotocore
1010
aiohttp
1111
aiopg
12+
aioredis
1213
algolia
1314
algoliasearch
1415
agentless
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
features:
3+
- |
4+
Add tracing support for the ``aioredis`` library. Version 1.3+ is fully
5+
supported.

riotfile.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,5 +1423,17 @@ def select_pys(min_version=MIN_PYTHON_VERSION, max_version=MAX_PYTHON_VERSION):
14231423
},
14241424
command="reno {cmdargs}",
14251425
),
1426+
Venv(
1427+
name="aioredis",
1428+
pys=select_pys(min_version="3.6"),
1429+
command="pytest {cmdargs} tests/contrib/aioredis",
1430+
pkgs={
1431+
"pytest-asyncio": latest,
1432+
"aioredis": [
1433+
"~=1.3.0",
1434+
latest,
1435+
],
1436+
},
1437+
),
14261438
],
14271439
)

0 commit comments

Comments
 (0)