Skip to content

Commit 4a206aa

Browse files
author
OlegZv
committed
Rough draft of the contribution
1 parent e3ba54b commit 4a206aa

File tree

2 files changed

+343
-103
lines changed

2 files changed

+343
-103
lines changed

instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py

Lines changed: 232 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,62 @@ def _build_span_name(instance, cmd_args):
154154
return name
155155

156156

157+
def _add_create_attributes(span, args):
158+
_set_span_attribute_if_value(
159+
span, "redis.create_index.index", _value_or_none(args, 1)
160+
)
161+
# According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command
162+
try:
163+
schema_index = args.index("SCHEMA")
164+
except ValueError:
165+
return
166+
schema = args[schema_index:]
167+
field_attribute = ""
168+
# Schema in format:
169+
# [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...]
170+
field_attribute = "".join(
171+
f"Field(name: {schema[index - 1]}, type: {schema[index]});"
172+
for index in range(1, len(schema))
173+
if schema[index] in _FIELD_TYPES
174+
)
175+
_set_span_attribute_if_value(
176+
span,
177+
"redis.create_index.fields",
178+
field_attribute,
179+
)
180+
181+
182+
def _add_search_attributes(span, response, args):
183+
_set_span_attribute_if_value(
184+
span, "redis.search.index", _value_or_none(args, 1)
185+
)
186+
_set_span_attribute_if_value(
187+
span, "redis.search.query", _value_or_none(args, 2)
188+
)
189+
# Parse response from search
190+
# https://redis.io/docs/latest/commands/ft.search/
191+
# Response in format:
192+
# [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...]
193+
# Returned documents in array format:
194+
# [first_field_name, first_field_value, second_field_name, second_field_value ...]
195+
number_of_returned_documents = _value_or_none(response, 0)
196+
_set_span_attribute_if_value(
197+
span, "redis.search.total", number_of_returned_documents
198+
)
199+
if "NOCONTENT" in args or not number_of_returned_documents:
200+
return
201+
for document_number in range(number_of_returned_documents):
202+
document_index = _value_or_none(response, 1 + 2 * document_number)
203+
if document_index:
204+
document = response[2 + 2 * document_number]
205+
for attribute_name_index in range(0, len(document), 2):
206+
_set_span_attribute_if_value(
207+
span,
208+
f"redis.search.xdoc_{document_index}.{document[attribute_name_index]}",
209+
document[attribute_name_index + 1],
210+
)
211+
212+
157213
def _build_span_meta_data_for_pipeline(instance):
158214
try:
159215
command_stack = (
@@ -182,8 +238,7 @@ def _build_span_meta_data_for_pipeline(instance):
182238
return command_stack, resource, span_name
183239

184240

185-
# pylint: disable=R0915
186-
def _instrument(
241+
def _traced_execute_factory(
187242
tracer,
188243
request_hook: _RequestHookT = None,
189244
response_hook: _ResponseHookT = None,
@@ -210,6 +265,14 @@ def _traced_execute_command(func, instance, args, kwargs):
210265
response_hook(span, instance, response)
211266
return response
212267

268+
return _traced_execute_command
269+
270+
271+
def _traced_execute_pipeline_factory(
272+
tracer,
273+
request_hook: _RequestHookT = None,
274+
response_hook: _ResponseHookT = None,
275+
):
213276
def _traced_execute_pipeline(func, instance, args, kwargs):
214277
(
215278
command_stack,
@@ -242,90 +305,14 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
242305

243306
return response
244307

245-
def _add_create_attributes(span, args):
246-
_set_span_attribute_if_value(
247-
span, "redis.create_index.index", _value_or_none(args, 1)
248-
)
249-
# According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command
250-
try:
251-
schema_index = args.index("SCHEMA")
252-
except ValueError:
253-
return
254-
schema = args[schema_index:]
255-
field_attribute = ""
256-
# Schema in format:
257-
# [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...]
258-
field_attribute = "".join(
259-
f"Field(name: {schema[index - 1]}, type: {schema[index]});"
260-
for index in range(1, len(schema))
261-
if schema[index] in _FIELD_TYPES
262-
)
263-
_set_span_attribute_if_value(
264-
span,
265-
"redis.create_index.fields",
266-
field_attribute,
267-
)
308+
return _traced_execute_pipeline
268309

269-
def _add_search_attributes(span, response, args):
270-
_set_span_attribute_if_value(
271-
span, "redis.search.index", _value_or_none(args, 1)
272-
)
273-
_set_span_attribute_if_value(
274-
span, "redis.search.query", _value_or_none(args, 2)
275-
)
276-
# Parse response from search
277-
# https://redis.io/docs/latest/commands/ft.search/
278-
# Response in format:
279-
# [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...]
280-
# Returned documents in array format:
281-
# [first_field_name, first_field_value, second_field_name, second_field_value ...]
282-
number_of_returned_documents = _value_or_none(response, 0)
283-
_set_span_attribute_if_value(
284-
span, "redis.search.total", number_of_returned_documents
285-
)
286-
if "NOCONTENT" in args or not number_of_returned_documents:
287-
return
288-
for document_number in range(number_of_returned_documents):
289-
document_index = _value_or_none(response, 1 + 2 * document_number)
290-
if document_index:
291-
document = response[2 + 2 * document_number]
292-
for attribute_name_index in range(0, len(document), 2):
293-
_set_span_attribute_if_value(
294-
span,
295-
f"redis.search.xdoc_{document_index}.{document[attribute_name_index]}",
296-
document[attribute_name_index + 1],
297-
)
298-
299-
pipeline_class = (
300-
"BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline"
301-
)
302-
redis_class = "StrictRedis" if redis.VERSION < (3, 0, 0) else "Redis"
303-
304-
wrap_function_wrapper(
305-
"redis", f"{redis_class}.execute_command", _traced_execute_command
306-
)
307-
wrap_function_wrapper(
308-
"redis.client",
309-
f"{pipeline_class}.execute",
310-
_traced_execute_pipeline,
311-
)
312-
wrap_function_wrapper(
313-
"redis.client",
314-
f"{pipeline_class}.immediate_execute_command",
315-
_traced_execute_command,
316-
)
317-
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
318-
wrap_function_wrapper(
319-
"redis.cluster",
320-
"RedisCluster.execute_command",
321-
_traced_execute_command,
322-
)
323-
wrap_function_wrapper(
324-
"redis.cluster",
325-
"ClusterPipeline.execute",
326-
_traced_execute_pipeline,
327-
)
328310

311+
def _async_traced_execute_factory(
312+
tracer,
313+
request_hook: _RequestHookT = None,
314+
response_hook: _ResponseHookT = None,
315+
):
329316
async def _async_traced_execute_command(func, instance, args, kwargs):
330317
query = _format_command_args(args)
331318
name = _build_span_name(instance, args)
@@ -344,6 +331,14 @@ async def _async_traced_execute_command(func, instance, args, kwargs):
344331
response_hook(span, instance, response)
345332
return response
346333

334+
return _async_traced_execute_command
335+
336+
337+
def _async_traced_execute_pipeline_factory(
338+
tracer,
339+
request_hook: _RequestHookT = None,
340+
response_hook: _ResponseHookT = None,
341+
):
347342
async def _async_traced_execute_pipeline(func, instance, args, kwargs):
348343
(
349344
command_stack,
@@ -378,6 +373,57 @@ async def _async_traced_execute_pipeline(func, instance, args, kwargs):
378373

379374
return response
380375

376+
return _async_traced_execute_pipeline
377+
378+
379+
# pylint: disable=R0915
380+
def _instrument(
381+
tracer,
382+
request_hook: _RequestHookT = None,
383+
response_hook: _ResponseHookT = None,
384+
):
385+
_traced_execute_command = _traced_execute_factory(
386+
tracer, request_hook, response_hook
387+
)
388+
_traced_execute_pipeline = _traced_execute_pipeline_factory(
389+
tracer, request_hook, response_hook
390+
)
391+
pipeline_class = (
392+
"BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline"
393+
)
394+
redis_class = "StrictRedis" if redis.VERSION < (3, 0, 0) else "Redis"
395+
396+
wrap_function_wrapper(
397+
"redis", f"{redis_class}.execute_command", _traced_execute_command
398+
)
399+
wrap_function_wrapper(
400+
"redis.client",
401+
f"{pipeline_class}.execute",
402+
_traced_execute_pipeline,
403+
)
404+
wrap_function_wrapper(
405+
"redis.client",
406+
f"{pipeline_class}.immediate_execute_command",
407+
_traced_execute_command,
408+
)
409+
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
410+
wrap_function_wrapper(
411+
"redis.cluster",
412+
"RedisCluster.execute_command",
413+
_traced_execute_command,
414+
)
415+
wrap_function_wrapper(
416+
"redis.cluster",
417+
"ClusterPipeline.execute",
418+
_traced_execute_pipeline,
419+
)
420+
421+
_async_traced_execute_command = _async_traced_execute_factory(
422+
tracer, request_hook, response_hook
423+
)
424+
_async_traced_execute_pipeline = _async_traced_execute_pipeline_factory(
425+
tracer, request_hook, response_hook
426+
)
381427
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
382428
wrap_function_wrapper(
383429
"redis.asyncio",
@@ -407,6 +453,94 @@ async def _async_traced_execute_pipeline(func, instance, args, kwargs):
407453
)
408454

409455

456+
def _instrument_client(
457+
client,
458+
tracer,
459+
request_hook: _RequestHookT = None,
460+
response_hook: _ResponseHookT = None,
461+
):
462+
# first, handle async clients
463+
_async_traced_execute_command = _async_traced_execute_factory(
464+
tracer, request_hook, response_hook
465+
)
466+
_async_traced_execute_pipeline = _async_traced_execute_pipeline_factory(
467+
tracer, request_hook, response_hook
468+
)
469+
470+
def _async_pipeline_wrapper(func, instance, args, kwargs):
471+
result = func(*args, **kwargs)
472+
wrap_function_wrapper(
473+
result, "execute", _async_traced_execute_pipeline
474+
)
475+
wrap_function_wrapper(
476+
result, "immediate_execute_command", _async_traced_execute_command
477+
)
478+
return result
479+
480+
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
481+
client_type = (
482+
redis.asyncio.StrictRedis
483+
if redis.VERSION < (3, 0, 0)
484+
else redis.asyncio.Redis
485+
)
486+
487+
if isinstance(client, client_type):
488+
wrap_function_wrapper(
489+
client, "execute_command", _async_traced_execute_command
490+
)
491+
wrap_function_wrapper(client, "pipeline", _async_pipeline_wrapper)
492+
return
493+
494+
def _async_cluster_pipeline_wrapper(func, instance, args, kwargs):
495+
result = func(*args, **kwargs)
496+
wrap_function_wrapper(
497+
result, "execute", _async_traced_execute_pipeline
498+
)
499+
return result
500+
501+
# handle
502+
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION and isinstance(
503+
client, redis.asyncio.RedisCluster
504+
):
505+
wrap_function_wrapper(
506+
client, "execute_command", _async_traced_execute_command
507+
)
508+
wrap_function_wrapper(
509+
client, "pipeline", _async_cluster_pipeline_wrapper
510+
)
511+
return
512+
# for redis.client.Redis, redis.Cluster and v3.0.0 redis.client.StrictRedis
513+
# the wrappers are the same
514+
# client_type = (
515+
# redis.client.StrictRedis if redis.VERSION < (3, 0, 0) else redis.client.Redis
516+
# )
517+
_traced_execute_command = _traced_execute_factory(
518+
tracer, request_hook, response_hook
519+
)
520+
_traced_execute_pipeline = _traced_execute_pipeline_factory(
521+
tracer, request_hook, response_hook
522+
)
523+
524+
def _pipeline_wrapper(func, instance, args, kwargs):
525+
result = func(*args, **kwargs)
526+
wrap_function_wrapper(result, "execute", _traced_execute_pipeline)
527+
wrap_function_wrapper(
528+
result, "immediate_execute_command", _traced_execute_command
529+
)
530+
return result
531+
532+
wrap_function_wrapper(
533+
client,
534+
"execute_command",
535+
_traced_execute_command,
536+
)
537+
wrap_function_wrapper(
538+
client,
539+
"pipeline",
540+
_pipeline_wrapper,
541+
)
542+
543+
410544
class RedisInstrumentor(BaseInstrumentor):
411545
"""An instrumentor for Redis
412546
See `BaseInstrumentor`
@@ -430,11 +564,20 @@ def _instrument(self, **kwargs):
430564
tracer_provider=tracer_provider,
431565
schema_url="https://opentelemetry.io/schemas/1.11.0",
432566
)
433-
_instrument(
434-
tracer,
435-
request_hook=kwargs.get("request_hook"),
436-
response_hook=kwargs.get("response_hook"),
437-
)
567+
redis_client = kwargs.get("client")
568+
if redis_client:
569+
_instrument_client(
570+
redis_client,
571+
tracer,
572+
request_hook=kwargs.get("request_hook"),
573+
response_hook=kwargs.get("response_hook"),
574+
)
575+
else:
576+
_instrument(
577+
tracer,
578+
request_hook=kwargs.get("request_hook"),
579+
response_hook=kwargs.get("response_hook"),
580+
)
438581

439582
def _uninstrument(self, **kwargs):
440583
if redis.VERSION < (3, 0, 0):

0 commit comments

Comments
 (0)