@@ -102,6 +102,8 @@ def response_hook(span, instance, response):
102102from opentelemetry .instrumentation .redis .util import (
103103 _extract_conn_attributes ,
104104 _format_command_args ,
105+ _set_span_attribute_if_value ,
106+ _value_or_none ,
105107)
106108from opentelemetry .instrumentation .redis .version import __version__
107109from opentelemetry .instrumentation .utils import unwrap
@@ -126,6 +128,8 @@ def response_hook(span, instance, response):
126128_REDIS_CLUSTER_VERSION = (4 , 1 , 0 )
127129_REDIS_ASYNCIO_CLUSTER_VERSION = (4 , 3 , 2 )
128130
131+ _FIELD_TYPES = ["NUMERIC" , "TEXT" , "GEO" , "TAG" , "VECTOR" ]
132+
129133
130134def _set_connection_attributes (span , conn ):
131135 if not span .is_recording () or not hasattr (conn , "connection_pool" ):
@@ -138,7 +142,12 @@ def _set_connection_attributes(span, conn):
138142
139143def _build_span_name (instance , cmd_args ):
140144 if len (cmd_args ) > 0 and cmd_args [0 ]:
141- name = cmd_args [0 ]
145+ if cmd_args [0 ] == "FT.SEARCH" :
146+ name = "redis.search"
147+ elif cmd_args [0 ] == "FT.CREATE" :
148+ name = "redis.create_index"
149+ else :
150+ name = cmd_args [0 ]
142151 else :
143152 name = instance .connection_pool .connection_kwargs .get ("db" , 0 )
144153 return name
@@ -181,17 +190,21 @@ def _instrument(
181190 def _traced_execute_command (func , instance , args , kwargs ):
182191 query = _format_command_args (args )
183192 name = _build_span_name (instance , args )
184-
185193 with tracer .start_as_current_span (
186194 name , kind = trace .SpanKind .CLIENT
187195 ) as span :
188196 if span .is_recording ():
189197 span .set_attribute (SpanAttributes .DB_STATEMENT , query )
190198 _set_connection_attributes (span , instance )
191199 span .set_attribute ("db.redis.args_length" , len (args ))
200+ if span .name == "redis.create_index" :
201+ _add_create_attributes (span , args )
192202 if callable (request_hook ):
193203 request_hook (span , instance , args , kwargs )
194204 response = func (* args , ** kwargs )
205+ if span .is_recording ():
206+ if span .name == "redis.search" :
207+ _add_search_attributes (span , response , args )
195208 if callable (response_hook ):
196209 response_hook (span , instance , response )
197210 return response
@@ -202,9 +215,7 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
202215 resource ,
203216 span_name ,
204217 ) = _build_span_meta_data_for_pipeline (instance )
205-
206218 exception = None
207-
208219 with tracer .start_as_current_span (
209220 span_name , kind = trace .SpanKind .CLIENT
210221 ) as span :
@@ -230,6 +241,60 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
230241
231242 return response
232243
244+ def _add_create_attributes (span , args ):
245+ _set_span_attribute_if_value (
246+ span , "redis.create_index.index" , _value_or_none (args , 1 )
247+ )
248+ # According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command
249+ try :
250+ schema_index = args .index ("SCHEMA" )
251+ except ValueError :
252+ return
253+ schema = args [schema_index :]
254+ field_attribute = ""
255+ # Schema in format:
256+ # [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...]
257+ field_attribute = "" .join (
258+ f"Field(name: { schema [index - 1 ]} , type: { schema [index ]} );"
259+ for index in range (1 , len (schema ))
260+ if schema [index ] in _FIELD_TYPES
261+ )
262+ _set_span_attribute_if_value (
263+ span ,
264+ "redis.create_index.fields" ,
265+ field_attribute ,
266+ )
267+
268+ def _add_search_attributes (span , response , args ):
269+ _set_span_attribute_if_value (
270+ span , "redis.search.index" , _value_or_none (args , 1 )
271+ )
272+ _set_span_attribute_if_value (
273+ span , "redis.search.query" , _value_or_none (args , 2 )
274+ )
275+ # Parse response from search
276+ # https://redis.io/docs/latest/commands/ft.search/
277+ # Response in format:
278+ # [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...]
279+ # Returned documents in array format:
280+ # [first_field_name, first_field_value, second_field_name, second_field_value ...]
281+ number_of_returned_documents = _value_or_none (response , 0 )
282+ _set_span_attribute_if_value (
283+ span , "redis.search.total" , number_of_returned_documents
284+ )
285+ if "NOCONTENT" in args or not number_of_returned_documents :
286+ return
287+ for document_number in range (number_of_returned_documents ):
288+ document_index = _value_or_none (response , 1 + 2 * document_number )
289+ if document_index :
290+ document = response [2 + 2 * document_number ]
291+ for attribute_name_index in range (0 , len (document ), 2 ):
292+ _set_span_attribute_if_value (
293+ span ,
294+ f"redis.search.xdoc_{ document_index } .{ document [attribute_name_index ]} " ,
295+ document [attribute_name_index + 1 ],
296+ )
297+
233298 pipeline_class = (
234299 "BasePipeline" if redis .VERSION < (3 , 0 , 0 ) else "Pipeline"
235300 )
0 commit comments