Skip to content

Commit 2b2670d

Browse files
committed
feat(redis): fix lint errors to pass tests (#3826)
1 parent 420b76c commit 2b2670d

File tree

1 file changed

+142
-155
lines changed
  • instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis

1 file changed

+142
-155
lines changed

instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py

Lines changed: 142 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,110 @@ def _get_redis_conn_info(instance):
203203
return host, port, db, unix_sock
204204

205205

206+
# Helper function to set old semantic convention attributes
207+
def _set_old_semconv_attributes(
208+
span,
209+
instance,
210+
args,
211+
query,
212+
command_stack,
213+
resource,
214+
is_pipeline,
215+
):
216+
span.set_attribute(DB_STATEMENT, query if not is_pipeline else resource)
217+
_set_connection_attributes(span, instance)
218+
if not is_pipeline:
219+
span.set_attribute("db.redis.args_length", len(args))
220+
else:
221+
span.set_attribute("db.redis.pipeline_length", len(command_stack))
222+
223+
224+
# Helper function to set new semantic convention attributes
225+
def _set_new_semconv_attributes(
226+
span,
227+
instance,
228+
args,
229+
query,
230+
command_stack,
231+
resource,
232+
is_pipeline,
233+
):
234+
span.set_attribute("db.system.name", "redis")
235+
236+
if not is_pipeline:
237+
if args and len(args) > 0:
238+
span.set_attribute("db.operation.name", args[0])
239+
else:
240+
span.set_attribute("db.operation.name", "PIPELINE")
241+
if len(command_stack) > 1:
242+
span.set_attribute("db.operation.batch.size", len(command_stack))
243+
244+
host, port, db, unix_sock = _get_redis_conn_info(instance)
245+
if db is not None:
246+
span.set_attribute("db.namespace", str(db))
247+
span.set_attribute("db.query.text", query if not is_pipeline else resource)
248+
if host:
249+
span.set_attribute("server.address", host)
250+
span.set_attribute("network.peer.address", host)
251+
if port:
252+
span.set_attribute("server.port", port)
253+
span.set_attribute("network.peer.port", port)
254+
if unix_sock:
255+
span.set_attribute("network.peer.address", unix_sock)
256+
span.set_attribute("network.transport", "unix")
257+
258+
# db.stored_procedure.name (only for individual commands)
259+
if not is_pipeline:
260+
if args and args[0] in ("EVALSHA", "FCALL") and len(args) > 1:
261+
span.set_attribute("db.stored_procedure.name", args[1])
262+
263+
264+
# Helper function to set all common span attributes
265+
def _set_span_attributes(
266+
span,
267+
instance,
268+
args, # For individual commands
269+
query, # For individual commands
270+
command_stack, # For pipelines
271+
resource, # For pipelines
272+
semconv_opt_in_mode,
273+
):
274+
if not span.is_recording():
275+
return
276+
277+
is_pipeline = command_stack is not None
278+
279+
if _report_old(semconv_opt_in_mode):
280+
_set_old_semconv_attributes(
281+
span, instance, args, query, command_stack, resource, is_pipeline
282+
)
283+
284+
if _report_new(semconv_opt_in_mode):
285+
_set_new_semconv_attributes(
286+
span, instance, args, query, command_stack, resource, is_pipeline
287+
)
288+
289+
# Command-specific attributes that depend on span.name (e.g., redis.create_index)
290+
if not is_pipeline and span.name == "redis.create_index":
291+
_add_create_attributes(span, args)
292+
293+
294+
# Helper function to set error attributes on a span
295+
def _set_span_error_attributes(span, exc, semconv_opt_in_mode):
296+
if not span.is_recording():
297+
return
298+
299+
if _report_new(semconv_opt_in_mode):
300+
error_type = getattr(exc, "args", [None])[0]
301+
if error_type and isinstance(error_type, str):
302+
prefix = error_type.split(" ")[0]
303+
span.set_attribute("db.response.status_code", prefix)
304+
span.set_attribute("error.type", prefix)
305+
else:
306+
span.set_attribute("error.type", type(exc).__qualname__)
307+
span.set_status(StatusCode.ERROR)
308+
309+
206310
def _traced_execute_factory(
207311
tracer: Tracer,
208312
request_hook: RequestHook | None = None,
@@ -222,38 +326,10 @@ def _traced_execute_command(
222326
with tracer.start_as_current_span(
223327
name, kind=trace.SpanKind.CLIENT
224328
) as span:
225-
if span.is_recording():
226-
# Old/existing attributes:
227-
if _report_old(semconv_opt_in_mode):
228-
span.set_attribute(DB_STATEMENT, query)
229-
_set_connection_attributes(span, instance)
230-
span.set_attribute("db.redis.args_length", len(args))
231-
# New semantic conventions:
232-
if _report_new(semconv_opt_in_mode):
233-
span.set_attribute("db.system.name", "redis")
234-
if args and len(args) > 0:
235-
span.set_attribute("db.operation.name", args[0])
236-
host, port, db, unix_sock = _get_redis_conn_info(instance)
237-
if db is not None:
238-
span.set_attribute("db.namespace", str(db))
239-
span.set_attribute("db.query.text", query)
240-
if host:
241-
span.set_attribute("server.address", host)
242-
span.set_attribute("network.peer.address", host)
243-
if port:
244-
span.set_attribute("server.port", port)
245-
span.set_attribute("network.peer.port", port)
246-
if unix_sock:
247-
span.set_attribute("network.peer.address", unix_sock)
248-
span.set_attribute("network.transport", "unix")
249-
if (
250-
args
251-
and args[0] in ("EVALSHA", "FCALL")
252-
and len(args) > 1
253-
):
254-
span.set_attribute("db.stored_procedure.name", args[1])
255-
if span.name == "redis.create_index":
256-
_add_create_attributes(span, args)
329+
_set_span_attributes(
330+
span, instance, args, query, None, None, semconv_opt_in_mode
331+
)
332+
257333
if callable(request_hook):
258334
request_hook(span, instance, args, kwargs)
259335
try:
@@ -262,19 +338,8 @@ def _traced_execute_command(
262338
if span.is_recording():
263339
span.set_status(StatusCode.UNSET)
264340
raise watch_exception
265-
except Exception as exc:
266-
if _report_new(semconv_opt_in_mode) and span.is_recording():
267-
error_type = getattr(exc, "args", [None])[0]
268-
if error_type and isinstance(error_type, str):
269-
prefix = error_type.split(" ")[0]
270-
span.set_attribute("db.response.status_code", prefix)
271-
span.set_attribute("error.type", prefix)
272-
else:
273-
span.set_attribute(
274-
"error.type", type(exc).__qualname__
275-
)
276-
if span.is_recording():
277-
span.set_status(StatusCode.ERROR)
341+
except Exception as exc: # pylint: disable=broad-exception-caught
342+
_set_span_error_attributes(span, exc, semconv_opt_in_mode)
278343
raise
279344
if span.is_recording() and span.name == "redis.search":
280345
_add_search_attributes(span, response, args)
@@ -308,39 +373,26 @@ def _traced_execute_pipeline(
308373
with tracer.start_as_current_span(
309374
span_name, kind=trace.SpanKind.CLIENT
310375
) as span:
311-
if span.is_recording():
312-
if _report_old(semconv_opt_in_mode):
313-
span.set_attribute(DB_STATEMENT, resource)
314-
_set_connection_attributes(span, instance)
315-
span.set_attribute(
316-
"db.redis.pipeline_length", len(command_stack)
317-
)
318-
if _report_new(semconv_opt_in_mode):
319-
span.set_attribute("db.system.name", "redis")
320-
span.set_attribute("db.operation.name", "PIPELINE")
321-
host, port, db, unix_sock = _get_redis_conn_info(instance)
322-
if db is not None:
323-
span.set_attribute("db.namespace", str(db))
324-
span.set_attribute("db.query.text", resource)
325-
if len(command_stack) > 1:
326-
span.set_attribute(
327-
"db.operation.batch.size", len(command_stack)
328-
)
329-
if host:
330-
span.set_attribute("server.address", host)
331-
span.set_attribute("network.peer.address", host)
332-
if port:
333-
span.set_attribute("server.port", port)
334-
span.set_attribute("network.peer.port", port)
335-
if unix_sock:
336-
span.set_attribute("network.peer.address", unix_sock)
337-
span.set_attribute("network.transport", "unix")
376+
_set_span_attributes(
377+
span,
378+
instance,
379+
None,
380+
None,
381+
command_stack,
382+
resource,
383+
semconv_opt_in_mode,
384+
)
385+
338386
response = None
339387
try:
340388
response = func(*args, **kwargs)
341389
except redis.WatchError as watch_exception:
342-
span.set_status(StatusCode.UNSET)
390+
if span.is_recording():
391+
span.set_status(StatusCode.UNSET)
343392
exception = watch_exception
393+
except Exception as exc: # pylint: disable=broad-exception-caught
394+
_set_span_error_attributes(span, exc, semconv_opt_in_mode)
395+
exception = exc
344396

345397
if callable(response_hook):
346398
response_hook(span, instance, response)
@@ -372,34 +424,10 @@ async def _async_traced_execute_command(
372424
with tracer.start_as_current_span(
373425
name, kind=trace.SpanKind.CLIENT
374426
) as span:
375-
if span.is_recording():
376-
if _report_old(semconv_opt_in_mode):
377-
span.set_attribute(DB_STATEMENT, query)
378-
_set_connection_attributes(span, instance)
379-
span.set_attribute("db.redis.args_length", len(args))
380-
if _report_new(semconv_opt_in_mode):
381-
span.set_attribute("db.system.name", "redis")
382-
if args and len(args) > 0:
383-
span.set_attribute("db.operation.name", args[0])
384-
host, port, db, unix_sock = _get_redis_conn_info(instance)
385-
if db is not None:
386-
span.set_attribute("db.namespace", str(db))
387-
span.set_attribute("db.query.text", query)
388-
if host:
389-
span.set_attribute("server.address", host)
390-
span.set_attribute("network.peer.address", host)
391-
if port:
392-
span.set_attribute("server.port", port)
393-
span.set_attribute("network.peer.port", port)
394-
if unix_sock:
395-
span.set_attribute("network.peer.address", unix_sock)
396-
span.set_attribute("network.transport", "unix")
397-
if (
398-
args
399-
and args[0] in ("EVALSHA", "FCALL")
400-
and len(args) > 1
401-
):
402-
span.set_attribute("db.stored_procedure.name", args[1])
427+
_set_span_attributes(
428+
span, instance, args, query, None, None, semconv_opt_in_mode
429+
)
430+
403431
if callable(request_hook):
404432
request_hook(span, instance, args, kwargs)
405433
try:
@@ -408,19 +436,8 @@ async def _async_traced_execute_command(
408436
if span.is_recording():
409437
span.set_status(StatusCode.UNSET)
410438
raise watch_exception
411-
except Exception as exc:
412-
if _report_new(semconv_opt_in_mode) and span.is_recording():
413-
error_type = getattr(exc, "args", [None])[0]
414-
if error_type and isinstance(error_type, str):
415-
prefix = error_type.split(" ")[0]
416-
span.set_attribute("db.response.status_code", prefix)
417-
span.set_attribute("error.type", prefix)
418-
else:
419-
span.set_attribute(
420-
"error.type", type(exc).__qualname__
421-
)
422-
if span.is_recording():
423-
span.set_status(StatusCode.ERROR)
439+
except Exception as exc: # pylint: disable=broad-exception-caught
440+
_set_span_error_attributes(span, exc, semconv_opt_in_mode)
424441
raise
425442
if span.is_recording() and span.name == "redis.search":
426443
_add_search_attributes(span, response, args)
@@ -455,33 +472,16 @@ async def _async_traced_execute_pipeline(
455472
with tracer.start_as_current_span(
456473
span_name, kind=trace.SpanKind.CLIENT
457474
) as span:
458-
if span.is_recording():
459-
if _report_old(semconv_opt_in_mode):
460-
span.set_attribute(DB_STATEMENT, resource)
461-
_set_connection_attributes(span, instance)
462-
span.set_attribute(
463-
"db.redis.pipeline_length", len(command_stack)
464-
)
465-
if _report_new(semconv_opt_in_mode):
466-
span.set_attribute("db.system.name", "redis")
467-
span.set_attribute("db.operation.name", "PIPELINE")
468-
host, port, db, unix_sock = _get_redis_conn_info(instance)
469-
if db is not None:
470-
span.set_attribute("db.namespace", str(db))
471-
span.set_attribute("db.query.text", resource)
472-
if len(command_stack) > 1:
473-
span.set_attribute(
474-
"db.operation.batch.size", len(command_stack)
475-
)
476-
if host:
477-
span.set_attribute("server.address", host)
478-
span.set_attribute("network.peer.address", host)
479-
if port:
480-
span.set_attribute("server.port", port)
481-
span.set_attribute("network.peer.port", port)
482-
if unix_sock:
483-
span.set_attribute("network.peer.address", unix_sock)
484-
span.set_attribute("network.transport", "unix")
475+
_set_span_attributes(
476+
span,
477+
instance,
478+
None,
479+
None,
480+
command_stack,
481+
resource,
482+
semconv_opt_in_mode,
483+
)
484+
485485
response = None
486486
try:
487487
response = await func(*args, **kwargs)
@@ -490,20 +490,7 @@ async def _async_traced_execute_pipeline(
490490
span.set_status(StatusCode.UNSET)
491491
exception = watch_exception
492492
except Exception as exc: # pylint: disable=broad-exception-caught
493-
if span.is_recording():
494-
if _report_new(semconv_opt_in_mode):
495-
error_type = getattr(exc, "args", [None])[0]
496-
if error_type and isinstance(error_type, str):
497-
prefix = error_type.split(" ")[0]
498-
span.set_attribute(
499-
"db.response.status_code", prefix
500-
)
501-
span.set_attribute("error.type", prefix)
502-
else:
503-
span.set_attribute(
504-
"error.type", type(exc).__qualname__
505-
)
506-
span.set_status(StatusCode.ERROR)
493+
_set_span_error_attributes(span, exc, semconv_opt_in_mode)
507494
exception = exc
508495

509496
if callable(response_hook):

0 commit comments

Comments
 (0)