@@ -184,6 +184,62 @@ def _build_span_name(
184184 return name
185185
186186
187+ def _add_create_attributes (span : Span , args : tuple [Any , ...]):
188+ _set_span_attribute_if_value (
189+ span , "redis.create_index.index" , _value_or_none (args , 1 )
190+ )
191+ # According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command
192+ try :
193+ schema_index = args .index ("SCHEMA" )
194+ except ValueError :
195+ return
196+ schema = args [schema_index :]
197+ field_attribute = ""
198+ # Schema in format:
199+ # [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...]
200+ field_attribute = "" .join (
201+ f"Field(name: { schema [index - 1 ]} , type: { schema [index ]} );"
202+ for index in range (1 , len (schema ))
203+ if schema [index ] in _FIELD_TYPES
204+ )
205+ _set_span_attribute_if_value (
206+ span ,
207+ "redis.create_index.fields" ,
208+ field_attribute ,
209+ )
210+
211+
212+ def _add_search_attributes (span : Span , response , args ):
213+ _set_span_attribute_if_value (
214+ span , "redis.search.index" , _value_or_none (args , 1 )
215+ )
216+ _set_span_attribute_if_value (
217+ span , "redis.search.query" , _value_or_none (args , 2 )
218+ )
219+ # Parse response from search
220+ # https://redis.io/docs/latest/commands/ft.search/
221+ # Response in format:
222+ # [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...]
223+ # Returned documents in array format:
224+ # [first_field_name, first_field_value, second_field_name, second_field_value ...]
225+ number_of_returned_documents = _value_or_none (response , 0 )
226+ _set_span_attribute_if_value (
227+ span , "redis.search.total" , number_of_returned_documents
228+ )
229+ if "NOCONTENT" in args or not number_of_returned_documents :
230+ return
231+ for document_number in range (number_of_returned_documents ):
232+ document_index = _value_or_none (response , 1 + 2 * document_number )
233+ if document_index :
234+ document = response [2 + 2 * document_number ]
235+ for attribute_name_index in range (0 , len (document ), 2 ):
236+ _set_span_attribute_if_value (
237+ span ,
238+ f"redis.search.xdoc_{ document_index } .{ document [attribute_name_index ]} " ,
239+ document [attribute_name_index + 1 ],
240+ )
241+
242+
187243def _build_span_meta_data_for_pipeline (
188244 instance : PipelineInstance | AsyncPipelineInstance ,
189245) -> tuple [list [Any ], str , str ]:
@@ -214,11 +270,10 @@ def _build_span_meta_data_for_pipeline(
214270 return command_stack , resource , span_name
215271
216272
217- # pylint: disable=R0915
218- def _instrument (
219- tracer : Tracer ,
220- request_hook : _RequestHookT | None = None ,
221- response_hook : _ResponseHookT | None = None ,
273+ def _traced_execute_factory (
274+ tracer ,
275+ request_hook : _RequestHookT = None ,
276+ response_hook : _ResponseHookT = None ,
222277):
223278 def _traced_execute_command (
224279 func : Callable [..., R ],
@@ -247,6 +302,14 @@ def _traced_execute_command(
247302 response_hook (span , instance , response )
248303 return response
249304
305+ return _traced_execute_command
306+
307+
308+ def _traced_execute_pipeline_factory (
309+ tracer ,
310+ request_hook : _RequestHookT = None ,
311+ response_hook : _ResponseHookT = None ,
312+ ):
250313 def _traced_execute_pipeline (
251314 func : Callable [..., R ],
252315 instance : PipelineInstance ,
@@ -284,90 +347,14 @@ def _traced_execute_pipeline(
284347
285348 return response
286349
287- def _add_create_attributes (span : Span , args : tuple [Any , ...]):
288- _set_span_attribute_if_value (
289- span , "redis.create_index.index" , _value_or_none (args , 1 )
290- )
291- # According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command
292- try :
293- schema_index = args .index ("SCHEMA" )
294- except ValueError :
295- return
296- schema = args [schema_index :]
297- field_attribute = ""
298- # Schema in format:
299- # [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...]
300- field_attribute = "" .join (
301- f"Field(name: { schema [index - 1 ]} , type: { schema [index ]} );"
302- for index in range (1 , len (schema ))
303- if schema [index ] in _FIELD_TYPES
304- )
305- _set_span_attribute_if_value (
306- span ,
307- "redis.create_index.fields" ,
308- field_attribute ,
309- )
310-
311- def _add_search_attributes (span : Span , response , args ):
312- _set_span_attribute_if_value (
313- span , "redis.search.index" , _value_or_none (args , 1 )
314- )
315- _set_span_attribute_if_value (
316- span , "redis.search.query" , _value_or_none (args , 2 )
317- )
318- # Parse response from search
319- # https://redis.io/docs/latest/commands/ft.search/
320- # Response in format:
321- # [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...]
322- # Returned documents in array format:
323- # [first_field_name, first_field_value, second_field_name, second_field_value ...]
324- number_of_returned_documents = _value_or_none (response , 0 )
325- _set_span_attribute_if_value (
326- span , "redis.search.total" , number_of_returned_documents
327- )
328- if "NOCONTENT" in args or not number_of_returned_documents :
329- return
330- for document_number in range (number_of_returned_documents ):
331- document_index = _value_or_none (response , 1 + 2 * document_number )
332- if document_index :
333- document = response [2 + 2 * document_number ]
334- for attribute_name_index in range (0 , len (document ), 2 ):
335- _set_span_attribute_if_value (
336- span ,
337- f"redis.search.xdoc_{ document_index } .{ document [attribute_name_index ]} " ,
338- document [attribute_name_index + 1 ],
339- )
350+ return _traced_execute_pipeline
340351
341- pipeline_class = (
342- "BasePipeline" if redis .VERSION < (3 , 0 , 0 ) else "Pipeline"
343- )
344- redis_class = "StrictRedis" if redis .VERSION < (3 , 0 , 0 ) else "Redis"
345-
346- wrap_function_wrapper (
347- "redis" , f"{ redis_class } .execute_command" , _traced_execute_command
348- )
349- wrap_function_wrapper (
350- "redis.client" ,
351- f"{ pipeline_class } .execute" ,
352- _traced_execute_pipeline ,
353- )
354- wrap_function_wrapper (
355- "redis.client" ,
356- f"{ pipeline_class } .immediate_execute_command" ,
357- _traced_execute_command ,
358- )
359- if redis .VERSION >= _REDIS_CLUSTER_VERSION :
360- wrap_function_wrapper (
361- "redis.cluster" ,
362- "RedisCluster.execute_command" ,
363- _traced_execute_command ,
364- )
365- wrap_function_wrapper (
366- "redis.cluster" ,
367- "ClusterPipeline.execute" ,
368- _traced_execute_pipeline ,
369- )
370352
353+ def _async_traced_execute_factory (
354+ tracer ,
355+ request_hook : _RequestHookT = None ,
356+ response_hook : _ResponseHookT = None ,
357+ ):
371358 async def _async_traced_execute_command (
372359 func : Callable [..., Awaitable [R ]],
373360 instance : AsyncRedisInstance ,
@@ -391,6 +378,14 @@ async def _async_traced_execute_command(
391378 response_hook (span , instance , response )
392379 return response
393380
381+ return _async_traced_execute_command
382+
383+
384+ def _async_traced_execute_pipeline_factory (
385+ tracer ,
386+ request_hook : _RequestHookT = None ,
387+ response_hook : _ResponseHookT = None ,
388+ ):
394389 async def _async_traced_execute_pipeline (
395390 func : Callable [..., Awaitable [R ]],
396391 instance : AsyncPipelineInstance ,
@@ -430,6 +425,57 @@ async def _async_traced_execute_pipeline(
430425
431426 return response
432427
428+ return _async_traced_execute_pipeline
429+
430+
431+ # pylint: disable=R0915
432+ def _instrument (
433+ tracer : Tracer ,
434+ request_hook : _RequestHookT | None = None ,
435+ response_hook : _ResponseHookT | None = None ,
436+ ):
437+ _traced_execute_command = _traced_execute_factory (
438+ tracer , request_hook , response_hook
439+ )
440+ _traced_execute_pipeline = _traced_execute_pipeline_factory (
441+ tracer , request_hook , response_hook
442+ )
443+ pipeline_class = (
444+ "BasePipeline" if redis .VERSION < (3 , 0 , 0 ) else "Pipeline"
445+ )
446+ redis_class = "StrictRedis" if redis .VERSION < (3 , 0 , 0 ) else "Redis"
447+
448+ wrap_function_wrapper (
449+ "redis" , f"{ redis_class } .execute_command" , _traced_execute_command
450+ )
451+ wrap_function_wrapper (
452+ "redis.client" ,
453+ f"{ pipeline_class } .execute" ,
454+ _traced_execute_pipeline ,
455+ )
456+ wrap_function_wrapper (
457+ "redis.client" ,
458+ f"{ pipeline_class } .immediate_execute_command" ,
459+ _traced_execute_command ,
460+ )
461+ if redis .VERSION >= _REDIS_CLUSTER_VERSION :
462+ wrap_function_wrapper (
463+ "redis.cluster" ,
464+ "RedisCluster.execute_command" ,
465+ _traced_execute_command ,
466+ )
467+ wrap_function_wrapper (
468+ "redis.cluster" ,
469+ "ClusterPipeline.execute" ,
470+ _traced_execute_pipeline ,
471+ )
472+
473+ _async_traced_execute_command = _async_traced_execute_factory (
474+ tracer , request_hook , response_hook
475+ )
476+ _async_traced_execute_pipeline = _async_traced_execute_pipeline_factory (
477+ tracer , request_hook , response_hook
478+ )
433479 if redis .VERSION >= _REDIS_ASYNCIO_VERSION :
434480 wrap_function_wrapper (
435481 "redis.asyncio" ,
@@ -459,6 +505,94 @@ async def _async_traced_execute_pipeline(
459505 )
460506
461507
508+ def _instrument_client (
509+ client ,
510+ tracer ,
511+ request_hook : _RequestHookT = None ,
512+ response_hook : _ResponseHookT = None ,
513+ ):
514+ # first, handle async clients
515+ _async_traced_execute_command = _async_traced_execute_factory (
516+ tracer , request_hook , response_hook
517+ )
518+ _async_traced_execute_pipeline = _async_traced_execute_pipeline_factory (
519+ tracer , request_hook , response_hook
520+ )
521+
522+ def _async_pipeline_wrapper (func , instance , args , kwargs ):
523+ result = func (* args , ** kwargs )
524+ wrap_function_wrapper (
525+ result , "execute" , _async_traced_execute_pipeline
526+ )
527+ wrap_function_wrapper (
528+ result , "immediate_execute_command" , _async_traced_execute_command
529+ )
530+ return result
531+
532+ if redis .VERSION >= _REDIS_ASYNCIO_VERSION :
533+ client_type = (
534+ redis .asyncio .StrictRedis
535+ if redis .VERSION < (3 , 0 , 0 )
536+ else redis .asyncio .Redis
537+ )
538+
539+ if isinstance (client , client_type ):
540+ wrap_function_wrapper (
541+ client , "execute_command" , _async_traced_execute_command
542+ )
543+ wrap_function_wrapper (client , "pipeline" , _async_pipeline_wrapper )
544+ return
545+
546+ def _async_cluster_pipeline_wrapper (func , instance , args , kwargs ):
547+ result = func (* args , ** kwargs )
548+ wrap_function_wrapper (
549+ result , "execute" , _async_traced_execute_pipeline
550+ )
551+ return result
552+
553+ # handle
554+ if redis .VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION and isinstance (
555+ client , redis .asyncio .RedisCluster
556+ ):
557+ wrap_function_wrapper (
558+ client , "execute_command" , _async_traced_execute_command
559+ )
560+ wrap_function_wrapper (
561+ client , "pipeline" , _async_cluster_pipeline_wrapper
562+ )
563+ return
564+ # for redis.client.Redis, redis.Cluster and v3.0.0 redis.client.StrictRedis
565+ # the wrappers are the same
566+ # client_type = (
567+ # redis.client.StrictRedis if redis.VERSION < (3, 0, 0) else redis.client.Redis
568+ # )
569+ _traced_execute_command = _traced_execute_factory (
570+ tracer , request_hook , response_hook
571+ )
572+ _traced_execute_pipeline = _traced_execute_pipeline_factory (
573+ tracer , request_hook , response_hook
574+ )
575+
576+ def _pipeline_wrapper (func , instance , args , kwargs ):
577+ result = func (* args , ** kwargs )
578+ wrap_function_wrapper (result , "execute" , _traced_execute_pipeline )
579+ wrap_function_wrapper (
580+ result , "immediate_execute_command" , _traced_execute_command
581+ )
582+ return result
583+
584+ wrap_function_wrapper (
585+ client ,
586+ "execute_command" ,
587+ _traced_execute_command ,
588+ )
589+ wrap_function_wrapper (
590+ client ,
591+ "pipeline" ,
592+ _pipeline_wrapper ,
593+ )
594+
595+
462596class RedisInstrumentor (BaseInstrumentor ):
463597 """An instrumentor for Redis.
464598
@@ -483,11 +617,20 @@ def _instrument(self, **kwargs: Any):
483617 tracer_provider = tracer_provider ,
484618 schema_url = "https://opentelemetry.io/schemas/1.11.0" ,
485619 )
486- _instrument (
487- tracer ,
488- request_hook = kwargs .get ("request_hook" ),
489- response_hook = kwargs .get ("response_hook" ),
490- )
620+ redis_client = kwargs .get ("client" )
621+ if redis_client :
622+ _instrument_client (
623+ redis_client ,
624+ tracer ,
625+ request_hook = kwargs .get ("request_hook" ),
626+ response_hook = kwargs .get ("response_hook" ),
627+ )
628+ else :
629+ _instrument (
630+ tracer ,
631+ request_hook = kwargs .get ("request_hook" ),
632+ response_hook = kwargs .get ("response_hook" ),
633+ )
491634
492635 def _uninstrument (self , ** kwargs : Any ):
493636 if redis .VERSION < (3 , 0 , 0 ):
0 commit comments