Skip to content

Commit 7a7f6d9

Browse files
authored
feat(redis): Add instrumentation for redis pipeline (#1543)
Add automatic instrumentation of redis pipelining for both redis and rediscluster. https://redis.io/docs/manual/pipelining/ Note: This does not add instrumentation for StrictRedisCluster.
1 parent 976fff2 commit 7a7f6d9

File tree

3 files changed

+154
-13
lines changed

3 files changed

+154
-13
lines changed

sentry_sdk/integrations/redis.py

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,64 @@
77
from sentry_sdk._types import MYPY
88

99
if MYPY:
10-
from typing import Any
10+
from typing import Any, Sequence
1111

1212
_SINGLE_KEY_COMMANDS = frozenset(
1313
["decr", "decrby", "get", "incr", "incrby", "pttl", "set", "setex", "setnx", "ttl"]
1414
)
1515
_MULTI_KEY_COMMANDS = frozenset(["del", "touch", "unlink"])
1616

17+
#: Trim argument lists to this many values
18+
_MAX_NUM_ARGS = 10
19+
20+
21+
def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
22+
# type: (Any, bool, Any) -> None
23+
old_execute = pipeline_cls.execute
24+
25+
def sentry_patched_execute(self, *args, **kwargs):
26+
# type: (Any, *Any, **Any) -> Any
27+
hub = Hub.current
28+
29+
if hub.get_integration(RedisIntegration) is None:
30+
return old_execute(self, *args, **kwargs)
31+
32+
with hub.start_span(op="redis", description="redis.pipeline.execute") as span:
33+
with capture_internal_exceptions():
34+
span.set_tag("redis.is_cluster", is_cluster)
35+
transaction = self.transaction if not is_cluster else False
36+
span.set_tag("redis.transaction", transaction)
37+
38+
commands = []
39+
for i, arg in enumerate(self.command_stack):
40+
if i > _MAX_NUM_ARGS:
41+
break
42+
command_args = []
43+
for j, command_arg in enumerate(get_command_args_fn(arg)):
44+
if j > 0:
45+
command_arg = repr(command_arg)
46+
command_args.append(command_arg)
47+
commands.append(" ".join(command_args))
48+
49+
span.set_data(
50+
"redis.commands",
51+
{"count": len(self.command_stack), "first_ten": commands},
52+
)
53+
54+
return old_execute(self, *args, **kwargs)
55+
56+
pipeline_cls.execute = sentry_patched_execute
57+
58+
59+
def _get_redis_command_args(command):
60+
# type: (Any) -> Sequence[Any]
61+
return command[0]
62+
63+
64+
def _parse_rediscluster_command(command):
65+
# type: (Any) -> Sequence[Any]
66+
return command.args
67+
1768

1869
def _patch_rediscluster():
1970
# type: () -> None
@@ -22,7 +73,7 @@ def _patch_rediscluster():
2273
except ImportError:
2374
return
2475

25-
patch_redis_client(rediscluster.RedisCluster)
76+
patch_redis_client(rediscluster.RedisCluster, is_cluster=True)
2677

2778
# up to v1.3.6, __version__ attribute is a tuple
2879
# from v2.0.0, __version__ is a string and VERSION a tuple
@@ -31,7 +82,12 @@ def _patch_rediscluster():
3182
# StrictRedisCluster was introduced in v0.2.0 and removed in v2.0.0
3283
# https://github.com/Grokzen/redis-py-cluster/blob/master/docs/release-notes.rst
3384
if (0, 2, 0) < version < (2, 0, 0):
34-
patch_redis_client(rediscluster.StrictRedisCluster)
85+
pipeline_cls = rediscluster.StrictClusterPipeline
86+
patch_redis_client(rediscluster.StrictRedisCluster, is_cluster=True)
87+
else:
88+
pipeline_cls = rediscluster.ClusterPipeline
89+
90+
patch_redis_pipeline(pipeline_cls, True, _parse_rediscluster_command)
3591

3692

3793
class RedisIntegration(Integration):
@@ -45,25 +101,32 @@ def setup_once():
45101
except ImportError:
46102
raise DidNotEnable("Redis client not installed")
47103

48-
patch_redis_client(redis.StrictRedis)
104+
patch_redis_client(redis.StrictRedis, is_cluster=False)
105+
patch_redis_pipeline(redis.client.Pipeline, False, _get_redis_command_args)
106+
try:
107+
strict_pipeline = redis.client.StrictPipeline # type: ignore
108+
except AttributeError:
109+
pass
110+
else:
111+
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
49112

50113
try:
51114
import rb.clients # type: ignore
52115
except ImportError:
53116
pass
54117
else:
55-
patch_redis_client(rb.clients.FanoutClient)
56-
patch_redis_client(rb.clients.MappingClient)
57-
patch_redis_client(rb.clients.RoutingClient)
118+
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
119+
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
120+
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
58121

59122
try:
60123
_patch_rediscluster()
61124
except Exception:
62125
logger.exception("Error occurred while patching `rediscluster` library")
63126

64127

65-
def patch_redis_client(cls):
66-
# type: (Any) -> None
128+
def patch_redis_client(cls, is_cluster):
129+
# type: (Any, bool) -> None
67130
"""
68131
This function can be used to instrument custom redis client classes or
69132
subclasses.
@@ -83,14 +146,15 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
83146
with capture_internal_exceptions():
84147
description_parts = [name]
85148
for i, arg in enumerate(args):
86-
if i > 10:
149+
if i > _MAX_NUM_ARGS:
87150
break
88151

89152
description_parts.append(repr(arg))
90153

91154
description = " ".join(description_parts)
92155

93156
with hub.start_span(op="redis", description=description) as span:
157+
span.set_tag("redis.is_cluster", is_cluster)
94158
if name:
95159
span.set_tag("redis.command", name)
96160

tests/integrations/redis/test_redis.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
from sentry_sdk import capture_message
1+
from sentry_sdk import capture_message, start_transaction
22
from sentry_sdk.integrations.redis import RedisIntegration
33

44
from fakeredis import FakeStrictRedis
5+
import pytest
56

67

78
def test_basic(sentry_init, capture_events):
@@ -19,7 +20,41 @@ def test_basic(sentry_init, capture_events):
1920
assert crumb == {
2021
"category": "redis",
2122
"message": "GET 'foobar'",
22-
"data": {"redis.key": "foobar", "redis.command": "GET"},
23+
"data": {
24+
"redis.key": "foobar",
25+
"redis.command": "GET",
26+
"redis.is_cluster": False,
27+
},
2328
"timestamp": crumb["timestamp"],
2429
"type": "redis",
2530
}
31+
32+
33+
@pytest.mark.parametrize("is_transaction", [False, True])
34+
def test_redis_pipeline(sentry_init, capture_events, is_transaction):
35+
sentry_init(integrations=[RedisIntegration()], traces_sample_rate=1.0)
36+
events = capture_events()
37+
38+
connection = FakeStrictRedis()
39+
with start_transaction():
40+
41+
pipeline = connection.pipeline(transaction=is_transaction)
42+
pipeline.get("foo")
43+
pipeline.set("bar", 1)
44+
pipeline.set("baz", 2)
45+
pipeline.execute()
46+
47+
(event,) = events
48+
(span,) = event["spans"]
49+
assert span["op"] == "redis"
50+
assert span["description"] == "redis.pipeline.execute"
51+
assert span["data"] == {
52+
"redis.commands": {
53+
"count": 3,
54+
"first_ten": ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"],
55+
}
56+
}
57+
assert span["tags"] == {
58+
"redis.transaction": is_transaction,
59+
"redis.is_cluster": False,
60+
}

tests/integrations/rediscluster/test_rediscluster.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import pytest
22
from sentry_sdk import capture_message
3+
from sentry_sdk.api import start_transaction
34
from sentry_sdk.integrations.redis import RedisIntegration
45

56
import rediscluster
@@ -12,6 +13,15 @@
1213

1314
@pytest.fixture(scope="module", autouse=True)
1415
def monkeypatch_rediscluster_classes():
16+
17+
try:
18+
pipeline_cls = rediscluster.ClusterPipeline
19+
except AttributeError:
20+
pipeline_cls = rediscluster.StrictClusterPipeline
21+
rediscluster.RedisCluster.pipeline = lambda *_, **__: pipeline_cls(
22+
connection_pool=True
23+
)
24+
pipeline_cls.execute = lambda *_, **__: None
1525
for cls in rediscluster_classes:
1626
cls.execute_command = lambda *_, **__: None
1727

@@ -31,7 +41,39 @@ def test_rediscluster_basic(rediscluster_cls, sentry_init, capture_events):
3141
assert crumb == {
3242
"category": "redis",
3343
"message": "GET 'foobar'",
34-
"data": {"redis.key": "foobar", "redis.command": "GET"},
44+
"data": {
45+
"redis.key": "foobar",
46+
"redis.command": "GET",
47+
"redis.is_cluster": True,
48+
},
3549
"timestamp": crumb["timestamp"],
3650
"type": "redis",
3751
}
52+
53+
54+
def test_rediscluster_pipeline(sentry_init, capture_events):
55+
sentry_init(integrations=[RedisIntegration()], traces_sample_rate=1.0)
56+
events = capture_events()
57+
58+
rc = rediscluster.RedisCluster(connection_pool=True)
59+
with start_transaction():
60+
pipeline = rc.pipeline()
61+
pipeline.get("foo")
62+
pipeline.set("bar", 1)
63+
pipeline.set("baz", 2)
64+
pipeline.execute()
65+
66+
(event,) = events
67+
(span,) = event["spans"]
68+
assert span["op"] == "redis"
69+
assert span["description"] == "redis.pipeline.execute"
70+
assert span["data"] == {
71+
"redis.commands": {
72+
"count": 3,
73+
"first_ten": ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"],
74+
}
75+
}
76+
assert span["tags"] == {
77+
"redis.transaction": False, # For Cluster, this is always False
78+
"redis.is_cluster": True,
79+
}

0 commit comments

Comments
 (0)