Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 105 additions & 50 deletions newrelic/api/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
from contextlib import contextmanager

from opentelemetry import trace as otel_api_trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.propagate import set_global_textmap

from newrelic.api.application import application_instance
from newrelic.api.background_task import BackgroundTask
Expand All @@ -26,29 +30,78 @@
from newrelic.api.message_trace import MessageTrace
from newrelic.api.message_transaction import MessageTransaction
from newrelic.api.time_trace import current_trace, notice_error
from newrelic.api.transaction import Sentinel, current_transaction
from newrelic.api.transaction import Sentinel, current_transaction, accept_distributed_trace_headers, insert_distributed_trace_headers
from newrelic.api.web_transaction import WebTransaction

from newrelic.core.otlp_utils import create_resource

_logger = logging.getLogger(__name__)


class NRTraceContextPropagator(TraceContextTextMapPropagator):
HEADER_KEYS = ("traceparent", "tracestate", "newrelic")

def extract(self, carrier, context=None, getter=None):
transaction = current_transaction()
# If we are passing into New Relic, traceparent
# and/or tracestate's keys also need to be NR compatible.

if transaction:
nr_headers = {header_key: getter.get(carrier, header_key)[0] for header_key in self.HEADER_KEYS if getter.get(carrier, header_key)}
transaction.accept_distributed_trace_headers(nr_headers)
return super().extract(carrier=carrier, context=context, getter=getter)


def inject(self, carrier, context=None, setter=None):
transaction = current_transaction()
# Only insert headers if we have not done so already this transaction
# New Relic's Distributed Trace State will have the following states:
# 0 (00) if not set:
# Transaction has not inserted any outbound headers nor has
# it accepted any inbound headers (yet).
# 1 (01) if already accepted:
# Transaction has accepted inbound headers and is able to
# insert outbound headers to the next app if needed.
# 2 (10) if inserted but not accepted:
# Transaction has inserted outbound headers already.
# Do not insert outbound headers multiple times. This is
# a fundamental difference in OTel vs NR behavior: if
# headers are inserted by OTel multiple times, it will
# propagate the last set of data that was inserted. NR
# will not allow more than one header insertion per
# transaction.
# 3 (11) if accepted, then inserted:
# Transaction has accepted inbound headers and has inserted
# outbound headers.

if not transaction:
return super().inject(carrier=carrier, context=context, setter=setter)

if transaction._distributed_trace_state < 2:
nr_headers = []
transaction.insert_distributed_trace_headers(nr_headers)
for key, value in nr_headers:
setter.set(carrier, key, value)
# Do NOT call super().inject() since we have already
# inserted the headers here. It will not cause harm,
# but it is redundant logic.

# If distributed_trace_state == 2 or 3, do not inject headers.


# Context and Context Propagator Setup
otel_context_propagator = CompositePropagator(
propagators=[
NRTraceContextPropagator(),
W3CBaggagePropagator(),
]
)
set_global_textmap(otel_context_propagator)

# ----------------------------------------------
# Custom OTel Spans and Traces
# ----------------------------------------------

# TracerProvider: we can think of this as the agent instance. Only one can exist
# SpanProcessor: we can think of this as an application. In NR, we can have multiple applications
# though right now, we can only do SpanProcessor and SynchronousMultiSpanProcessor
# Tracer: we can think of this as the transaction.
# Span: we can think of this as the trace.
# Links functionality has now been enabled but not implemented yet. Links are relationships
# between spans, but lateral in hierarchy. In NR we only have parent-child relationships.
# We may want to preserve this information with a custom attribute. We can also add this
# as a new attribute in a trace, but it will still not be seen in the UI other than a trace
# attribute.


