diff --git a/newrelic/api/transaction.py b/newrelic/api/transaction.py index f90bbde143..f8a9f329f5 100644 --- a/newrelic/api/transaction.py +++ b/newrelic/api/transaction.py @@ -637,6 +637,7 @@ def __exit__(self, exc, value, tb): trace_id=self.trace_id, loop_time=self._loop_time, root=root_node, + partial_granularity_sampled=hasattr(self, "partial_granularity_sampled"), ) # Clear settings as we are all done and don't need it @@ -1073,23 +1074,52 @@ def _make_sampling_decision(self): return priority = self._priority sampled = self._sampled - _logger.debug( - "Full granularity tracing is enabled. Asking if full granularity wants to sample. priority=%s, sampled=%s", - priority, - sampled, - ) - computed_priority, computed_sampled = self._compute_sampled_and_priority( - priority, - sampled, - remote_parent_sampled_path="distributed_tracing.sampler.remote_parent_sampled", - remote_parent_sampled_setting=self.settings.distributed_tracing.sampler.remote_parent_sampled, - remote_parent_not_sampled_path="distributed_tracing.sampler.remote_parent_not_sampled", - remote_parent_not_sampled_setting=self.settings.distributed_tracing.sampler.remote_parent_not_sampled, - ) - _logger.debug("Full granularity sampling decision was %s with priority=%s.", sampled, priority) - self._priority = computed_priority - self._sampled = computed_sampled - self._sampling_decision_made = True + # Compute sampling decision for full granularity. + if self.settings.distributed_tracing.sampler.full_granularity.enabled: + _logger.debug( + "Full granularity tracing is enabled. Asking if full granularity wants to sample. 
priority=%s, sampled=%s", + priority, + sampled, + ) + computed_priority, computed_sampled = self._compute_sampled_and_priority( + priority, + sampled, + remote_parent_sampled_path="distributed_tracing.sampler.full_granularity.remote_parent_sampled", + remote_parent_sampled_setting=self.settings.distributed_tracing.sampler.full_granularity.remote_parent_sampled, + remote_parent_not_sampled_path="distributed_tracing.sampler.full_granularity.remote_parent_not_sampled", + remote_parent_not_sampled_setting=self.settings.distributed_tracing.sampler.full_granularity.remote_parent_not_sampled, + ) + _logger.debug("Full granularity sampling decision was %s with priority=%s.", sampled, priority) + if computed_sampled or not self.settings.distributed_tracing.sampler.partial_granularity.enabled: + self._priority = computed_priority + self._sampled = computed_sampled + self._sampling_decision_made = True + return + + # If full granularity is not going to sample, let partial granularity decide. + if self.settings.distributed_tracing.sampler.partial_granularity.enabled: + _logger.debug("Partial granularity tracing is enabled. 
Asking if partial granularity wants to sample.") + self._priority, self._sampled = self._compute_sampled_and_priority( + priority, + sampled, + remote_parent_sampled_path="distributed_tracing.sampler.partial_granularity.remote_parent_sampled", + remote_parent_sampled_setting=self.settings.distributed_tracing.sampler.partial_granularity.remote_parent_sampled, + remote_parent_not_sampled_path="distributed_tracing.sampler.partial_granularity.remote_parent_not_sampled", + remote_parent_not_sampled_setting=self.settings.distributed_tracing.sampler.partial_granularity.remote_parent_not_sampled, + ) + _logger.debug( + "Partial granularity sampling decision was %s with priority=%s.", self._sampled, self._priority + ) + self._sampling_decision_made = True + if self._sampled: + self.partial_granularity_sampled = True + return + + # This is only reachable if both full and partial granularity tracing are off. + # Set priority=0 and do not sample. This enables DT headers to still be sent + # even if the trace is never sampled. + self._priority = 0 + self._sampled = False def _freeze_path(self): if self._frozen_path is None: diff --git a/newrelic/config.py b/newrelic/config.py index 5367538695..41d118961f 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -319,6 +319,47 @@ def _process_setting(section, option, getter, mapper): _raise_configuration_error(section, option) +def _process_dt_setting(section, option_p1, option_p2, getter): + try: + # The type of a value is dictated by the getter + # function supplied. + + value1 = getattr(_config_object, getter)(section, option_p1) + value2 = getattr(_config_object, getter)(section, option_p2) + + # Now need to apply the option from the + # configuration file to the internal settings + # object. Walk the object path and assign it. 
+ + target = _settings + fields = option_p1.split(".", 1) + + while True: + if len(fields) == 1: + value = value1 or value2 or "default" + setattr(target, fields[0], value) + break + target = getattr(target, fields[0]) + fields = fields[1].split(".", 1) + + # Cache the configuration so can be dumped out to + # log file when whole main configuration has been + # processed. This ensures that the log file and log + # level entries have been set. + + _cache_object.append((option_p1, value1)) + _cache_object.append((option_p2, value2)) + + except configparser.NoSectionError: + pass + + except configparser.NoOptionError: + pass + + except Exception: + _raise_configuration_error(section, option_p1) + + # Processing of all the settings for specified section except # for log file and log level which are applied separately to # ensure they are set as soon as possible. @@ -405,8 +446,23 @@ def _process_configuration(section): _process_setting(section, "distributed_tracing.enabled", "getboolean", None) _process_setting(section, "distributed_tracing.exclude_newrelic_header", "getboolean", None) _process_setting(section, "distributed_tracing.sampler.adaptive_sampling_target", "getint", None) - _process_setting(section, "distributed_tracing.sampler.remote_parent_sampled", "get", None) - _process_setting(section, "distributed_tracing.sampler.remote_parent_not_sampled", "get", None) + _process_dt_setting( + section, + "distributed_tracing.sampler.full_granularity.remote_parent_sampled", + "distributed_tracing.sampler.remote_parent_sampled", + "get", + ) + _process_dt_setting( + section, + "distributed_tracing.sampler.full_granularity.remote_parent_not_sampled", + "distributed_tracing.sampler.remote_parent_not_sampled", + "get", + ) + _process_setting(section, "distributed_tracing.sampler.full_granularity.enabled", "getboolean", None) + _process_setting(section, "distributed_tracing.sampler.partial_granularity.enabled", "getboolean", None) + _process_setting(section, 
"distributed_tracing.sampler.partial_granularity.type", "get", None) + _process_setting(section, "distributed_tracing.sampler.partial_granularity.remote_parent_sampled", "get", None) + _process_setting(section, "distributed_tracing.sampler.partial_granularity.remote_parent_not_sampled", "get", None) _process_setting(section, "span_events.enabled", "getboolean", None) _process_setting(section, "span_events.max_samples_stored", "getint", None) _process_setting(section, "span_events.attributes.enabled", "getboolean", None) diff --git a/newrelic/core/application.py b/newrelic/core/application.py index 3ba8168d60..fe5d0af186 100644 --- a/newrelic/core/application.py +++ b/newrelic/core/application.py @@ -1373,6 +1373,8 @@ def harvest(self, shutdown=False, flexible=False): spans_sampled = spans.num_samples internal_count_metric("Supportability/SpanEvent/TotalEventsSeen", spans_seen) internal_count_metric("Supportability/SpanEvent/TotalEventsSent", spans_sampled) + if configuration.distributed_tracing.sampler.partial_granularity.enabled: + internal_count_metric(f"Supportability/Python/PartialGranularity/{configuration.distributed_tracing.sampler.partial_granularity.type}", 1) stats.reset_span_events() diff --git a/newrelic/core/attribute.py b/newrelic/core/attribute.py index 785a2fa0ec..49bc890a80 100644 --- a/newrelic/core/attribute.py +++ b/newrelic/core/attribute.py @@ -109,6 +109,23 @@ "zeebe.client.resourceFile", } +SPAN_ENTITY_RELATIONSHIP_ATTRIBUTES = { + "cloud.account.id", + "cloud.platform", + "cloud.region", + "cloud.resource_id", + "db.instance", + "db.system", + "http.url", + "messaging.destination.name", + "messaging.system", + "peer.hostname", + "server.address", + "server.port", + "span.kind", +} + + MAX_NUM_USER_ATTRIBUTES = 128 MAX_ATTRIBUTE_LENGTH = 255 MAX_NUM_ML_USER_ATTRIBUTES = 64 diff --git a/newrelic/core/config.py b/newrelic/core/config.py index 63007a7317..fe5c9b5872 100644 --- a/newrelic/core/config.py +++ b/newrelic/core/config.py @@ -337,6 
+337,14 @@ class DistributedTracingSamplerSettings(Settings): pass +class DistributedTracingSamplerFullGranularitySettings(Settings): + pass + + +class DistributedTracingSamplerPartialGranularitySettings(Settings): + pass + + class ServerlessModeSettings(Settings): pass @@ -507,6 +515,8 @@ class EventHarvestConfigHarvestLimitSettings(Settings): _settings.debug = DebugSettings() _settings.distributed_tracing = DistributedTracingSettings() _settings.distributed_tracing.sampler = DistributedTracingSamplerSettings() +_settings.distributed_tracing.sampler.full_granularity = DistributedTracingSamplerFullGranularitySettings() +_settings.distributed_tracing.sampler.partial_granularity = DistributedTracingSamplerPartialGranularitySettings() _settings.error_collector = ErrorCollectorSettings() _settings.error_collector.attributes = ErrorCollectorAttributesSettings() _settings.event_harvest_config = EventHarvestConfigSettings() @@ -845,11 +855,26 @@ def default_otlp_host(host): _settings.distributed_tracing.sampler.adaptive_sampling_target = _environ_as_int( "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_ADAPTIVE_SAMPLING_TARGET", default=10 ) -_settings.distributed_tracing.sampler.remote_parent_sampled = os.environ.get( - "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_REMOTE_PARENT_SAMPLED", "default" +_settings.distributed_tracing.sampler.full_granularity.enabled = _environ_as_bool( + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_FULL_GRANULARITY_ENABLED", default=True ) -_settings.distributed_tracing.sampler.remote_parent_not_sampled = os.environ.get( - "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_REMOTE_PARENT_NOT_SAMPLED", "default" +_settings.distributed_tracing.sampler.full_granularity.remote_parent_sampled = os.environ.get( + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_FULL_GRANULARITY_REMOTE_PARENT_SAMPLED", None +) or os.environ.get("NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_REMOTE_PARENT_SAMPLED", "default") +_settings.distributed_tracing.sampler.full_granularity.remote_parent_not_sampled = 
os.environ.get( + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_FULL_GRANULARITY_REMOTE_PARENT_NOT_SAMPLED", None +) or os.environ.get("NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_REMOTE_PARENT_NOT_SAMPLED", "default") +_settings.distributed_tracing.sampler.partial_granularity.enabled = _environ_as_bool( + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_PARTIAL_GRANULARITY_ENABLED", default=False +) +_settings.distributed_tracing.sampler.partial_granularity.type = os.environ.get( + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_PARTIAL_GRANULARITY_TYPE", "essential" +) +_settings.distributed_tracing.sampler.partial_granularity.remote_parent_sampled = os.environ.get( + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_PARTIAL_GRANULARITY_REMOTE_PARENT_SAMPLED", "default" +) +_settings.distributed_tracing.sampler.partial_granularity.remote_parent_not_sampled = os.environ.get( + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_PARTIAL_GRANULARITY_REMOTE_PARENT_NOT_SAMPLED", "default" ) _settings.distributed_tracing.exclude_newrelic_header = False _settings.span_events.enabled = _environ_as_bool("NEW_RELIC_SPAN_EVENTS_ENABLED", default=True) @@ -1366,6 +1391,16 @@ def apply_server_side_settings(server_side_config=None, settings=_settings): min(settings_snapshot.custom_insights_events.max_attribute_value, 4095), ) + # Partial granularity tracing is not available in infinite tracing mode. + if ( + settings_snapshot.infinite_tracing.enabled + and settings_snapshot.distributed_tracing.sampler.partial_granularity.enabled + ): + _logger.warning( + "Improper configuration. Infinite tracing cannot be enabled at the same time as partial granularity tracing. Setting distributed_tracing.sampler.partial_granularity.enabled=False." 
+ ) + apply_config_setting(settings_snapshot, "distributed_tracing.sampler.partial_granularity.enabled", False) + # This will be removed at some future point # Special case for account_id which will be sent instead of # cross_process_id in the future diff --git a/newrelic/core/data_collector.py b/newrelic/core/data_collector.py index e481f1d6e7..c303fad90b 100644 --- a/newrelic/core/data_collector.py +++ b/newrelic/core/data_collector.py @@ -117,7 +117,14 @@ def send_ml_events(self, sampling_info, custom_event_data): def send_span_events(self, sampling_info, span_event_data): """Called to submit sample set for span events.""" - + # TODO: remove this later after list types are suported. + for span_event in span_event_data: + try: + ids = span_event[1].get("nr.ids") + if ids: + span_event[1]["nr.ids"] = ",".join(ids) + except: + pass payload = (self.agent_run_id, sampling_info, span_event_data) return self._protocol.send("span_event_data", payload) diff --git a/newrelic/core/database_node.py b/newrelic/core/database_node.py index 1f60add195..8e30e3fecf 100644 --- a/newrelic/core/database_node.py +++ b/newrelic/core/database_node.py @@ -279,7 +279,15 @@ def trace_node(self, stats, root, connections): start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=None ) - def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict): + def span_event( + self, + settings, + base_attrs=None, + parent_guid=None, + attr_class=dict, + partial_granularity_sampled=False, + ct_exit_spans=None, + ): sql = self.formatted if sql: @@ -288,4 +296,11 @@ def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dic self.agent_attributes["db.statement"] = sql - return super().span_event(settings, base_attrs=base_attrs, parent_guid=parent_guid, attr_class=attr_class) + return super().span_event( + settings, + base_attrs=base_attrs, + parent_guid=parent_guid, + attr_class=attr_class, + 
partial_granularity_sampled=partial_granularity_sampled, + ct_exit_spans=ct_exit_spans, + ) diff --git a/newrelic/core/external_node.py b/newrelic/core/external_node.py index f47d634b3d..7251504bb1 100644 --- a/newrelic/core/external_node.py +++ b/newrelic/core/external_node.py @@ -169,7 +169,15 @@ def trace_node(self, stats, root, connections): start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=None ) - def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict): + def span_event( + self, + settings, + base_attrs=None, + parent_guid=None, + attr_class=dict, + partial_granularity_sampled=False, + ct_exit_spans=None, + ): self.agent_attributes["http.url"] = self.http_url i_attrs = (base_attrs and base_attrs.copy()) or attr_class() @@ -180,4 +188,11 @@ def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dic if self.method: _, i_attrs["http.method"] = attribute.process_user_attribute("http.method", self.method) - return super().span_event(settings, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class) + return super().span_event( + settings, + base_attrs=i_attrs, + parent_guid=parent_guid, + attr_class=attr_class, + partial_granularity_sampled=partial_granularity_sampled, + ct_exit_spans=ct_exit_spans, + ) diff --git a/newrelic/core/function_node.py b/newrelic/core/function_node.py index 588f675f31..2eab783ecc 100644 --- a/newrelic/core/function_node.py +++ b/newrelic/core/function_node.py @@ -114,8 +114,23 @@ def trace_node(self, stats, root, connections): start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=self.label ) - def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict): + def span_event( + self, + settings, + base_attrs=None, + parent_guid=None, + attr_class=dict, + partial_granularity_sampled=False, + ct_exit_spans=None, + ): i_attrs = (base_attrs and base_attrs.copy()) or attr_class() 
i_attrs["name"] = f"{self.group}/{self.name}" - return super().span_event(settings, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class) + return super().span_event( + settings, + base_attrs=i_attrs, + parent_guid=parent_guid, + attr_class=attr_class, + partial_granularity_sampled=partial_granularity_sampled, + ct_exit_spans=ct_exit_spans, + ) diff --git a/newrelic/core/loop_node.py b/newrelic/core/loop_node.py index 58d1b3a746..b562720a85 100644 --- a/newrelic/core/loop_node.py +++ b/newrelic/core/loop_node.py @@ -79,8 +79,23 @@ def trace_node(self, stats, root, connections): start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=None ) - def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict): + def span_event( + self, + settings, + base_attrs=None, + parent_guid=None, + attr_class=dict, + partial_granularity_sampled=False, + ct_exit_spans=None, + ): i_attrs = (base_attrs and base_attrs.copy()) or attr_class() i_attrs["name"] = f"EventLoop/Wait/{self.name}" - return super().span_event(settings, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class) + return super().span_event( + settings, + base_attrs=i_attrs, + parent_guid=parent_guid, + attr_class=attr_class, + partial_granularity_sampled=partial_granularity_sampled, + ct_exit_spans=ct_exit_spans, + ) diff --git a/newrelic/core/node_mixin.py b/newrelic/core/node_mixin.py index 9154cc8765..92f0975827 100644 --- a/newrelic/core/node_mixin.py +++ b/newrelic/core/node_mixin.py @@ -49,7 +49,15 @@ def get_trace_segment_params(self, settings, params=None): _params["exclusive_duration_millis"] = 1000.0 * self.exclusive return _params - def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict): + def span_event( + self, + settings, + base_attrs=None, + parent_guid=None, + attr_class=dict, + partial_granularity_sampled=False, + ct_exit_spans=None, + ): i_attrs = (base_attrs and base_attrs.copy()) or 
attr_class() i_attrs["type"] = "Span" i_attrs["name"] = i_attrs.get("name") or self.name @@ -68,18 +76,115 @@ def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dic u_attrs = attribute.resolve_user_attributes( self.processed_user_attributes, settings.attribute_filter, DST_SPAN_EVENTS, attr_class=attr_class ) + if not partial_granularity_sampled: + # intrinsics, user attrs, agent attrs + return [i_attrs, u_attrs, a_attrs] + else: + if ct_exit_spans is None: + ct_exit_spans = {} - # intrinsics, user attrs, agent attrs - return [i_attrs, u_attrs, a_attrs] - - def span_events(self, settings, base_attrs=None, parent_guid=None, attr_class=dict): - yield self.span_event(settings, base_attrs=base_attrs, parent_guid=parent_guid, attr_class=attr_class) + partial_granularity_type = settings.distributed_tracing.sampler.partial_granularity.type + exit_span_attrs_present = attribute.SPAN_ENTITY_RELATIONSHIP_ATTRIBUTES & set(a_attrs) + # If this is the entry node or an LLM span always return it. + if i_attrs.get("nr.entryPoint") or i_attrs["name"].startswith("Llm/"): + if partial_granularity_type == "reduced": + return [i_attrs, u_attrs, a_attrs] + else: + return [i_attrs, {}, {}] + # If the span is not an exit span, skip it by returning None. + if not exit_span_attrs_present: + return None + # If the span is an exit span and we are in reduced mode (meaning no attribute dropping), + # just return the exit span as is. + if partial_granularity_type == "reduced": + return [i_attrs, u_attrs, a_attrs] + else: + a_minimized_attrs = attr_class({key: a_attrs[key] for key in exit_span_attrs_present}) + # If we are in essential mode return the span with minimized attributes. + if partial_granularity_type == "essential": + return [i_attrs, {}, a_minimized_attrs] + # If the span is an exit span but span compression (compact) is enabled, + # we need to check for uniqueness before returning it. 
+ # Combine all the entity relationship attr values into a string to be + # used as the hash to check for uniqueness. + span_attrs = "".join([str(a_minimized_attrs[key]) for key in exit_span_attrs_present]) + new_exit_span = span_attrs not in ct_exit_spans + # If this is a new exit span, add it to the known ct_exit_spans and + # return it. + if new_exit_span: + # nr.ids is the list of span guids that share this unqiue exit span. + a_minimized_attrs["nr.ids"] = [] + a_minimized_attrs["nr.durations"] = self.duration + ct_exit_spans[span_attrs] = [i_attrs, a_minimized_attrs] + return [i_attrs, {}, a_minimized_attrs] + # If this is an exit span we've already seen, add it's guid to the list + # of ids on the seen span, compute the new duration & start time, and + # return None. + ct_exit_spans[span_attrs][1]["nr.ids"].append(self.guid) + # Max size for `nr.ids` = 1024. Max length = 63 (each span id is 16 bytes + 8 bytes for list type). + ct_exit_spans[span_attrs][1]["nr.ids"] = ct_exit_spans[span_attrs][1]["nr.ids"][:63] + # Compute the new start and end time for all compressed spans and use + # that to set the duration for all compressed spans. + current_start_time = ct_exit_spans[span_attrs][0]["timestamp"] + current_end_time = ( + ct_exit_spans[span_attrs][0]["timestamp"] / 1000 + ct_exit_spans[span_attrs][1]["nr.durations"] + ) + new_start_time = i_attrs["timestamp"] + new_end_time = i_attrs["timestamp"] / 1000 + i_attrs["duration"] + set_start_time = min(new_start_time, current_start_time) + # If the new span starts after the old span's end time or the new span + # ends before the current span starts; add the durations. + if current_end_time < new_start_time / 1000 or new_end_time < current_start_time / 1000: + set_duration = ct_exit_spans[span_attrs][1]["nr.durations"] + i_attrs["duration"] + # Otherwise, if the new and old span's overlap in time, use the newest + # end time and subtract the start time from it to calculate the new + # duration. 
+ else: + set_duration = max(current_end_time, new_end_time) - set_start_time / 1000 + ct_exit_spans[span_attrs][0]["timestamp"] = set_start_time + ct_exit_spans[span_attrs][1]["nr.durations"] = set_duration + return None + def span_events( + self, + settings, + base_attrs=None, + parent_guid=None, + attr_class=dict, + partial_granularity_sampled=False, + ct_exit_spans=None, + ): + span = self.span_event( + settings, + base_attrs=base_attrs, + parent_guid=parent_guid, + attr_class=attr_class, + partial_granularity_sampled=partial_granularity_sampled, + ct_exit_spans=ct_exit_spans, + ) + ct_exit_spans["instrumented"] += 1 + parent_id = parent_guid + if span: # span will be None if the span is an inprocess span or repeated exit span. + ct_exit_spans["kept"] += 1 + yield span + # Compressed spans are always reparented onto the entry span. + if not settings.distributed_tracing.sampler.partial_granularity.type == "compact" or span[0].get( + "nr.entryPoint" + ): + parent_id = self.guid for child in self.children: - for event in child.span_events( # noqa: UP028 - settings, base_attrs=base_attrs, parent_guid=self.guid, attr_class=attr_class + for event in child.span_events( + settings, + base_attrs=base_attrs, + parent_guid=parent_id, + attr_class=attr_class, + partial_granularity_sampled=partial_granularity_sampled, + ct_exit_spans=ct_exit_spans, ): - yield event + ct_exit_spans["instrumented"] += 1 + if event: # event will be None if the span is an inprocess span or repeated exit span. 
+ ct_exit_spans["kept"] += 1 + yield event class DatastoreNodeMixin(GenericNodeMixin): @@ -108,7 +213,15 @@ def db_instance(self): self._db_instance = db_instance_attr return db_instance_attr - def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict): + def span_event( + self, + settings, + base_attrs=None, + parent_guid=None, + attr_class=dict, + partial_granularity_sampled=False, + ct_exit_spans=None, + ): a_attrs = self.agent_attributes a_attrs["db.instance"] = self.db_instance i_attrs = (base_attrs and base_attrs.copy()) or attr_class() @@ -140,4 +253,11 @@ def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dic except Exception: pass - return super().span_event(settings, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class) + return super().span_event( + settings, + base_attrs=i_attrs, + parent_guid=parent_guid, + attr_class=attr_class, + partial_granularity_sampled=partial_granularity_sampled, + ct_exit_spans=ct_exit_spans, + ) diff --git a/newrelic/core/root_node.py b/newrelic/core/root_node.py index fa8b3de82b..72f1d392d7 100644 --- a/newrelic/core/root_node.py +++ b/newrelic/core/root_node.py @@ -37,7 +37,15 @@ class RootNode(_RootNode, GenericNodeMixin): - def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict): + def span_event( + self, + settings, + base_attrs=None, + parent_guid=None, + attr_class=dict, + partial_granularity_sampled=False, + ct_exit_spans=None, + ): i_attrs = (base_attrs and base_attrs.copy()) or attr_class() i_attrs["transaction.name"] = self.path i_attrs["nr.entryPoint"] = True @@ -46,7 +54,14 @@ def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dic if self.tracing_vendors: i_attrs["tracingVendors"] = self.tracing_vendors - return super().span_event(settings, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class) + return super().span_event( + settings, + base_attrs=i_attrs, + parent_guid=parent_guid, + 
attr_class=attr_class, + partial_granularity_sampled=partial_granularity_sampled, + ct_exit_spans=ct_exit_spans, + ) def trace_node(self, stats, root, connections): name = self.path diff --git a/newrelic/core/stats_engine.py b/newrelic/core/stats_engine.py index f44f82fe13..47bb6ba7ec 100644 --- a/newrelic/core/stats_engine.py +++ b/newrelic/core/stats_engine.py @@ -1190,6 +1190,12 @@ def record_transaction(self, transaction): elif transaction.sampled: for event in transaction.span_events(self.__settings): self._span_events.add(event, priority=transaction.priority) + if transaction.partial_granularity_sampled: + partial_gran_type = settings.distributed_tracing.sampler.partial_granularity.type + self.record_custom_metrics([ + (f"Supportability/DistributedTrace/PartialGranularity/{partial_gran_type}/Span/Instrumented", {"count": transaction.instrumented}), + (f"Supportability/DistributedTrace/PartialGranularity/{partial_gran_type}/Span/Kept", {"count": transaction.kept}), + ]) # Merge in log events diff --git a/newrelic/core/transaction_node.py b/newrelic/core/transaction_node.py index 34871d8b21..eaa3b5f343 100644 --- a/newrelic/core/transaction_node.py +++ b/newrelic/core/transaction_node.py @@ -98,6 +98,7 @@ "root_span_guid", "trace_id", "loop_time", + "partial_granularity_sampled", ], ) @@ -633,5 +634,18 @@ ("priority", self.priority), ) ) - - yield from self.root.span_events(settings, base_attrs, parent_guid=self.parent_span, attr_class=attr_class) + ct_exit_spans = {"instrumented": 0, "kept": 0} + yield from self.root.span_events( + settings, + base_attrs, + parent_guid=self.parent_span, + attr_class=attr_class, + partial_granularity_sampled=self.partial_granularity_sampled, + ct_exit_spans=ct_exit_spans, + ) + # If this transaction is partial granularity sampled, record the number of spans + # instrumented and the number of spans kept to monitor cost savings of partial + # granularity tracing. 
+ if self.partial_granularity_sampled: + self.instrumented = ct_exit_spans["instrumented"] + self.kept = ct_exit_spans["kept"] diff --git a/tests/agent_features/test_distributed_tracing.py b/tests/agent_features/test_distributed_tracing.py index 6548b17cf8..f11375a00b 100644 --- a/tests/agent_features/test_distributed_tracing.py +++ b/tests/agent_features/test_distributed_tracing.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import copy import json +import time import pytest import webtest @@ -24,6 +26,18 @@ from testing_support.validators.validate_transaction_event_attributes import validate_transaction_event_attributes from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics +from newrelic.api.function_trace import function_trace +from newrelic.common.object_wrapper import function_wrapper, transient_function_wrapper + +try: + from newrelic.core.infinite_tracing_pb2 import AttributeValue, Span +except: + AttributeValue = None + Span = None + +from testing_support.mock_external_http_server import MockExternalHTTPHResponseHeadersServer +from testing_support.validators.validate_span_events import check_value_equals, validate_span_events + from newrelic.api.application import application_instance from newrelic.api.background_task import BackgroundTask, background_task from newrelic.api.external_trace import ExternalTrace @@ -72,6 +86,110 @@ } +def validate_compact_span_event( + name, compressed_span_count, expected_nr_durations_low_bound, expected_nr_durations_high_bound +): + @function_wrapper + def _validate_wrapper(wrapped, instance, args, kwargs): + record_transaction_called = [] + recorded_span_events = [] + + @transient_function_wrapper("newrelic.core.stats_engine", "StatsEngine.record_transaction") + def capture_span_events(wrapped, instance, args, kwargs): + events = [] + + 
@transient_function_wrapper("newrelic.common.streaming_utils", "StreamBuffer.put") + def stream_capture(wrapped, instance, args, kwargs): + event = args[0] + events.append(event) + return wrapped(*args, **kwargs) + + record_transaction_called.append(True) + try: + result = stream_capture(wrapped)(*args, **kwargs) + except: + raise + else: + if not instance.settings.infinite_tracing.enabled: + events = [event for priority, seen_at, event in instance.span_events.pq] + + recorded_span_events.append(events) + + return result + + _new_wrapper = capture_span_events(wrapped) + val = _new_wrapper(*args, **kwargs) + assert record_transaction_called + captured_events = recorded_span_events.pop(-1) + + mismatches = [] + matching_span_events = 0 + + def _span_details(): + details = [ + f"matching_span_events={matching_span_events}", + f"mismatches={mismatches}", + f"captured_events={captured_events}", + ] + return "\n".join(details) + + for captured_event in captured_events: + if Span and isinstance(captured_event, Span): + intrinsics = captured_event.intrinsics + user_attrs = captured_event.user_attributes + agent_attrs = captured_event.agent_attributes + else: + intrinsics, _, agent_attrs = captured_event + + # Find the span by name. 
+ if not check_value_equals(intrinsics, "name", name): + continue + assert check_value_length(agent_attrs, "nr.ids", compressed_span_count - 1, mismatches), _span_details() + assert check_value_between( + agent_attrs, + "nr.durations", + expected_nr_durations_low_bound, + expected_nr_durations_high_bound, + mismatches, + ), _span_details() + matching_span_events += 1 + + assert matching_span_events == 1, _span_details() + return val + + return _validate_wrapper + + +def check_value_between(dictionary, key, expected_min, expected_max, mismatches): + value = dictionary.get(key) + if AttributeValue and isinstance(value, AttributeValue): + for _, val in value.ListFields(): + if not (expected_min < val < expected_max): + mismatches.append(f"key: {key}, not {expected_min} < {val} < {expected_max}") + return False + return True + else: + if not (expected_min < value < expected_max): + mismatches.append(f"key: {key}, not {expected_min} < {value} < {expected_max}") + return False + return True + + +def check_value_length(dictionary, key, expected_length, mismatches): + value = dictionary.get(key) + if AttributeValue and isinstance(value, AttributeValue): + for _, val in value.ListFields(): + if len(val) != expected_length: + mismatches.append(f"key: {key}, not len({val}) == {expected_length}") + return False + return True + else: + if len(value) != expected_length: + mismatches.append(f"key: {key}, not len({value}) == {expected_length}") + return False + return True + + @wsgi_application() def target_wsgi_application(environ, start_response): status = "200 OK" @@ -468,8 +586,99 @@ def test_distributed_trace_remote_parent_sampling_decision_full_granularity( test_settings = _override_settings.copy() test_settings.update( { - "distributed_tracing.sampler.remote_parent_sampled": remote_parent_sampled_setting, - "distributed_tracing.sampler.remote_parent_not_sampled": remote_parent_not_sampled_setting, + "distributed_tracing.sampler.full_granularity.remote_parent_sampled": 
remote_parent_sampled_setting, + "distributed_tracing.sampler.full_granularity.remote_parent_not_sampled": remote_parent_not_sampled_setting, + "span_events.enabled": True, + } + ) + if expected_adaptive_sampling_algo_called: + function_called_decorator = validate_function_called( + "newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled" + ) + else: + function_called_decorator = validate_function_not_called( + "newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled" + ) + + @function_called_decorator + @override_application_settings(test_settings) + @validate_attributes_complete("intrinsic", required_intrinsics) + @background_task(name="test_distributed_trace_attributes") + def _test(): + txn = current_transaction() + + if traceparent_sampled is not None: + headers = { + "traceparent": f"00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-{int(traceparent_sampled):02x}", + "newrelic": '{"v":[0,1],"d":{"ty":"Mobile","ac":"123","ap":"51424","id":"5f474d64b9cc9b2a","tr":"6e2fea0b173fdad0","pr":0.1234,"sa":true,"ti":1482959525577,"tx":"27856f70d3d314b7"}}', # This header should be ignored. + } + if newrelic_sampled is not None: + headers["tracestate"] = ( + f"1@nr=0-0-1-2827902-0af7651916cd43dd-00f067aa0ba902b7-{int(newrelic_sampled)}-1.23456-1518469636035" + ) + else: + headers = { + "newrelic": '{"v":[0,1],"d":{"ty":"Mobile","ac":"1","ap":"51424","id":"00f067aa0ba902b7","tr":"0af7651916cd43dd8448eb211c80319c","pr":0.1234,"sa":%s,"ti":1482959525577,"tx":"0af7651916cd43dd"}}' + % (str(newrelic_sampled).lower()) + } + accept_distributed_trace_headers(headers) + + _test() + + +@pytest.mark.parametrize( + "traceparent_sampled,newrelic_sampled,remote_parent_sampled_setting,remote_parent_not_sampled_setting,expected_sampled,expected_priority,expected_adaptive_sampling_algo_called", + ( + (True, None, "default", "default", None, None, True), # Uses adaptive sampling algo. + (True, None, "always_on", "default", True, 2, False), # Always sampled. 
+ (True, None, "always_off", "default", False, 0, False), # Never sampled. + (False, None, "default", "default", None, None, True), # Uses adaptive sampling algo. + (False, None, "always_on", "default", None, None, True), # Uses adaptive sampling algo. + (False, None, "always_off", "default", None, None, True), # Uses adaptive sampling algo. + (True, None, "default", "always_on", None, None, True), # Uses adaptive sampling algo. + (True, None, "default", "always_off", None, None, True), # Uses adaptive sampling algo. + (False, None, "default", "always_on", True, 2, False), # Always sampled. + (False, None, "default", "always_off", False, 0, False), # Never sampled. + (True, True, "default", "default", True, 1.23456, False), # Uses sampling decision in W3C TraceState header. + (True, False, "default", "default", False, 1.23456, False), # Uses sampling decision in W3C TraceState header. + (False, False, "default", "default", False, 1.23456, False), # Uses sampling decision in W3C TraceState header. + (True, False, "always_on", "default", True, 2, False), # Always sampled. + (True, True, "always_off", "default", False, 0, False), # Never sampled. + (False, False, "default", "always_on", True, 2, False), # Always sampled. + (False, True, "default", "always_off", False, 0, False), # Never sampled. + (None, True, "default", "default", True, 0.1234, False), # Uses sampling and priority from newrelic header. + (None, True, "always_on", "default", True, 2, False), # Always sampled. + (None, True, "always_off", "default", False, 0, False), # Never sampled. + (None, False, "default", "default", False, 0.1234, False), # Uses sampling and priority from newrelic header. + (None, False, "always_on", "default", False, 0.1234, False), # Uses sampling and priority from newrelic header. + (None, True, "default", "always_on", True, 0.1234, False), # Uses sampling and priority from newrelic header. + (None, False, "default", "always_on", True, 2, False), # Always sampled. 
+ (None, False, "default", "always_off", False, 0, False), # Never sampled. + (None, None, "default", "default", None, None, True), # Uses adaptive sampling algo. + ), +) +def test_distributed_trace_remote_parent_sampling_decision_partial_granularity( + traceparent_sampled, + newrelic_sampled, + remote_parent_sampled_setting, + remote_parent_not_sampled_setting, + expected_sampled, + expected_priority, + expected_adaptive_sampling_algo_called, +): + required_intrinsics = [] + if expected_sampled is not None: + required_intrinsics.append(Attribute(name="sampled", value=expected_sampled, destinations=0b110)) + if expected_priority is not None: + required_intrinsics.append(Attribute(name="priority", value=expected_priority, destinations=0b110)) + + test_settings = _override_settings.copy() + test_settings.update( + { + "distributed_tracing.sampler.full_granularity.enabled": False, + "distributed_tracing.sampler.partial_granularity.enabled": True, + "distributed_tracing.sampler.partial_granularity.remote_parent_sampled": remote_parent_sampled_setting, + "distributed_tracing.sampler.partial_granularity.remote_parent_not_sampled": remote_parent_not_sampled_setting, "span_events.enabled": True, } ) @@ -506,3 +715,320 @@ def _test(): accept_distributed_trace_headers(headers) _test() + + +@pytest.mark.parametrize( + "full_granularity_enabled,full_granularity_remote_parent_sampled_setting,partial_granularity_enabled,partial_granularity_remote_parent_sampled_setting,expected_sampled,expected_priority,expected_adaptive_sampling_algo_called", + ( + (True, "always_off", True, "adaptive", None, None, True), # Uses adaptive sampling algo. + (True, "always_on", True, "adaptive", True, 2, False), # Uses adaptive sampling algo. + (False, "always_on", False, "adaptive", False, 0, False), # Uses adaptive sampling algo. 
+ ), +) +def test_distributed_trace_remote_parent_sampling_decision_between_full_and_partial_granularity( + full_granularity_enabled, + full_granularity_remote_parent_sampled_setting, + partial_granularity_enabled, + partial_granularity_remote_parent_sampled_setting, + expected_sampled, + expected_priority, + expected_adaptive_sampling_algo_called, +): + required_intrinsics = [] + if expected_sampled is not None: + required_intrinsics.append(Attribute(name="sampled", value=expected_sampled, destinations=0b110)) + if expected_priority is not None: + required_intrinsics.append(Attribute(name="priority", value=expected_priority, destinations=0b110)) + + test_settings = _override_settings.copy() + test_settings.update( + { + "distributed_tracing.sampler.full_granularity.enabled": full_granularity_enabled, + "distributed_tracing.sampler.partial_granularity.enabled": partial_granularity_enabled, + "distributed_tracing.sampler.full_granularity.remote_parent_sampled": full_granularity_remote_parent_sampled_setting, + "distributed_tracing.sampler.partial_granularity.remote_parent_sampled": partial_granularity_remote_parent_sampled_setting, + "span_events.enabled": True, + } + ) + if expected_adaptive_sampling_algo_called: + function_called_decorator = validate_function_called( + "newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled" + ) + else: + function_called_decorator = validate_function_not_called( + "newrelic.core.adaptive_sampler", "AdaptiveSampler.compute_sampled" + ) + + @function_called_decorator + @override_application_settings(test_settings) + @validate_attributes_complete("intrinsic", required_intrinsics) + @background_task(name="test_distributed_trace_attributes") + def _test(): + headers = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01"} + accept_distributed_trace_headers(headers) + + _test() + + +def test_partial_granularity_max_compressed_spans(): + """ + Tests `nr.ids` does not exceed 1024 byte limit. 
+ """ + + async def test(index): + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + time.sleep(0.1) + + @function_trace() + async def call_tests(): + tasks = [test(i) for i in range(65)] + await asyncio.gather(*tasks) + + @validate_span_events( + count=1, # Entry span. + exact_intrinsics={ + "name": "Function/test_distributed_tracing:test_partial_granularity_max_compressed_spans.<locals>._test" + }, + expected_intrinsics=["duration", "timestamp"], + ) + @validate_span_events( + count=1, # 1 external compressed span. + exact_intrinsics={"name": "External/localhost:3000/requests/GET"}, + exact_agents={"http.url": "http://localhost:3000/"}, + expected_agents=["nr.durations", "nr.ids"], + ) + @validate_compact_span_event( + name="External/localhost:3000/requests/GET", + # `nr.ids` can only hold 63 ids but duration reflects all compressed spans. + compressed_span_count=64, + expected_nr_durations_low_bound=6.5, + expected_nr_durations_high_bound=6.8, # 64 of these adds > .2 overhead. + ) + @background_task() + def _test(): + headers = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01"} + accept_distributed_trace_headers(headers) + asyncio.run(call_tests()) + + _test = override_application_settings( + { + "distributed_tracing.sampler.full_granularity.enabled": False, + "distributed_tracing.sampler.partial_granularity.enabled": True, + "distributed_tracing.sampler.partial_granularity.type": "compact", + "distributed_tracing.sampler.partial_granularity.remote_parent_sampled": "always_on", + "span_events.enabled": True, + } + )(_test) + + _test() + + +def test_partial_granularity_compressed_span_attributes_in_series(): + """ + Tests compressed span attributes when compressed span times are serial. + Aka: each span ends before the next compressed span begins. 
+ """ + + async def test(index): + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + time.sleep(0.1) + + @function_trace() + async def call_tests(): + tasks = [test(i) for i in range(3)] + await asyncio.gather(*tasks) + + @validate_span_events( + count=1, # Entry span. + exact_intrinsics={ + "name": "Function/test_distributed_tracing:test_partial_granularity_compressed_span_attributes_in_series.._test" + }, + expected_intrinsics=["duration", "timestamp"], + ) + @validate_span_events( + count=1, # 1 external compressed span. + exact_intrinsics={"name": "External/localhost:3000/requests/GET"}, + exact_agents={"http.url": "http://localhost:3000/"}, + expected_agents=["nr.durations", "nr.ids"], + ) + @validate_compact_span_event( + name="External/localhost:3000/requests/GET", + compressed_span_count=3, + expected_nr_durations_low_bound=0.3, + expected_nr_durations_high_bound=0.4, + ) + @background_task() + def _test(): + headers = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01"} + accept_distributed_trace_headers(headers) + asyncio.run(call_tests()) + + _test = override_application_settings( + { + "distributed_tracing.sampler.full_granularity.enabled": False, + "distributed_tracing.sampler.partial_granularity.enabled": True, + "distributed_tracing.sampler.partial_granularity.type": "compact", + "distributed_tracing.sampler.partial_granularity.remote_parent_sampled": "always_on", + "span_events.enabled": True, + } + )(_test) + + _test() + + +def test_partial_granularity_compressed_span_attributes_overlapping(): + """ + Tests compressed span attributes when compressed span times overlap. + Aka: the next span begins in the middle of the first span. + """ + + @validate_span_events( + count=1, # Entry span. 
+ exact_intrinsics={ + "name": "Function/test_distributed_tracing:test_partial_granularity_compressed_span_attributes_overlapping.._test" + }, + expected_intrinsics=["duration", "timestamp"], + ) + @validate_span_events( + count=1, # 1 external compressed span. + exact_intrinsics={"name": "External/localhost:3000/requests/GET"}, + exact_agents={"http.url": "http://localhost:3000/"}, + expected_agents=["nr.durations", "nr.ids"], + ) + @validate_compact_span_event( + name="External/localhost:3000/requests/GET", + compressed_span_count=2, + expected_nr_durations_low_bound=0.1, + expected_nr_durations_high_bound=0.2, + ) + @background_task() + def _test(): + headers = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01"} + accept_distributed_trace_headers(headers) + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace1: + # Override terminal_node so we can create a nested exit span. + trace1.terminal_node = lambda: False + trace2 = ExternalTrace("requests", "http://localhost:3000/", method="GET") + trace2.__enter__() + time.sleep(0.1) + trace2.__exit__(None, None, None) + + _test = override_application_settings( + { + "distributed_tracing.sampler.full_granularity.enabled": False, + "distributed_tracing.sampler.partial_granularity.enabled": True, + "distributed_tracing.sampler.partial_granularity.type": "compact", + "distributed_tracing.sampler.partial_granularity.remote_parent_sampled": "always_on", + "span_events.enabled": True, + } + )(_test) + + _test() + + +def test_partial_granularity_reduced_span_attributes(): + """ + In reduced mode, only inprocess spans are dropped. + """ + + @function_trace() + def foo(): + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + trace.add_custom_attribute("custom", "bar") + + @validate_span_events( + count=1, # Entry span. 
+ exact_intrinsics={ + "name": "Function/test_distributed_tracing:test_partial_granularity_reduced_span_attributes.._test" + }, + expected_intrinsics=["duration", "timestamp"], + expected_agents=["code.function", "code.lineno", "code.namespace"], + ) + @validate_span_events( + count=0, # Function foo span should not be present. + exact_intrinsics={ + "name": "Function/test_distributed_tracing:test_partial_granularity_reduced_span_attributes..foo" + }, + expected_intrinsics=["duration", "timestamp"], + ) + @validate_span_events( + count=2, # 2 external spans. + exact_intrinsics={"name": "External/localhost:3000/requests/GET"}, + exact_agents={"http.url": "http://localhost:3000/"}, + exact_users={"custom": "bar"}, + ) + @background_task() + def _test(): + headers = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01"} + accept_distributed_trace_headers(headers) + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + # Override terminal_node so we can create a nested exit span. + trace.terminal_node = lambda: False + trace.add_custom_attribute("custom", "bar") + foo() + + _test = override_application_settings( + { + "distributed_tracing.sampler.full_granularity.enabled": False, + "distributed_tracing.sampler.partial_granularity.enabled": True, + "distributed_tracing.sampler.partial_granularity.type": "reduced", + "distributed_tracing.sampler.partial_granularity.remote_parent_sampled": "always_on", + "span_events.enabled": True, + } + )(_test) + + _test() + + +def test_partial_granularity_essential_span_attributes(): + """ + In essential mode, inprocess spans are dropped and non-entity synthesis attributes. + """ + + @function_trace() + def foo(): + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + trace.add_custom_attribute("custom", "bar") + + @validate_span_events( + count=1, # Entry span. 
+ exact_intrinsics={ + "name": "Function/test_distributed_tracing:test_partial_granularity_essential_span_attributes.._test" + }, + expected_intrinsics=["duration", "timestamp"], + unexpected_agents=["code.function", "code.lineno", "code.namespace"], + ) + @validate_span_events( + count=0, # Function foo span should not be present. + exact_intrinsics={ + "name": "Function/test_distributed_tracing:test_partial_granularity_essential_span_attributes..foo" + }, + expected_intrinsics=["duration", "timestamp"], + ) + @validate_span_events( + count=2, # 2 external spans. + exact_intrinsics={"name": "External/localhost:3000/requests/GET"}, + exact_agents={"http.url": "http://localhost:3000/"}, + unexpected_users=["custom"], + ) + @background_task() + def _test(): + headers = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01"} + accept_distributed_trace_headers(headers) + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + # Override terminal_node so we can create a nested exit span. 
+ trace.terminal_node = lambda: False + trace.add_custom_attribute("custom", "bar") + foo() + + _test = override_application_settings( + { + "distributed_tracing.sampler.full_granularity.enabled": False, + "distributed_tracing.sampler.partial_granularity.enabled": True, + "distributed_tracing.sampler.partial_granularity.type": "essential", + "distributed_tracing.sampler.partial_granularity.remote_parent_sampled": "always_on", + "span_events.enabled": True, + } + )(_test) + + _test() diff --git a/tests/agent_features/test_event_loop_wait_time.py b/tests/agent_features/test_event_loop_wait_time.py index cad4679600..87bbde6a52 100644 --- a/tests/agent_features/test_event_loop_wait_time.py +++ b/tests/agent_features/test_event_loop_wait_time.py @@ -27,6 +27,15 @@ from newrelic.core.trace_cache import trace_cache +@pytest.fixture() +def event_loop(): + from asyncio import new_event_loop, set_event_loop + + loop = new_event_loop() + set_event_loop(loop) + return loop + + @background_task(name="block") async def block_loop(ready, done, blocking_transaction_active, times=1): for _ in range(times): @@ -64,8 +73,6 @@ async def wait_for_loop(ready, done, times=1): "blocking_transaction_active,event_loop_visibility_enabled", ((True, True), (False, True), (False, False)) ) def test_record_event_loop_wait(event_loop, blocking_transaction_active, event_loop_visibility_enabled): - # import asyncio - metric_count = 2 if event_loop_visibility_enabled else None execute_attributes = {"intrinsic": ("eventLoopTime",), "agent": (), "user": ()} wait_attributes = {"intrinsic": ("eventLoopWait",), "agent": (), "user": ()} @@ -143,8 +150,6 @@ def test_blocking_task_on_different_loop(): def test_record_event_loop_wait_on_different_task(event_loop): - # import asyncio - async def recorder(ready, wait): ready.set() await wait.wait() diff --git a/tests/agent_unittests/test_distributed_tracing_settings.py b/tests/agent_unittests/test_distributed_tracing_settings.py index a1c99da58d..3668cfbe32 
100644 --- a/tests/agent_unittests/test_distributed_tracing_settings.py +++ b/tests/agent_unittests/test_distributed_tracing_settings.py @@ -14,6 +14,8 @@ import pytest +from newrelic.core.config import finalize_application_settings + INI_FILE_EMPTY = b""" [newrelic] """ @@ -30,3 +32,35 @@ def test_distributed_trace_setings(ini, env, expected_format, global_settings): settings = global_settings() assert settings.distributed_tracing.exclude_newrelic_header == expected_format + + +@pytest.mark.parametrize( + "ini,env", + ( + ( + INI_FILE_EMPTY, + { + "NEW_RELIC_ENABLED": "true", + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_REMOTE_PARENT_SAMPLED": "default", + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_REMOTE_PARENT_NOT_SAMPLED": "default", + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_FULL_GRANULARITY_REMOTE_PARENT_SAMPLED": "always_on", + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_FULL_GRANULARITY_REMOTE_PARENT_NOT_SAMPLED": "always_off", + }, + ), + ( + INI_FILE_EMPTY, + { + "NEW_RELIC_ENABLED": "true", + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_REMOTE_PARENT_SAMPLED": "always_on", + "NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_REMOTE_PARENT_NOT_SAMPLED": "always_off", + }, + ), + ), +) +def test_full_granularity_precedence(ini, env, global_settings): + settings = global_settings() + + app_settings = finalize_application_settings(settings=settings) + + assert app_settings.distributed_tracing.sampler.full_granularity.remote_parent_sampled == "always_on" + assert app_settings.distributed_tracing.sampler.full_granularity.remote_parent_not_sampled == "always_off" diff --git a/tests/agent_unittests/test_harvest_loop.py b/tests/agent_unittests/test_harvest_loop.py index 9717e956ba..9ca2094268 100644 --- a/tests/agent_unittests/test_harvest_loop.py +++ b/tests/agent_unittests/test_harvest_loop.py @@ -166,6 +166,7 @@ def transaction_node(request): root_span_guid=None, trace_id="4485b89db608aece", loop_time=0.0, + partial_granularity_sampled=False, ) return node @@ -321,14 +322,14 @@ def 
test_serverless_application_harvest(): @pytest.mark.parametrize( - "distributed_tracing_enabled,span_events_enabled,spans_created", - [(True, True, 1), (True, True, 15), (True, False, 1), (True, True, 0), (True, False, 0), (False, True, 0)], + "distributed_tracing_enabled,full_granularity_enabled,partial_granularity_enabled,span_events_enabled,spans_created", + [(True, True, False, True, 1), (True, True, True, True, 1), (True, True, False, True, 15), (True, True, False, False, 1), (True, True, False, True, 0), (True, True, False, False, 0), (False, True, False, True, 0)], ) -def test_application_harvest_with_spans(distributed_tracing_enabled, span_events_enabled, spans_created): +def test_application_harvest_with_spans(distributed_tracing_enabled, full_granularity_enabled, partial_granularity_enabled, span_events_enabled, spans_created): span_endpoints_called = [] max_samples_stored = 10 - if distributed_tracing_enabled and span_events_enabled: + if distributed_tracing_enabled and span_events_enabled and (full_granularity_enabled or partial_granularity_enabled): seen = spans_created sent = min(spans_created, max_samples_stored) else: @@ -340,6 +341,10 @@ def test_application_harvest_with_spans(distributed_tracing_enabled, span_events spans_required_metrics.extend( [("Supportability/SpanEvent/TotalEventsSeen", seen), ("Supportability/SpanEvent/TotalEventsSent", sent)] ) + if partial_granularity_enabled: + spans_required_metrics.extend( + [("Supportability/Python/PartialGranularity/essential", 1)] + ) @validate_metric_payload(metrics=spans_required_metrics, endpoints_called=span_endpoints_called) @override_generic_settings( @@ -348,6 +353,8 @@ def test_application_harvest_with_spans(distributed_tracing_enabled, span_events "developer_mode": True, "license_key": "**NOT A LICENSE KEY**", "distributed_tracing.enabled": distributed_tracing_enabled, + "distributed_tracing.sampler.full_granularity.enabled": full_granularity_enabled, + 
"distributed_tracing.sampler.partial_granularity.enabled": partial_granularity_enabled, "span_events.enabled": span_events_enabled, # Uses the name from post-translation as this is modifying the settings object, not a config file "event_harvest_config.harvest_limits.span_event_data": max_samples_stored, @@ -366,12 +373,12 @@ def _test(): # Verify that the metric_data endpoint is the 2nd to last and # span_event_data is the 3rd to last endpoint called - assert span_endpoints_called[-2] == "metric_data" + assert span_endpoints_called[-2] == "metric_data", span_endpoints_called if span_events_enabled and spans_created > 0: - assert span_endpoints_called[-3] == "span_event_data" + assert span_endpoints_called[-3] == "span_event_data", span_endpoints_called else: - assert span_endpoints_called[-3] != "span_event_data" + assert span_endpoints_called[-3] != "span_event_data", span_endpoints_called _test() diff --git a/tests/agent_unittests/test_infinite_trace_settings.py b/tests/agent_unittests/test_infinite_trace_settings.py index 4b47a72398..31c8e6819e 100644 --- a/tests/agent_unittests/test_infinite_trace_settings.py +++ b/tests/agent_unittests/test_infinite_trace_settings.py @@ -14,6 +14,8 @@ import pytest +from newrelic.core.config import finalize_application_settings + INI_FILE_EMPTY = b""" [newrelic] """ @@ -77,3 +79,18 @@ def test_infinite_tracing_port(ini, env, expected_port, global_settings): def test_infinite_tracing_span_queue_size(ini, env, expected_size, global_settings): settings = global_settings() assert settings.infinite_tracing.span_queue_size == expected_size + + +@pytest.mark.parametrize( + "ini,env", + ((INI_FILE_INFINITE_TRACING, {"NEW_RELIC_DISTRIBUTED_TRACING_SAMPLER_PARTIAL_GRANULARITY_ENABLED": "true"}),), +) +def test_partial_granularity_disabled_when_infinite_tracing_enabled(ini, env, global_settings): + settings = global_settings() + assert settings.distributed_tracing.sampler.partial_granularity.enabled + assert 
settings.infinite_tracing.enabled + + app_settings = finalize_application_settings(settings=settings) + + assert not app_settings.distributed_tracing.sampler.partial_granularity.enabled + assert app_settings.infinite_tracing.enabled diff --git a/tests/mlmodel_openai/test_chat_completion_v1.py b/tests/mlmodel_openai/test_chat_completion_v1.py index 817db35d8e..969e4233bf 100644 --- a/tests/mlmodel_openai/test_chat_completion_v1.py +++ b/tests/mlmodel_openai/test_chat_completion_v1.py @@ -11,9 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import openai -from testing_support.fixtures import override_llm_token_callback_settings, reset_core_stats_engine, validate_attributes +import pytest +from testing_support.fixtures import ( + override_application_settings, + override_llm_token_callback_settings, + reset_core_stats_engine, + validate_attributes, +) from testing_support.ml_testing_utils import ( add_token_count_to_events, disabled_ai_monitoring_record_content_settings, @@ -31,7 +36,7 @@ from newrelic.api.background_task import background_task from newrelic.api.llm_custom_attributes import WithLlmCustomAttributes -from newrelic.api.transaction import add_custom_attribute +from newrelic.api.transaction import accept_distributed_trace_headers, add_custom_attribute _test_openai_chat_completion_messages = ( {"role": "system", "content": "You are a scientist."}, @@ -387,6 +392,46 @@ def test_openai_chat_completion_async_with_llm_metadata_no_content(loop, set_tra ) +@pytest.mark.parametrize("partial_granularity_type", ("reduced", "essential", "compact")) +def test_openai_chat_completion_async_in_txn_with_token_count_partial_granularity_dt( + partial_granularity_type, set_trace_info, loop, async_openai_client +): + @reset_core_stats_engine() + @disabled_ai_monitoring_record_content_settings + 
@validate_custom_events(events_sans_content(chat_completion_recorded_events)) + @validate_custom_event_count(count=4) + @validate_transaction_metrics( + "test_chat_completion_v1:test_openai_chat_completion_async_in_txn_with_token_count_partial_granularity_dt.<locals>._test", + scoped_metrics=[("Llm/completion/OpenAI/create", 1)], + rollup_metrics=[("Llm/completion/OpenAI/create", 1)], + custom_metrics=[(f"Supportability/Python/ML/OpenAI/{openai.__version__}", 1)], + background_task=True, + ) + @validate_attributes("agent", ["llm"]) + @override_application_settings( + { + "distributed_tracing.sampler.full_granularity.enabled": False, + "distributed_tracing.sampler.partial_granularity.enabled": True, + "distributed_tracing.sampler.partial_granularity.type": partial_granularity_type, + "distributed_tracing.sampler.partial_granularity.remote_parent_sampled": "always_on", + "span_events.enabled": True, + } + ) + @background_task() + def _test(): + add_custom_attribute("llm.conversation_id", "my-awesome-id") + add_custom_attribute("llm.foo", "bar") + accept_distributed_trace_headers({"traceparent": "00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01"}) + set_trace_info() + loop.run_until_complete( + async_openai_client.chat.completions.create( + model="gpt-3.5-turbo", messages=_test_openai_chat_completion_messages, temperature=0.7, max_tokens=100 + ) + ) + + _test() + + @reset_core_stats_engine() @override_llm_token_callback_settings(llm_token_count_callback) @validate_custom_events(add_token_count_to_events(chat_completion_recorded_events))