From 7650f5210ee0cba3d562021aeddb113c2b5daec0 Mon Sep 17 00:00:00 2001 From: Hannah Stepanek Date: Fri, 6 Jun 2025 10:33:53 -0700 Subject: [PATCH 1/3] Add support 4 CT & dropping inprocess spans CT = Core Tracing --- newrelic/api/transaction.py | 10 +++- newrelic/common/streaming_utils.py | 70 +++++++++++++++++++++++- newrelic/config.py | 3 ++ newrelic/core/application.py | 14 ++++- newrelic/core/attribute.py | 17 ++++++ newrelic/core/config.py | 18 +++++++ newrelic/core/external_node.py | 7 ++- newrelic/core/function_node.py | 8 ++- newrelic/core/loop_node.py | 8 ++- newrelic/core/node_mixin.py | 85 ++++++++++++++++++++++++------ newrelic/core/root_node.py | 8 +-- newrelic/core/stats_engine.py | 37 ++++++++++--- newrelic/core/transaction_node.py | 12 ++--- 13 files changed, 246 insertions(+), 51 deletions(-) diff --git a/newrelic/api/transaction.py b/newrelic/api/transaction.py index 28d3a07638..ddf20855c6 100644 --- a/newrelic/api/transaction.py +++ b/newrelic/api/transaction.py @@ -658,6 +658,13 @@ def __exit__(self, exc, value, tb): def sampled(self): return self._sampled + @property + def ct_sampled(self): + # If DT doesn't sample it CT will. 
+ if not self.sampled and (self._settings.distributed_tracing.drop_inprocess_spans.enabled or self._settings.distributed_tracing.unique_spans.enabled): + return True + return False + @property def priority(self): return self._priority @@ -1082,12 +1089,13 @@ def _create_distributed_trace_data(self): return self._compute_sampled_and_priority() + sampled = self.sampled or self.ct_sampled data = { "ty": "App", "ac": account_id, "ap": application_id, "tr": self.trace_id, - "sa": self.sampled, + "sa": sampled, "pr": self.priority, "tx": self.guid, "ti": int(time.time() * 1000.0), diff --git a/newrelic/common/streaming_utils.py b/newrelic/common/streaming_utils.py index fbb6f5b104..ca21f8be0b 100644 --- a/newrelic/common/streaming_utils.py +++ b/newrelic/common/streaming_utils.py @@ -15,6 +15,7 @@ import collections import logging import threading +import sys try: from newrelic.core.infinite_tracing_pb2 import AttributeValue, SpanBatch @@ -25,6 +26,67 @@ _logger = logging.getLogger(__name__) +def get_deep_size(obj, seen=None): + """Recursively calculates the size of an object including nested lists and dicts.""" + if seen is None: + seen = set() + size = -8*3 # Subtract 8 for each of the 3 attribute lists as those don't count. 
+ else: + size = 0 + + # Avoid recursion for already seen objects (handle circular references) + obj_id = id(obj) + if obj_id in seen: + return 0 + seen.add(obj_id) + + if isinstance(obj, str): + size += len(obj) + return size + elif isinstance(obj, float) or isinstance(obj, int): + size += 8 + return size + elif isinstance(obj, bool): + size += 1 + return size + elif isinstance(obj, dict): + size += sum(get_deep_size(k, seen) + get_deep_size(v, seen) for k, v in obj.items()) + elif isinstance(obj, (list, tuple, set, frozenset)): + size += 8 + sum(get_deep_size(i, seen) for i in obj) + else: + size += 8 + + return size + + +def get_deep_size_protobuf(obj): + """Recursively calculates the size of an object including nested lists and dicts.""" + size = 0 + if hasattr(obj, "string_value"): + size += len(obj.string_value) + return size + elif hasattr(obj, "double_value"): + size += 8 + return size + elif hasattr(obj, "int_value"): + size += 8 + return size + elif hasattr(obj, "bool_value"): + size += 1 + return size + + if hasattr(obj, "agent_attributes"): + size += sum(len(k) + get_deep_size_protobuf(v) for k, v in obj.agent_attributes.items()) + if hasattr(obj, "user_attributes"): + size += sum(len(k) + get_deep_size_protobuf(v) for k, v in obj.user_attributes.items()) + if hasattr(obj, "intrinsics"): + size += sum(len(k) + get_deep_size_protobuf(v) for k, v in obj.intrinsics.items()) + else: + size += 8 + + return size + + class StreamBuffer: def __init__(self, maxlen, batching=False): self._queue = collections.deque(maxlen=maxlen) @@ -32,6 +94,8 @@ def __init__(self, maxlen, batching=False): self._shutdown = False self._seen = 0 self._dropped = 0 + self._bytes = 0 + self._ct_processing_time = 0 self._settings = None self.batching = batching @@ -51,6 +115,8 @@ def put(self, item): return self._seen += 1 + _logger.debug(f"{item.intrinsics['name']} [{len(item.intrinsics)}, {len(item.user_attributes)}, {len(item.agent_attributes)}] {get_deep_size_protobuf(item)}") + 
self._bytes += get_deep_size_protobuf(item) # NOTE: dropped can be over-counted as the queue approaches # capacity while data is still being transmitted. @@ -67,8 +133,10 @@ def stats(self): with self._notify: seen, dropped = self._seen, self._dropped self._seen, self._dropped = 0, 0 + _bytes, ct_processing_time = self._bytes, self._ct_processing_time + self._bytes, self._ct_processing_time = 0, 0 - return seen, dropped + return seen, dropped, _bytes, ct_processing_time def __bool__(self): return bool(self._queue) diff --git a/newrelic/config.py b/newrelic/config.py index b5b47b4f3a..43f1f95eae 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -398,6 +398,9 @@ def _process_configuration(section): _process_setting(section, "custom_insights_events.max_attribute_value", "getint", None) _process_setting(section, "ml_insights_events.enabled", "getboolean", None) _process_setting(section, "distributed_tracing.enabled", "getboolean", None) + _process_setting(section, "distributed_tracing.drop_inprocess_spans.enabled", "getboolean", None) + _process_setting(section, "distributed_tracing.unique_spans.enabled", "getboolean", None) + _process_setting(section, "distributed_tracing.minimize_attributes.enabled", "getboolean", None) _process_setting(section, "distributed_tracing.exclude_newrelic_header", "getboolean", None) _process_setting(section, "span_events.enabled", "getboolean", None) _process_setting(section, "span_events.max_samples_stored", "getint", None) diff --git a/newrelic/core/application.py b/newrelic/core/application.py index 43fdddc0ed..fd11985d3d 100644 --- a/newrelic/core/application.py +++ b/newrelic/core/application.py @@ -507,7 +507,11 @@ def connect_to_data_collector(self, activate_agent): sampling_target_period = 60.0 else: sampling_target_period = configuration.sampling_target_period_in_seconds - self.adaptive_sampler = AdaptiveSampler(configuration.sampling_target, sampling_target_period) + sampling_target = configuration.sampling_target + # 
If span reduction is enabled double the transaction reservoir size. + if configuration.distributed_tracing.drop_inprocess_spans.enabled or configuration.distributed_tracing.unique_spans.enabled: + sampling_target = configuration.sampling_target*2 + self.adaptive_sampler = AdaptiveSampler(sampling_target, sampling_target_period) active_session.connect_span_stream(self._stats_engine.span_stream, self.record_custom_metric) @@ -1367,11 +1371,14 @@ def harvest(self, shutdown=False, flexible=False): span_stream = stats.span_stream # Only merge stats as part of default harvest if span_stream is not None and not flexible: - spans_seen, spans_dropped = span_stream.stats() + spans_seen, spans_dropped, _bytes, ct_processing_time = span_stream.stats() spans_sent = spans_seen - spans_dropped internal_count_metric("Supportability/InfiniteTracing/Span/Seen", spans_seen) internal_count_metric("Supportability/InfiniteTracing/Span/Sent", spans_sent) + print(f"spans sent: {spans_sent}") + internal_count_metric("Supportability/InfiniteTracing/Bytes/Seen", _bytes) + internal_count_metric("Supportability/CoreTracing/TotalTime", ct_processing_time*1000) # Time in ms. else: spans = stats.span_events if spans: @@ -1388,6 +1395,9 @@ def harvest(self, shutdown=False, flexible=False): spans_sampled = spans.num_samples internal_count_metric("Supportability/SpanEvent/TotalEventsSeen", spans_seen) internal_count_metric("Supportability/SpanEvent/TotalEventsSent", spans_sampled) + print(f"spans sent: {spans_sampled}") + internal_count_metric("Supportability/DistributedTracing/Bytes/Seen", spans.bytes) + internal_count_metric("Supportability/SpanEvent/TotalCoreTracingTime", spans.ct_processing_time*1000) # Time in ms. 
stats.reset_span_events() diff --git a/newrelic/core/attribute.py b/newrelic/core/attribute.py index 9cc14cfb29..bb313d8531 100644 --- a/newrelic/core/attribute.py +++ b/newrelic/core/attribute.py @@ -102,6 +102,23 @@ "server.address", } +SPAN_ENTITY_RELATIONSHIP_ATTRIBUTES = { + "cloud.account.id", + "cloud.platform", + "cloud.region", + "cloud.resource_id", + "db.instance", + "db.system", + "http.url", + "messaging.destination.name", + "messaging.system", + "peer.hostname", + "server.address", + "server.port", + "span.kind", +} + + MAX_NUM_USER_ATTRIBUTES = 128 MAX_ATTRIBUTE_LENGTH = 255 MAX_NUM_ML_USER_ATTRIBUTES = 64 diff --git a/newrelic/core/config.py b/newrelic/core/config.py index 3ee20cd67f..1f6b88d5f1 100644 --- a/newrelic/core/config.py +++ b/newrelic/core/config.py @@ -324,6 +324,18 @@ class DistributedTracingSettings(Settings): pass +class DistributedTracingDropInprocessSpansSettings(Settings): + pass + + +class DistributedTracingUniqueSpansSettings(Settings): + pass + + +class DistributedTracingMinimizeAttributesSettings(Settings): + pass + + class ServerlessModeSettings(Settings): pass @@ -493,6 +505,9 @@ class EventHarvestConfigHarvestLimitSettings(Settings): _settings.datastore_tracer.instance_reporting = DatastoreTracerInstanceReportingSettings() _settings.debug = DebugSettings() _settings.distributed_tracing = DistributedTracingSettings() +_settings.distributed_tracing.drop_inprocess_spans = DistributedTracingDropInprocessSpansSettings() +_settings.distributed_tracing.unique_spans = DistributedTracingUniqueSpansSettings() +_settings.distributed_tracing.minimize_attributes = DistributedTracingMinimizeAttributesSettings() _settings.error_collector = ErrorCollectorSettings() _settings.error_collector.attributes = ErrorCollectorAttributesSettings() _settings.event_harvest_config = EventHarvestConfigSettings() @@ -814,6 +829,9 @@ def default_otlp_host(host): _settings.ml_insights_events.enabled = False _settings.distributed_tracing.enabled = 
_environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_ENABLED", default=True) +_settings.distributed_tracing.drop_inprocess_spans.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_DROP_INPROCESS_SPANS_ENABLED", default=True) +_settings.distributed_tracing.unique_spans.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_UNIQUE_SPANS_ENABLED", default=False) +_settings.distributed_tracing.minimize_attributes.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_MINIMIZE_ATTRIBUTES_ENABLED", default=True) _settings.distributed_tracing.exclude_newrelic_header = False _settings.span_events.enabled = _environ_as_bool("NEW_RELIC_SPAN_EVENTS_ENABLED", default=True) _settings.span_events.attributes.enabled = True diff --git a/newrelic/core/external_node.py b/newrelic/core/external_node.py index 547503a4dc..7080666dbe 100644 --- a/newrelic/core/external_node.py +++ b/newrelic/core/external_node.py @@ -169,11 +169,10 @@ def trace_node(self, stats, root, connections): start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=None ) - def span_event(self, *args, **kwargs): + def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, *args, **kwargs): self.agent_attributes["http.url"] = self.http_url - attrs = super().span_event(*args, **kwargs) - i_attrs = attrs[0] + i_attrs = base_attrs and base_attrs.copy() or attr_class() i_attrs["category"] = "http" i_attrs["span.kind"] = "client" _, i_attrs["component"] = attribute.process_user_attribute("component", self.library) @@ -181,4 +180,4 @@ def span_event(self, *args, **kwargs): if self.method: _, i_attrs["http.method"] = attribute.process_user_attribute("http.method", self.method) - return attrs + return super().span_event(settings, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class, *args, **kwargs) diff --git a/newrelic/core/function_node.py b/newrelic/core/function_node.py index 809f26742c..2973749c46 100644 --- 
a/newrelic/core/function_node.py +++ b/newrelic/core/function_node.py @@ -114,10 +114,8 @@ def trace_node(self, stats, root, connections): start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=self.label ) - def span_event(self, *args, **kwargs): - attrs = super().span_event(*args, **kwargs) - i_attrs = attrs[0] - + def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, *args, **kwargs): + i_attrs = base_attrs and base_attrs.copy() or attr_class() i_attrs["name"] = f"{self.group}/{self.name}" - return attrs + return super().span_event(settings, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class, *args, **kwargs) diff --git a/newrelic/core/loop_node.py b/newrelic/core/loop_node.py index b9328e7013..7a9ae1c371 100644 --- a/newrelic/core/loop_node.py +++ b/newrelic/core/loop_node.py @@ -79,10 +79,8 @@ def trace_node(self, stats, root, connections): start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=None ) - def span_event(self, *args, **kwargs): - attrs = super().span_event(*args, **kwargs) - i_attrs = attrs[0] - + def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, *args, **kwargs): + i_attrs = base_attrs and base_attrs.copy() or attr_class() i_attrs["name"] = f"EventLoop/Wait/{self.name}" - return attrs + return super().span_event(settings, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class, *args, **kwargs) diff --git a/newrelic/core/node_mixin.py b/newrelic/core/node_mixin.py index 114b6f23e0..d5c3fecdd7 100644 --- a/newrelic/core/node_mixin.py +++ b/newrelic/core/node_mixin.py @@ -11,12 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- +import time from newrelic.core import attribute from newrelic.core.attribute_filter import DST_SPAN_EVENTS, DST_TRANSACTION_SEGMENTS class GenericNodeMixin: + def __init__(self, *args, **kwargs): + self.ids = [] + @property def processed_user_attributes(self): if hasattr(self, "_processed_user_attributes"): @@ -49,14 +52,20 @@ def get_trace_segment_params(self, settings, params=None): _params["exclusive_duration_millis"] = 1000.0 * self.exclusive return _params - def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict): + def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, ct_exit_spans=None, ct_processing_time=None): + if ct_exit_spans is None: + ct_exit_spans = {} i_attrs = base_attrs and base_attrs.copy() or attr_class() i_attrs["type"] = "Span" - i_attrs["name"] = self.name + i_attrs["name"] = i_attrs.get("name") or self.name i_attrs["guid"] = self.guid i_attrs["timestamp"] = int(self.start_time * 1000) i_attrs["duration"] = self.duration - i_attrs["category"] = "generic" + i_attrs["category"] = i_attrs.get("category") or "generic" + # TODO: limit intrinsic attributes but this likely requires changes in the pipeline. + #if settings.distributed_tracing.minimize_attributes.enabled: + # i_ct_attrs = {"type", "name", "guid", "parentId", "transaction.name", "traceId", "timestamp", "duration", "nr.entryPoint", "transactionId"} + # i_attrs = {key: value for key, value in i_attrs.items() if key in i_ct_attrs} if parent_guid: i_attrs["parentId"] = parent_guid @@ -64,22 +73,65 @@ def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dic a_attrs = attribute.resolve_agent_attributes( self.agent_attributes, settings.attribute_filter, DST_SPAN_EVENTS, attr_class=attr_class ) + u_attrs = self.processed_user_attributes + if settings.distributed_tracing.unique_spans.enabled: + # ids is the list of span guids that share this unique exit span. 
+ u_attrs["ids"] = self.ids u_attrs = attribute.resolve_user_attributes( - self.processed_user_attributes, settings.attribute_filter, DST_SPAN_EVENTS, attr_class=attr_class + u_attrs, settings.attribute_filter, DST_SPAN_EVENTS, attr_class=attr_class ) - # intrinsics, user attrs, agent attrs + start_time = time.time() + if settings.distributed_tracing.drop_inprocess_spans.enabled or settings.distributed_tracing.unique_spans.enabled: + exit_span_attrs_present = attribute.SPAN_ENTITY_RELATIONSHIP_ATTRIBUTES & set(a_attrs) + # If this is the entry node, always return it. + if i_attrs.get("nr.entryPoint"): + ct_processing_time[0] += (time.time() - start_time) + return [i_attrs, u_attrs, {}] if settings.distributed_tracing.minimize_attributes.enabled else [i_attrs, u_attrs, a_attrs] + # If the span is not an exit span, skip it by returning None. + if not exit_span_attrs_present: + ct_processing_time[0] += (time.time() - start_time) + return None + # If the span is an exit span but unique spans is enabled, we need to check + # for uniqueness before returning it. + if settings.distributed_tracing.unique_spans.enabled: + a_minimized_attrs = attr_class({key: a_attrs[key] for key in exit_span_attrs_present}) + # Combine all the entity relationship attr values into a string to be + # used as the hash to check for uniqueness. + # TODO: use attr value name rather than str casting for infinite tracing. + span_attrs = "".join([str(a_minimized_attrs[key]) for key in exit_span_attrs_present]) + new_exit_span = span_attrs not in ct_exit_spans + # If this is a new exit span, add it to the known ct_exit_spans and return it. 
+ if new_exit_span: + ct_exit_spans[span_attrs] = self.ids + ct_processing_time[0] += (time.time() - start_time) + return [i_attrs, u_attrs, a_minimized_attrs] if settings.distributed_tracing.minimize_attributes.enabled else [i_attrs, u_attrs, a_attrs] + # If this is an exit span we've already seen, add its guid to the list + # of ids on the seen span and return None. + # For now add ids to user attributes list + ct_exit_spans[span_attrs].append(self.guid) + ct_processing_time[0] += (time.time() - start_time) + return None + elif settings.distributed_tracing.minimize_attributes.enabled: + # Drop all non-entity relationship attributes from the span. + exit_span_attrs_present = attribute.SPAN_ENTITY_RELATIONSHIP_ATTRIBUTES & set(a_attrs) + a_attrs = attr_class({key: a_attrs[key] for key in exit_span_attrs_present}) + ct_processing_time[0] += (time.time() - start_time) return [i_attrs, u_attrs, a_attrs] - def span_events(self, settings, base_attrs=None, parent_guid=None, attr_class=dict): - yield self.span_event(settings, base_attrs=base_attrs, parent_guid=parent_guid, attr_class=attr_class) - + def span_events(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, ct_exit_spans=None, ct_processing_time=None): + span = self.span_event(settings, base_attrs=base_attrs, parent_guid=parent_guid, attr_class=attr_class, ct_exit_spans=ct_exit_spans, ct_processing_time=ct_processing_time) + parent_id = parent_guid + if span: # span will be None if the span is an inprocess span or repeated exit span. + yield span + parent_id = self.guid for child in self.children: for event in child.span_events( # noqa: UP028 - settings, base_attrs=base_attrs, parent_guid=self.guid, attr_class=attr_class + settings, base_attrs=base_attrs, parent_guid=parent_id, attr_class=attr_class, ct_exit_spans=ct_exit_spans, ct_processing_time=ct_processing_time ): - yield event + if event: # event will be None if the span is an inprocess span or repeated exit span. 
+ yield event class DatastoreNodeMixin(GenericNodeMixin): @@ -108,11 +160,10 @@ def db_instance(self): self._db_instance = db_instance_attr return db_instance_attr - def span_event(self, *args, **kwargs): - self.agent_attributes["db.instance"] = self.db_instance - attrs = super().span_event(*args, **kwargs) - i_attrs = attrs[0] - a_attrs = attrs[2] + def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, *args, **kwargs): + a_attrs = self.agent_attributes + a_attrs["db.instance"] = self.db_instance + i_attrs = base_attrs and base_attrs.copy() or attr_class() i_attrs["category"] = "datastore" i_attrs["span.kind"] = "client" @@ -141,4 +192,4 @@ def span_event(self, *args, **kwargs): except Exception: pass - return attrs + return super().span_event(settings, i_attrs=base_attrs, parent_guid=parent_guid, attr_class=attr_class, *args, **kwargs) diff --git a/newrelic/core/root_node.py b/newrelic/core/root_node.py index 1591afa3ad..030ea07a62 100644 --- a/newrelic/core/root_node.py +++ b/newrelic/core/root_node.py @@ -37,16 +37,16 @@ class RootNode(_RootNode, GenericNodeMixin): - def span_event(self, *args, **kwargs): - span = super().span_event(*args, **kwargs) - i_attrs = span[0] + def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, *args, **kwargs): + i_attrs = base_attrs and base_attrs.copy() or attr_class() i_attrs["transaction.name"] = self.path i_attrs["nr.entryPoint"] = True if self.trusted_parent_span: i_attrs["trustedParentId"] = self.trusted_parent_span if self.tracing_vendors: i_attrs["tracingVendors"] = self.tracing_vendors - return span + + return super().span_event(settings, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class, *args, **kwargs) def trace_node(self, stats, root, connections): name = self.path diff --git a/newrelic/core/stats_engine.py b/newrelic/core/stats_engine.py index bcb1fb35d0..bb987438f7 100644 --- a/newrelic/core/stats_engine.py +++ 
b/newrelic/core/stats_engine.py @@ -36,7 +36,7 @@ from newrelic.common.encoding_utils import json_encode from newrelic.common.metric_utils import create_metric_identity from newrelic.common.object_names import parse_exc_info -from newrelic.common.streaming_utils import StreamBuffer +from newrelic.common.streaming_utils import StreamBuffer, get_deep_size from newrelic.core.attribute import ( MAX_LOG_MESSAGE_LENGTH, create_agent_attributes, @@ -446,6 +446,26 @@ def merge(self, other_data_set, priority=None): self.num_seen += other_data_set.num_seen - other_data_set.num_samples +class SpanSampledDataSet(SampledDataSet): + def __init__(self, capacity=100): + super().__init__(capacity=capacity) + self.ct_processing_time = 0 + self.bytes = 0 + + def add(self, sample, priority=None): + super().add(sample=sample, priority=priority) + _logger.debug(f"{sample[0]['name']} [{len(sample[0])}, {len(sample[1])}, {len(sample[2])}] {get_deep_size(sample)}") + self.bytes += get_deep_size(sample) + + def reset(self): + super().reset() + self.ct_processing_time = 0 + + def merge(self, other_data_set, priority=None): + super().merge(other_data_set=other_data_set, priority=priority) + self.ct_processing_time += other_data_set.ct_processing_time + + class LimitedDataSet(list): def __init__(self, capacity=200): super().__init__() @@ -529,7 +549,7 @@ def __init__(self): self._error_events = SampledDataSet() self._custom_events = SampledDataSet() self._ml_events = SampledDataSet() - self._span_events = SampledDataSet() + self._span_events = SpanSampledDataSet() self._log_events = SampledDataSet() self._span_stream = None self.__sql_stats_table = {} @@ -1196,11 +1216,16 @@ def record_transaction(self, transaction): if settings.distributed_tracing.enabled and settings.span_events.enabled and settings.collect_span_events: if settings.infinite_tracing.enabled: - for event in transaction.span_protos(settings): + ct_processing_time = [0] # Use a one-element list so the callee can accumulate elapsed time in place (plain ints are immutable). 
+ for event in transaction.span_protos(settings, ct_processing_time=ct_processing_time): self._span_stream.put(event) + self._span_stream._ct_processing_time += ct_processing_time[0] elif transaction.sampled: - for event in transaction.span_events(self.__settings): + ct_processing_time = [0] # Use a one-element list so the callee can accumulate elapsed time in place (plain ints are immutable). + for event in transaction.span_events(self.__settings, ct_processing_time=ct_processing_time): self._span_events.add(event, priority=transaction.priority) + self._span_events.ct_processing_time += ct_processing_time[0] + # Merge in log events @@ -1741,9 +1766,9 @@ def reset_ml_events(self): def reset_span_events(self): if self.__settings is not None: - self._span_events = SampledDataSet(self.__settings.event_harvest_config.harvest_limits.span_event_data) + self._span_events = SpanSampledDataSet(self.__settings.event_harvest_config.harvest_limits.span_event_data) else: - self._span_events = SampledDataSet() + self._span_events = SpanSampledDataSet() def reset_log_events(self): if self.__settings is not None: diff --git a/newrelic/core/transaction_node.py index 34871d8b21..c767f57129 100644 --- a/newrelic/core/transaction_node.py +++ b/newrelic/core/transaction_node.py @@ -620,11 +620,11 @@ def _add_call_count(source, target): return intrinsics - def span_protos(self, settings): - for i_attrs, u_attrs, a_attrs in self.span_events(settings, attr_class=SpanProtoAttrs): - yield Span(trace_id=self.trace_id, intrinsics=i_attrs, user_attributes=u_attrs, agent_attributes=a_attrs) + def span_protos(self, settings, ct_processing_time=None): + for span in self.span_events(settings, attr_class=SpanProtoAttrs, ct_processing_time=ct_processing_time): + yield Span(trace_id=self.trace_id, intrinsics=span[0], user_attributes=span[1], agent_attributes=span[2]) - def span_events(self, settings, attr_class=dict): + def span_events(self, settings, attr_class=dict, ct_processing_time=None): base_attrs = 
attr_class( ( ("transactionId", self.guid), @@ -633,5 +633,5 @@ def span_events(self, settings, attr_class=dict): ("priority", self.priority), ) ) - - yield from self.root.span_events(settings, base_attrs, parent_guid=self.parent_span, attr_class=attr_class) + ct_exit_spans = {} + yield from self.root.span_events(settings, base_attrs, parent_guid=self.parent_span, attr_class=attr_class, ct_exit_spans=ct_exit_spans, ct_processing_time=ct_processing_time) From 55ee8ab36633053d74e790601699269209a699eb Mon Sep 17 00:00:00 2001 From: Hannah Stepanek Date: Mon, 28 Jul 2025 13:46:35 -0700 Subject: [PATCH 2/3] Make new config options off by default --- newrelic/core/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/newrelic/core/config.py b/newrelic/core/config.py index 1f6b88d5f1..6898037bbb 100644 --- a/newrelic/core/config.py +++ b/newrelic/core/config.py @@ -829,9 +829,9 @@ def default_otlp_host(host): _settings.ml_insights_events.enabled = False _settings.distributed_tracing.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_ENABLED", default=True) -_settings.distributed_tracing.drop_inprocess_spans.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_DROP_INPROCESS_SPANS_ENABLED", default=True) +_settings.distributed_tracing.drop_inprocess_spans.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_DROP_INPROCESS_SPANS_ENABLED", default=False) _settings.distributed_tracing.unique_spans.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_UNIQUE_SPANS_ENABLED", default=False) -_settings.distributed_tracing.minimize_attributes.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_MINIMIZE_ATTRIBUTES_ENABLED", default=True) +_settings.distributed_tracing.minimize_attributes.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_MINIMIZE_ATTRIBUTES_ENABLED", default=False) _settings.distributed_tracing.exclude_newrelic_header = False _settings.span_events.enabled = _environ_as_bool("NEW_RELIC_SPAN_EVENTS_ENABLED", 
default=True) _settings.span_events.attributes.enabled = True From 119e5fccff2d59eae5d933c58724cc833350aeab Mon Sep 17 00:00:00 2001 From: Hannah Stepanek Date: Tue, 29 Jul 2025 10:28:00 -0700 Subject: [PATCH 3/3] Fixup --- newrelic/core/node_mixin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/newrelic/core/node_mixin.py b/newrelic/core/node_mixin.py index d5c3fecdd7..38eae87d8a 100644 --- a/newrelic/core/node_mixin.py +++ b/newrelic/core/node_mixin.py @@ -192,4 +192,4 @@ def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dic except Exception: pass - return super().span_event(settings, i_attrs=base_attrs, parent_guid=parent_guid, attr_class=attr_class, *args, **kwargs) + return super().span_event(settings, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class, *args, **kwargs)