class Span(otel_api_trace.Span):
def __init__(
self,
Expand All @@ -72,12 +125,7 @@ def __init__(
) # This attribute is purely to prevent garbage collection
self.nr_trace = None
self.instrumenting_module = instrumenting_module

# Do not create a New Relic trace if parent
# is a remote span and it is not sampled
if self._remote() and not self._sampled():
return


self.nr_parent = None
current_nr_trace = current_trace()
if (
Expand All @@ -100,7 +148,7 @@ def __init__(
_logger.error(
"OpenTelemetry span (%s) and NR trace (%s) do not match nor correspond to a remote span. Open Telemetry span will not be reported to New Relic. Please report this problem to New Relic.",
self.otel_parent,
current_nr_trace,
current_nr_trace, # NR parent trace
)
return

Expand Down Expand Up @@ -140,26 +188,6 @@ def __init__(

self.nr_trace.__enter__()

def _sampled(self):
# Uses NR to determine if the trace is sampled
#
# transaction.sampled can be `None`, `True`, `False`.
# If `None`, this has not been computed by NR which
# can also mean the following:
# 1. There was not a context passed in that explicitly has sampling disabled.
# This flag would be found in the traceparent or traceparent and tracespan headers.
# 2. Transaction was not created where DT headers are accepted during __init__
# Therefore, we will treat a value of `None` as `True` for now.
#
# The primary reason for this behavior is because Otel expects to
# only be able to record information like events and attributes
# when `is_recording()` == `True`

if self.otel_parent:
return bool(self.otel_parent.trace_flags)
else:
return bool(self.nr_transaction and (self.nr_transaction.sampled or (self.nr_transaction.sampled is None)))

def _remote(self):
# Remote span denotes if propagated from a remote parent
return bool(self.otel_parent and self.otel_parent.is_remote)
Expand All @@ -168,14 +196,12 @@ def get_span_context(self):
if not getattr(self, "nr_trace", False):
return otel_api_trace.INVALID_SPAN_CONTEXT

otel_tracestate_headers = None

return otel_api_trace.SpanContext(
trace_id=int(self.nr_transaction.trace_id, 16),
span_id=int(self.nr_trace.guid, 16),
is_remote=self._remote(),
trace_flags=otel_api_trace.TraceFlags(0x01 if self._sampled() else 0x00),
trace_state=otel_api_trace.TraceState(otel_tracestate_headers),
trace_flags=otel_api_trace.TraceFlags(0x01),
trace_state=otel_api_trace.TraceState(),
)

def set_attribute(self, key, value):
Expand All @@ -193,8 +219,7 @@ def _set_attributes_in_nr(self, otel_attributes=None):

def add_event(self, name, attributes=None, timestamp=None):
# TODO: Not implemented yet.
# We can implement this as a log event
raise NotImplementedError("TODO: We can implement this as a log event.")
raise NotImplementedError("Events are not implemented yet.")

def add_link(self, context=None, attributes=None):
# TODO: Not implemented yet.
Expand All @@ -208,7 +233,13 @@ def update_name(self, name):
self.nr_trace.name = self._name

def is_recording(self):
return self._sampled() and not (getattr(self.nr_trace, None), "end_time", None)
# TODO: Refine this logic further if needed.
if getattr(self.nr_trace, "end_time", None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case where a span has been created but recording hasn't started yet that we need to handle here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes--In most of the OTel library instrumentations, span.is_recording() is called almost immediately after creation. This is an example of that happening.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, it seems pretty common for it to call is_recording() immediately after the creation of the span and right before the end of the span in the framework instrumentation.

return False
if not getattr(self.nr_transaction, "priority", None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused why we are looking at the priority here? Is this just being used to determine whether the transaction has ended or not?

Copy link
Contributor Author

@lrafeei lrafeei Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, in OTel, this is just used to determine if the span has ended or not. We end up not incorporating an end_time attribute for spans (and instead allowing the NR trace and transactions to deal with that). This call is usually made almost immediately after starting a span (it is also usually called at the end of the span). If it is at the beginning of the start of the span, there will not be an nr_trace attribute yet. There will be an nr_transaction attribute, though.

For the second part of the logic here, my thought process here is that if an OTel header is being passed into the Hybrid agent, it may have a sampled flag but not a priority flag. So, in order to respect the sampling decision configured in the Hybrid agent, if there is no priority flag, it should run _make_sampling_decision()

OTel has a concept of recording but not sampling (which still allows for context propagation), so even if the transaction is not sampled, if the priority is greater than 0, it is still a GO for recording.

self.nr_transaction._make_sampling_decision()

return self.nr_transaction.priority > 0

def set_status(self, status, description=None):
# TODO: not implemented yet
Expand Down Expand Up @@ -277,6 +308,10 @@ def start_span(
self.nr_application = application_instance()
self.attributes = attributes or {}

if not self.nr_application.active:
# Force application registration if not already active
self.nr_application.activate()

if not self.nr_application.settings.otel_bridge.enabled:
return otel_api_trace.INVALID_SPAN

Expand All @@ -286,7 +321,12 @@ def start_span(
if parent_span_context is None or not parent_span_context.is_valid:
parent_span_context = None

parent_span_trace_id = None
if parent_span_context and self.nr_application.settings.distributed_tracing.enabled:
parent_span_trace_id = parent_span_context.trace_id

# If remote_parent, transaction must be created, regardless of kind type
# Make sure we transfer DT headers when we are here, if DT is enabled
if parent_span_context and parent_span_context.is_remote:
if kind in (otel_api_trace.SpanKind.SERVER, otel_api_trace.SpanKind.CLIENT):
# This is a web request
Expand All @@ -296,6 +336,7 @@ def start_span(
port = self.attributes.get("net.host.port")
request_method = self.attributes.get("http.method")
request_path = self.attributes.get("http.route")

transaction = WebTransaction(
self.nr_application,
name=name,
Expand All @@ -306,7 +347,11 @@ def start_span(
request_path=request_path,
headers=headers,
)
elif kind in (otel_api_trace.SpanKind.PRODUCER, otel_api_trace.SpanKind.INTERNAL):

elif kind in (
otel_api_trace.SpanKind.PRODUCER,
otel_api_trace.SpanKind.INTERNAL,
):
transaction = BackgroundTask(self.nr_application, name=name)
elif kind == otel_api_trace.SpanKind.CONSUMER:
transaction = MessageTransaction(
Expand All @@ -315,7 +360,7 @@ def start_span(
destination_name=name,
application=self.nr_application,
transport_type=self.instrumentation_library,
headers=headers,
headers=None,
)

transaction.__enter__()
Expand Down Expand Up @@ -348,6 +393,16 @@ def start_span(
request_path=request_path,
headers=headers,
)

# If incoming headers do not exist, the transaction
# GUID needs to be updated to that of the parent span.
if not headers and parent_span_trace_id:
# Only update trace_id if this is the first transaction in the trace.
transaction._trace_id = f"{parent_span_trace_id:x}" if (transaction.guid == transaction.trace_id[:16]) else transaction._trace_id

guid = parent_span_trace_id >> 64
transaction.guid = f"{guid:x}"

transaction.__enter__()
elif kind == otel_api_trace.SpanKind.INTERNAL:
if transaction:
Expand All @@ -372,7 +427,7 @@ def start_span(
destination_name=name,
application=self.nr_application,
transport_type=self.instrumentation_library,
headers=headers,
headers=None,
)
transaction.__enter__()
elif kind == otel_api_trace.SpanKind.PRODUCER:
Expand Down
20 changes: 18 additions & 2 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4429,11 +4429,27 @@ def _process_module_builtin_defaults():

# Hybrid Agent Hooks
_process_module_definition(
"opentelemetry.trace", "newrelic.hooks.hybridagent_opentelemetry", "instrument_trace_api"
"opentelemetry.context",
"newrelic.hooks.hybridagent_opentelemetry",
"instrument_context_api",
)

_process_module_definition(
"opentelemetry.instrumentation.utils", "newrelic.hooks.hybridagent_opentelemetry", "instrument_utils"
"opentelemetry.instrumentation.propagators",
"newrelic.hooks.hybridagent_opentelemetry",
"instrument_global_propagators_api",
)

_process_module_definition(
"opentelemetry.trace",
"newrelic.hooks.hybridagent_opentelemetry",
"instrument_trace_api",
)

_process_module_definition(
"opentelemetry.instrumentation.utils",
"newrelic.hooks.hybridagent_opentelemetry",
"instrument_utils",
)


Expand Down
4 changes: 3 additions & 1 deletion newrelic/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1431,7 +1431,9 @@ def default_otlp_host(host):
_settings.azure_operator.enabled = _environ_as_bool("NEW_RELIC_AZURE_OPERATOR_ENABLED", default=False)
_settings.package_reporting.enabled = _environ_as_bool("NEW_RELIC_PACKAGE_REPORTING_ENABLED", default=True)
_settings.ml_insights_events.enabled = _environ_as_bool("NEW_RELIC_ML_INSIGHTS_EVENTS_ENABLED", default=False)
_settings.otel_bridge.enabled = _environ_as_bool("NEW_RELIC_OTEL_BRIDGE_ENABLED", default=False)
_settings.otel_bridge.enabled = _environ_as_bool(
"NEW_RELIC_OTEL_BRIDGE_ENABLED", default=False
)


def global_settings():
Expand Down
Loading
Loading