33from sentry_sdk .integrations .redis .consts import SPAN_ORIGIN
44from sentry_sdk .integrations .redis .modules .caches import (
55 _compile_cache_span_properties ,
6- _set_cache_data ,
6+ _get_cache_data ,
77)
88from sentry_sdk .integrations .redis .modules .queries import _compile_db_span_properties
99from sentry_sdk .integrations .redis .utils import (
10- _set_client_data ,
11- _set_pipeline_data ,
10+ _create_breadcrumb ,
11+ _get_client_data ,
12+ _get_pipeline_data ,
13+ _update_span ,
1214)
1315from sentry_sdk .tracing import Span
1416from sentry_sdk .utils import capture_internal_exceptions
2325
2426
2527def patch_redis_async_pipeline (
26- pipeline_cls , is_cluster , get_command_args_fn , set_db_data_fn
28+ pipeline_cls , is_cluster , get_command_args_fn , get_db_data_fn
2729):
28- # type: (Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]], bool, Any, Callable[[Span, Any], None ]) -> None
30+ # type: (Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]], bool, Any, Callable[[Any], dict[str, Any] ]) -> None
2931 old_execute = pipeline_cls .execute
3032
3133 from sentry_sdk .integrations .redis import RedisIntegration
@@ -41,22 +43,25 @@ async def _sentry_execute(self, *args, **kwargs):
4143 origin = SPAN_ORIGIN ,
4244 ) as span :
4345 with capture_internal_exceptions ():
44- set_db_data_fn (span , self )
45- _set_pipeline_data (
46- span ,
47- is_cluster ,
48- get_command_args_fn ,
49- False if is_cluster else self .is_transaction ,
50- self ._command_stack if is_cluster else self .command_stack ,
46+ span_data = get_db_data_fn (self )
47+ pipeline_data = _get_pipeline_data (
48+ is_cluster = is_cluster ,
49+ get_command_args_fn = get_command_args_fn ,
50+ is_transaction = False if is_cluster else self .is_transaction ,
51+ command_stack = (
52+ self ._command_stack if is_cluster else self .command_stack
53+ ),
5154 )
55+ _update_span (span , span_data , pipeline_data )
56+ _create_breadcrumb ("redis.pipeline.execute" , span_data , pipeline_data )
5257
5358 return await old_execute (self , * args , ** kwargs )
5459
5560 pipeline_cls .execute = _sentry_execute # type: ignore
5661
5762
58- def patch_redis_async_client (cls , is_cluster , set_db_data_fn ):
59- # type: (Union[type[StrictRedis[Any]], type[RedisCluster[Any]]], bool, Callable[[Span, Any], None ]) -> None
63+ def patch_redis_async_client (cls , is_cluster , get_db_data_fn ):
64+ # type: (Union[type[StrictRedis[Any]], type[RedisCluster[Any]]], bool, Callable[[Any], dict[str, Any] ]) -> None
6065 old_execute_command = cls .execute_command
6166
6267 from sentry_sdk .integrations .redis import RedisIntegration
@@ -92,15 +97,20 @@ async def _sentry_execute_command(self, name, *args, **kwargs):
9297 )
9398 db_span .__enter__ ()
9499
95- set_db_data_fn (db_span , self )
96- _set_client_data (db_span , is_cluster , name , * args )
100+ db_span_data = get_db_data_fn (self )
101+ db_client_span_data = _get_client_data (is_cluster , name , * args )
102+ _update_span (db_span , db_span_data , db_client_span_data )
103+ _create_breadcrumb (
104+ db_properties ["description" ], db_span_data , db_client_span_data
105+ )
97106
98107 value = await old_execute_command (self , name , * args , ** kwargs )
99108
100109 db_span .__exit__ (None , None , None )
101110
102111 if cache_span :
103- _set_cache_data (cache_span , self , cache_properties , value )
112+ cache_span_data = _get_cache_data (self , cache_properties , value )
113+ _update_span (cache_span , cache_span_data )
104114 cache_span .__exit__ (None , None , None )
105115
106116 return value
0 commit comments