Skip to content

Commit 20d5823

Browse files
fix(aioredis): Allow usage of wrapped pipelines in async with statements (#3119)
Fixes #3106 The `Redis.pipeline` function that we wrap is not an async function but our wrapper was. This meant we were changing the signature and return type of the original wrapped function, causing an incompatibility when used with `async with` statements. Original function: https://github.com/aio-libs/aioredis-py/blob/224f843bd4b33d657770bded6f86ce33b881257c/aioredis/client.py#L930-L942 (cherry picked from commit d91a8ff) Co-authored-by: Brett Langdon <[email protected]>
1 parent 19b3c5e commit 20d5823

File tree

4 files changed

+63
-2
lines changed

4 files changed

+63
-2
lines changed

ddtrace/contrib/aioredis/patch.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ async def traced_execute_command(func, instance, args, kwargs):
7070
return await func(*args, **kwargs)
7171

7272

73-
async def traced_pipeline(func, instance, args, kwargs):
74-
pipeline = await func(*args, **kwargs)
73+
def traced_pipeline(func, instance, args, kwargs):
74+
pipeline = func(*args, **kwargs)
7575
pin = Pin.get_from(instance)
7676
if pin:
7777
pin.onto(pipeline)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
Fixes incompatibility of wrapped aioredis pipelines in ``async with`` statements.

tests/contrib/aioredis/test_aioredis.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,35 @@ async def test_pipeline_traced(redis_client):
153153
assert response_list[3].decode() == "bar"
154154

155155

156+
@pytest.mark.skipif(aioredis_version < (2, 0), reason="only supported in aioredis >= 2.0")
157+
@pytest.mark.asyncio
158+
@pytest.mark.snapshot
159+
async def test_pipeline_traced_context_manager_transaction(redis_client):
160+
"""
161+
Regression test for: https://github.com/DataDog/dd-trace-py/issues/3106
162+
163+
https://aioredis.readthedocs.io/en/latest/migration/#pipelines-and-transactions-multiexec
164+
165+
Example::
166+
167+
async def main():
168+
redis = await aioredis.from_url("redis://localhost")
169+
async with redis.pipeline(transaction=True) as pipe:
170+
ok1, ok2 = await (pipe.set("key1", "value1").set("key2", "value2").execute())
171+
assert ok1
172+
assert ok2
173+
"""
174+
175+
async with redis_client.pipeline(transaction=True) as p:
176+
set_1, set_2, get_1, get_2 = await (p.set("blah", "boo").set("foo", "bar").get("blah").get("foo").execute())
177+
178+
# response from redis.set is OK if successfully pushed
179+
assert set_1 is True
180+
assert set_2 is True
181+
assert get_1.decode() == "boo"
182+
assert get_2.decode() == "bar"
183+
184+
156185
@pytest.mark.asyncio
157186
@pytest.mark.snapshot(variants={"": aioredis_version >= (2, 0), "13": aioredis_version < (2, 0)})
158187
async def test_two_traced_pipelines(redis_client):
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[[
2+
{
3+
"name": "redis.command",
4+
"service": "redis",
5+
"resource": "SET blah boo\nSET foo bar\nGET blah\nGET foo",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "redis",
10+
"meta": {
11+
"out.host": "127.0.0.1",
12+
"redis.raw_command": "SET blah boo\nSET foo bar\nGET blah\nGET foo",
13+
"runtime-id": "b734eb991b1f45f2b063db6d3c5623b9"
14+
},
15+
"metrics": {
16+
"_dd.agent_psr": 1.0,
17+
"_dd.measured": 1,
18+
"_dd.top_level": 1,
19+
"_dd.tracer_kr": 1.0,
20+
"_sampling_priority_v1": 1,
21+
"out.port": 6379,
22+
"out.redis_db": 0,
23+
"redis.pipeline_length": 4,
24+
"system.pid": 28312
25+
},
26+
"duration": 2132000,
27+
"start": 1641496497488785000
28+
}]]

0 commit comments

Comments
 (0)