@@ -126,187 +126,61 @@ def response_hook(span, instance, response):
126126from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
127127from opentelemetry .instrumentation .redis .package import _instruments
128128from opentelemetry .instrumentation .redis .util import (
129- _extract_conn_attributes ,
129+ _add_create_attributes ,
130+ _add_search_attributes ,
131+ _build_span_meta_data_for_pipeline ,
132+ _build_span_name ,
130133 _format_command_args ,
131- _set_span_attribute_if_value ,
132- _value_or_none ,
134+ _set_connection_attributes ,
133135)
134136from opentelemetry .instrumentation .redis .version import __version__
135137from opentelemetry .instrumentation .utils import unwrap
136138from opentelemetry .semconv .trace import SpanAttributes
137139from opentelemetry .trace import (
138- Span ,
139140 StatusCode ,
140141 Tracer ,
141142 TracerProvider ,
142143 get_tracer ,
143144)
144145
145146if TYPE_CHECKING :
146- from typing import Awaitable , TypeVar
147+ from typing import Awaitable
147148
148149 import redis .asyncio .client
149150 import redis .asyncio .cluster
150151 import redis .client
151152 import redis .cluster
152153 import redis .connection
153154
154- RequestHook = Callable [
155- [Span , redis .connection .Connection , list [Any ], dict [str , Any ]], None
156- ]
157- ResponseHook = Callable [[Span , redis .connection .Connection , Any ], None ]
158-
159- AsyncPipelineInstance = TypeVar (
160- "AsyncPipelineInstance" ,
161- redis .asyncio .client .Pipeline ,
162- redis .asyncio .cluster .ClusterPipeline ,
163- )
164- AsyncRedisInstance = TypeVar (
165- "AsyncRedisInstance" , redis .asyncio .Redis , redis .asyncio .RedisCluster
166- )
167- PipelineInstance = TypeVar (
168- "PipelineInstance" ,
169- redis .client .Pipeline ,
170- redis .cluster .ClusterPipeline ,
171- )
172- RedisInstance = TypeVar (
173- "RedisInstance" , redis .client .Redis , redis .cluster .RedisCluster
155+ from opentelemetry .instrumentation .redis .types import (
156+ AsyncPipelineInstance ,
157+ AsyncRedisInstance ,
158+ PipelineInstance ,
159+ R ,
160+ RedisInstance ,
161+ RequestHook ,
162+ ResponseHook ,
174163 )
175- R = TypeVar ("R" )
176164
177165
178- _DEFAULT_SERVICE = "redis"
179166_logger = logging .getLogger (__name__ )
180167
181168_REDIS_ASYNCIO_VERSION = (4 , 2 , 0 )
182169_REDIS_CLUSTER_VERSION = (4 , 1 , 0 )
183170_REDIS_ASYNCIO_CLUSTER_VERSION = (4 , 3 , 2 )
184171
185- _FIELD_TYPES = ["NUMERIC" , "TEXT" , "GEO" , "TAG" , "VECTOR" ]
186172
187173_CLIENT_ASYNCIO_SUPPORT = redis .VERSION >= _REDIS_ASYNCIO_VERSION
188174_CLIENT_ASYNCIO_CLUSTER_SUPPORT = (
189175 redis .VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION
190176)
191177_CLIENT_CLUSTER_SUPPORT = redis .VERSION >= _REDIS_CLUSTER_VERSION
192- _CLIENT_BEFORE_3_0_0 = redis .VERSION < (3 , 0 , 0 )
178+ _CLIENT_BEFORE_V3 = redis .VERSION < (3 , 0 , 0 )
193179
194180if _CLIENT_ASYNCIO_SUPPORT :
195181 import redis .asyncio
196182
197- INSTRUMENTATION_ATTR = "_is_instrumented_by_opentelemetry"
198-
199-
200- def _set_connection_attributes (
201- span : Span , conn : RedisInstance | AsyncRedisInstance
202- ) -> None :
203- if not span .is_recording () or not hasattr (conn , "connection_pool" ):
204- return
205- for key , value in _extract_conn_attributes (
206- conn .connection_pool .connection_kwargs
207- ).items ():
208- span .set_attribute (key , value )
209-
210-
211- def _build_span_name (
212- instance : RedisInstance | AsyncRedisInstance , cmd_args : tuple [Any , ...]
213- ) -> str :
214- if len (cmd_args ) > 0 and cmd_args [0 ]:
215- if cmd_args [0 ] == "FT.SEARCH" :
216- name = "redis.search"
217- elif cmd_args [0 ] == "FT.CREATE" :
218- name = "redis.create_index"
219- else :
220- name = cmd_args [0 ]
221- else :
222- name = instance .connection_pool .connection_kwargs .get ("db" , 0 )
223- return name
224-
225-
226- def _add_create_attributes (span : Span , args : tuple [Any , ...]):
227- _set_span_attribute_if_value (
228- span , "redis.create_index.index" , _value_or_none (args , 1 )
229- )
230- # According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command
231- try :
232- schema_index = args .index ("SCHEMA" )
233- except ValueError :
234- return
235- schema = args [schema_index :]
236- field_attribute = ""
237- # Schema in format:
238- # [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...]
239- field_attribute = "" .join (
240- f"Field(name: { schema [index - 1 ]} , type: { schema [index ]} );"
241- for index in range (1 , len (schema ))
242- if schema [index ] in _FIELD_TYPES
243- )
244- _set_span_attribute_if_value (
245- span ,
246- "redis.create_index.fields" ,
247- field_attribute ,
248- )
249-
250-
251- def _add_search_attributes (span : Span , response , args ):
252- _set_span_attribute_if_value (
253- span , "redis.search.index" , _value_or_none (args , 1 )
254- )
255- _set_span_attribute_if_value (
256- span , "redis.search.query" , _value_or_none (args , 2 )
257- )
258- # Parse response from search
259- # https://redis.io/docs/latest/commands/ft.search/
260- # Response in format:
261- # [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...]
262- # Returned documents in array format:
263- # [first_field_name, first_field_value, second_field_name, second_field_value ...]
264- number_of_returned_documents = _value_or_none (response , 0 )
265- _set_span_attribute_if_value (
266- span , "redis.search.total" , number_of_returned_documents
267- )
268- if "NOCONTENT" in args or not number_of_returned_documents :
269- return
270- for document_number in range (number_of_returned_documents ):
271- document_index = _value_or_none (response , 1 + 2 * document_number )
272- if document_index :
273- document = response [2 + 2 * document_number ]
274- for attribute_name_index in range (0 , len (document ), 2 ):
275- _set_span_attribute_if_value (
276- span ,
277- f"redis.search.xdoc_{ document_index } .{ document [attribute_name_index ]} " ,
278- document [attribute_name_index + 1 ],
279- )
280-
281-
282- def _build_span_meta_data_for_pipeline (
283- instance : PipelineInstance | AsyncPipelineInstance ,
284- ) -> tuple [list [Any ], str , str ]:
285- try :
286- command_stack = (
287- instance .command_stack
288- if hasattr (instance , "command_stack" )
289- else instance ._command_stack
290- )
291-
292- cmds = [
293- _format_command_args (c .args if hasattr (c , "args" ) else c [0 ])
294- for c in command_stack
295- ]
296- resource = "\n " .join (cmds )
297-
298- span_name = " " .join (
299- [
300- (c .args [0 ] if hasattr (c , "args" ) else c [0 ][0 ])
301- for c in command_stack
302- ]
303- )
304- except (AttributeError , IndexError ):
305- command_stack = []
306- resource = ""
307- span_name = ""
308-
309- return command_stack , resource , span_name
183+ _INSTRUMENTATION_ATTR = "_is_instrumented_by_opentelemetry"
310184
311185
312186def _traced_execute_factory (
@@ -479,8 +353,8 @@ def _instrument(
479353 _traced_execute_pipeline = _traced_execute_pipeline_factory (
480354 tracer , request_hook , response_hook
481355 )
482- pipeline_class = "BasePipeline" if _CLIENT_BEFORE_3_0_0 else "Pipeline"
483- redis_class = "StrictRedis" if _CLIENT_BEFORE_3_0_0 else "Redis"
356+ pipeline_class = "BasePipeline" if _CLIENT_BEFORE_V3 else "Pipeline"
357+ redis_class = "StrictRedis" if _CLIENT_BEFORE_V3 else "Redis"
484358
485359 wrap_function_wrapper (
486360 "redis" , f"{ redis_class } .execute_command" , _traced_execute_command
@@ -677,7 +551,7 @@ def _instrument(self, **kwargs: Any):
677551 )
678552
679553 def _uninstrument (self , ** kwargs : Any ):
680- if _CLIENT_BEFORE_3_0_0 :
554+ if _CLIENT_BEFORE_V3 :
681555 unwrap (redis .StrictRedis , "execute_command" )
682556 unwrap (redis .StrictRedis , "pipeline" )
683557 unwrap (redis .Redis , "pipeline" )
@@ -739,16 +613,16 @@ def instrument_client(
739613
740614 The ``args`` represents the response.
741615 """
742- if not hasattr (client , INSTRUMENTATION_ATTR ):
743- setattr (client , INSTRUMENTATION_ATTR , False )
744- if not getattr (client , INSTRUMENTATION_ATTR ):
616+ if not hasattr (client , _INSTRUMENTATION_ATTR ):
617+ setattr (client , _INSTRUMENTATION_ATTR , False )
618+ if not getattr (client , _INSTRUMENTATION_ATTR ):
745619 _instrument_client (
746620 client ,
747621 RedisInstrumentor ._get_tracer (tracer_provider = tracer_provider ),
748622 request_hook = request_hook ,
749623 response_hook = response_hook ,
750624 )
751- setattr (client , INSTRUMENTATION_ATTR , True )
625+ setattr (client , _INSTRUMENTATION_ATTR , True )
752626 else :
753627 _logger .warning (
754628 "Attempting to instrument Redis connection while already instrumented"
@@ -767,7 +641,7 @@ def uninstrument_client(
767641 Args:
768642 client: The redis client
769643 """
770- if getattr (client , INSTRUMENTATION_ATTR ):
644+ if getattr (client , _INSTRUMENTATION_ATTR ):
771645 # for all clients we need to unwrap execute_command and pipeline functions
772646 unwrap (client , "execute_command" )
773647 # the method was creating a pipeline and wrapping the functions of the
0 commit comments