Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3743](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3743))
- Add `rstcheck` to pre-commit to stop introducing invalid RST
([#3777](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3777))
- `opentelemetry-instrumentation-redis`: Add support for semantic convention opt-in and stable Redis span attributes
([#3826](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3826))
- `opentelemetry-exporter-credential-provider-gcp`: create this package which provides support for supplying your machine's Application Default
Credentials (https://cloud.google.com/docs/authentication/application-default-credentials) to the OTLP Exporters created automatically by OpenTelemetry Python's auto instrumentation. These credentials authorize OTLP traces to be sent to `telemetry.googleapis.com`. [#3766](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3766).
- `opentelemetry-instrumentation-psycopg`: Add missing parameter `capture_parameters` to instrumentor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ def response_hook(span, instance, response):
from wrapt import wrap_function_wrapper

from opentelemetry import trace
from opentelemetry.instrumentation._semconv import (
_OpenTelemetrySemanticConventionStability,
_OpenTelemetryStabilitySignalType,
_report_new,
_report_old,
)
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.redis.package import _instruments
from opentelemetry.instrumentation.redis.util import (
Expand Down Expand Up @@ -185,6 +191,122 @@ def response_hook(span, instance, response):
_INSTRUMENTATION_ATTR = "_is_instrumented_by_opentelemetry"


def _get_redis_conn_info(instance):
host, port, db, unix_sock = None, None, None, None
pool = getattr(instance, "connection_pool", None)
if pool:
conn_kwargs = pool.connection_kwargs
host = conn_kwargs.get("host")
port = conn_kwargs.get("port")
db = conn_kwargs.get("db", 0)
unix_sock = conn_kwargs.get("path")
return host, port, db, unix_sock


# Helper function to set old semantic convention attributes
def _set_old_semconv_attributes(
span,
instance,
args,
query,
command_stack,
resource,
is_pipeline,
):
span.set_attribute(DB_STATEMENT, query if not is_pipeline else resource)
_set_connection_attributes(span, instance)
if not is_pipeline:
span.set_attribute("db.redis.args_length", len(args))
else:
span.set_attribute("db.redis.pipeline_length", len(command_stack))


# Helper function to set new semantic convention attributes
def _set_new_semconv_attributes(
span,
instance,
args,
query,
command_stack,
resource,
is_pipeline,
):
span.set_attribute("db.system.name", "redis")

if not is_pipeline:
if args and len(args) > 0:
span.set_attribute("db.operation.name", args[0])
else:
span.set_attribute("db.operation.name", "PIPELINE")
if len(command_stack) > 1:
span.set_attribute("db.operation.batch.size", len(command_stack))

host, port, db, unix_sock = _get_redis_conn_info(instance)
if db is not None:
span.set_attribute("db.namespace", str(db))
span.set_attribute("db.query.text", query if not is_pipeline else resource)
if host:
span.set_attribute("server.address", host)
span.set_attribute("network.peer.address", host)
if port:
span.set_attribute("server.port", port)
span.set_attribute("network.peer.port", port)
if unix_sock:
span.set_attribute("network.peer.address", unix_sock)
span.set_attribute("network.transport", "unix")

# db.stored_procedure.name (only for individual commands)
if not is_pipeline:
if args and args[0] in ("EVALSHA", "FCALL") and len(args) > 1:
span.set_attribute("db.stored_procedure.name", args[1])


# Helper function to set all common span attributes
def _set_span_attributes(
span,
instance,
args, # For individual commands
query, # For individual commands
command_stack, # For pipelines
resource, # For pipelines
semconv_opt_in_mode,
):
if not span.is_recording():
return

is_pipeline = command_stack is not None

if _report_old(semconv_opt_in_mode):
_set_old_semconv_attributes(
span, instance, args, query, command_stack, resource, is_pipeline
)

if _report_new(semconv_opt_in_mode):
_set_new_semconv_attributes(
span, instance, args, query, command_stack, resource, is_pipeline
)

# Command-specific attributes that depend on span.name (e.g., redis.create_index)
if not is_pipeline and span.name == "redis.create_index":
_add_create_attributes(span, args)


# Helper function to set error attributes on a span
def _set_span_error_attributes(span, exc, semconv_opt_in_mode):
if not span.is_recording():
return

if _report_new(semconv_opt_in_mode):
error_type = getattr(exc, "args", [None])[0]
if error_type and isinstance(error_type, str):
prefix = error_type.split(" ")[0]
span.set_attribute("db.response.status_code", prefix)
span.set_attribute("error.type", prefix)
else:
span.set_attribute("error.type", type(exc).__qualname__)
span.set_status(StatusCode.ERROR)


def _traced_execute_factory(
tracer: Tracer,
request_hook: RequestHook | None = None,
Expand All @@ -198,21 +320,29 @@ def _traced_execute_command(
) -> R:
query = _format_command_args(args)
name = _build_span_name(instance, args)
semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.DATABASE
)
with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span.set_attribute(DB_STATEMENT, query)
_set_connection_attributes(span, instance)
span.set_attribute("db.redis.args_length", len(args))
if span.name == "redis.create_index":
_add_create_attributes(span, args)
_set_span_attributes(
span, instance, args, query, None, None, semconv_opt_in_mode
)

if callable(request_hook):
request_hook(span, instance, args, kwargs)
response = func(*args, **kwargs)
if span.is_recording():
if span.name == "redis.search":
_add_search_attributes(span, response, args)
try:
response = func(*args, **kwargs)
except redis.WatchError as watch_exception:
if span.is_recording():
span.set_status(StatusCode.UNSET)
raise watch_exception
except Exception as exc: # pylint: disable=broad-exception-caught
_set_span_error_attributes(span, exc, semconv_opt_in_mode)
raise
if span.is_recording() and span.name == "redis.search":
_add_search_attributes(span, response, args)
if callable(response_hook):
response_hook(span, instance, response)
return response
Expand All @@ -237,22 +367,32 @@ def _traced_execute_pipeline(
span_name,
) = _build_span_meta_data_for_pipeline(instance)
exception = None
semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.DATABASE
)
with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span.set_attribute(DB_STATEMENT, resource)
_set_connection_attributes(span, instance)
span.set_attribute(
"db.redis.pipeline_length", len(command_stack)
)
_set_span_attributes(
span,
instance,
None,
None,
command_stack,
resource,
semconv_opt_in_mode,
)

response = None
try:
response = func(*args, **kwargs)
except redis.WatchError as watch_exception:
span.set_status(StatusCode.UNSET)
if span.is_recording():
span.set_status(StatusCode.UNSET)
exception = watch_exception
except Exception as exc: # pylint: disable=broad-exception-caught
_set_span_error_attributes(span, exc, semconv_opt_in_mode)
exception = exc

if callable(response_hook):
response_hook(span, instance, response)
Expand All @@ -278,17 +418,29 @@ async def _async_traced_execute_command(
) -> Awaitable[R]:
query = _format_command_args(args)
name = _build_span_name(instance, args)

semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.DATABASE
)
with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span.set_attribute(DB_STATEMENT, query)
_set_connection_attributes(span, instance)
span.set_attribute("db.redis.args_length", len(args))
_set_span_attributes(
span, instance, args, query, None, None, semconv_opt_in_mode
)

if callable(request_hook):
request_hook(span, instance, args, kwargs)
response = await func(*args, **kwargs)
try:
response = await func(*args, **kwargs)
except redis.WatchError as watch_exception:
if span.is_recording():
span.set_status(StatusCode.UNSET)
raise watch_exception
except Exception as exc: # pylint: disable=broad-exception-caught
_set_span_error_attributes(span, exc, semconv_opt_in_mode)
raise
if span.is_recording() and span.name == "redis.search":
_add_search_attributes(span, response, args)
if callable(response_hook):
response_hook(span, instance, response)
return response
Expand All @@ -314,28 +466,37 @@ async def _async_traced_execute_pipeline(
) = _build_span_meta_data_for_pipeline(instance)

exception = None

semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.DATABASE
)
with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span.set_attribute(DB_STATEMENT, resource)
_set_connection_attributes(span, instance)
span.set_attribute(
"db.redis.pipeline_length", len(command_stack)
)
_set_span_attributes(
span,
instance,
None,
None,
command_stack,
resource,
semconv_opt_in_mode,
)

response = None
try:
response = await func(*args, **kwargs)
except redis.WatchError as watch_exception:
span.set_status(StatusCode.UNSET)
if span.is_recording():
span.set_status(StatusCode.UNSET)
exception = watch_exception
except Exception as exc: # pylint: disable=broad-exception-caught
_set_span_error_attributes(span, exc, semconv_opt_in_mode)
exception = exc

if callable(response_hook):
response_hook(span, instance, response)

if exception:
if exception is not None:
raise exception

return response
Expand Down
Loading