Skip to content

Core tracing prototype options #1419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion newrelic/api/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,13 @@ def __exit__(self, exc, value, tb):
def sampled(self):
return self._sampled

@property
def ct_sampled(self):
    """Whether core tracing should sample this transaction.

    When distributed tracing declines to sample, core tracing picks the
    transaction up instead — but only if one of the span-reduction modes
    (drop_inprocess_spans or unique_spans) is enabled.
    """
    if self.sampled:
        # Already sampled by DT; CT does not need to sample it again.
        return False
    dt_settings = self._settings.distributed_tracing
    return bool(dt_settings.drop_inprocess_spans.enabled or dt_settings.unique_spans.enabled)

@property
def priority(self):
return self._priority
Expand Down Expand Up @@ -1082,12 +1089,13 @@ def _create_distributed_trace_data(self):
return

self._compute_sampled_and_priority()
sampled = self.sampled or self.ct_sampled
data = {
"ty": "App",
"ac": account_id,
"ap": application_id,
"tr": self.trace_id,
"sa": self.sampled,
"sa": sampled,
"pr": self.priority,
"tx": self.guid,
"ti": int(time.time() * 1000.0),
Expand Down
70 changes: 69 additions & 1 deletion newrelic/common/streaming_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import collections
import logging
import threading
import sys

try:
from newrelic.core.infinite_tracing_pb2 import AttributeValue, SpanBatch
Expand All @@ -25,13 +26,76 @@
_logger = logging.getLogger(__name__)


def get_deep_size(obj, seen=None):
    """Recursively estimate the size (in bytes) of an object, including
    nested lists, dicts, sets, and tuples.

    The top-level call starts at -24 (8 bytes subtracted for each of the
    3 span attribute lists, which do not count toward the span's size).

    Parameters:
        obj: object to measure.
        seen: internal set of container ids already visited, used to
            guard against circular references.  Callers should not pass it.

    Returns:
        int: estimated size.  Strings count their length, booleans 1 byte,
        other scalars and unknown objects 8 bytes; containers add the sum
        of their elements (plus 8 bytes of overhead for sequences/sets).
    """
    if seen is None:
        seen = set()
        # Subtract 8 for each of the 3 attribute lists as those don't count.
        size = -8 * 3
    else:
        size = 0

    # Scalars are sized directly.  bool must be checked before int:
    # bool is a subclass of int, so an int check first would report
    # 8 bytes for booleans instead of 1.
    if isinstance(obj, str):
        return size + len(obj)
    if isinstance(obj, bool):
        return size + 1
    if isinstance(obj, (int, float)):
        return size + 8

    # Track only container ids for circular-reference detection.  Scalars
    # are deliberately excluded: CPython interns small ints and some
    # strings, so an id-based check would undercount repeated values.
    obj_id = id(obj)
    if obj_id in seen:
        return size
    seen.add(obj_id)

    if isinstance(obj, dict):
        size += sum(get_deep_size(k, seen) + get_deep_size(v, seen) for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += 8 + sum(get_deep_size(i, seen) for i in obj)
    else:
        # Unknown object type: count a flat 8 bytes.
        size += 8

    return size


def get_deep_size_protobuf(obj):
    """Recursively calculates the size of a protobuf object (Span or
    AttributeValue) including its nested attribute maps.

    Strings count their length, bools 1 byte, numeric values 8 bytes;
    Span-like objects add ``len(key) + size(value)`` for every entry in
    their agent/user/intrinsic attribute maps.
    """
    # NOTE(review): protobuf message objects expose every declared field
    # as an attribute, so hasattr() is True even when the field is unset.
    # If AttributeValue declares string_value/double_value/int_value/
    # bool_value together (e.g. in a oneof), this always takes the
    # string_value branch and returns len("") == 0 for non-string values.
    # Consider HasField()/WhichOneof() instead — confirm against
    # newrelic/core/infinite_tracing_pb2.
    size = 0
    if hasattr(obj, "string_value"):
        size += len(obj.string_value)
        return size
    elif hasattr(obj, "double_value"):
        size += 8
        return size
    elif hasattr(obj, "int_value"):
        size += 8
        return size
    elif hasattr(obj, "bool_value"):
        size += 1
        return size

    if hasattr(obj, "agent_attributes"):
        size += sum(len(k) + get_deep_size_protobuf(v) for k, v in obj.agent_attributes.items())
    if hasattr(obj, "user_attributes"):
        size += sum(len(k) + get_deep_size_protobuf(v) for k, v in obj.user_attributes.items())
    # NOTE(review): this else pairs only with the intrinsics check, so any
    # object lacking intrinsics gets +8 even when agent/user attributes
    # were counted above — verify that is intended.
    if hasattr(obj, "intrinsics"):
        size += sum(len(k) + get_deep_size_protobuf(v) for k, v in obj.intrinsics.items())
    else:
        size += 8

    return size


class StreamBuffer:
def __init__(self, maxlen, batching=False):
self._queue = collections.deque(maxlen=maxlen)
self._notify = self.condition()
self._shutdown = False
self._seen = 0
self._dropped = 0
self._bytes = 0
self._ct_processing_time = 0
self._settings = None

self.batching = batching
Expand All @@ -50,7 +114,9 @@
if self._shutdown:
return

self._seen += 1

Check failure on line 117 in newrelic/common/streaming_utils.py

View workflow job for this annotation

GitHub Actions / MegaLinter

Ruff (G004)

newrelic/common/streaming_utils.py:117:27: G004 Logging statement uses f-string
_logger.debug(f"{item.intrinsics['name']} [{len(item.intrinsics)}, {len(item.user_attributes)}, {len(item.agent_attributes)}] {get_deep_size_protobuf(item)}")
self._bytes += get_deep_size_protobuf(item)

# NOTE: dropped can be over-counted as the queue approaches
# capacity while data is still being transmitted.
Expand All @@ -67,8 +133,10 @@
with self._notify:
seen, dropped = self._seen, self._dropped
self._seen, self._dropped = 0, 0
_bytes, ct_processing_time = self._bytes, self._ct_processing_time
self._bytes, self._ct_processing_time = 0, 0

return seen, dropped
return seen, dropped, _bytes, ct_processing_time

def __bool__(self):
    # Truthy while the buffer still holds queued items.
    return len(self._queue) != 0
Expand Down
3 changes: 3 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ def _process_configuration(section):
_process_setting(section, "custom_insights_events.max_attribute_value", "getint", None)
_process_setting(section, "ml_insights_events.enabled", "getboolean", None)
_process_setting(section, "distributed_tracing.enabled", "getboolean", None)
_process_setting(section, "distributed_tracing.drop_inprocess_spans.enabled", "getboolean", None)
_process_setting(section, "distributed_tracing.unique_spans.enabled", "getboolean", None)
_process_setting(section, "distributed_tracing.minimize_attributes.enabled", "getboolean", None)
_process_setting(section, "distributed_tracing.exclude_newrelic_header", "getboolean", None)
_process_setting(section, "span_events.enabled", "getboolean", None)
_process_setting(section, "span_events.max_samples_stored", "getint", None)
Expand Down
14 changes: 12 additions & 2 deletions newrelic/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,11 @@ def connect_to_data_collector(self, activate_agent):
sampling_target_period = 60.0
else:
sampling_target_period = configuration.sampling_target_period_in_seconds
self.adaptive_sampler = AdaptiveSampler(configuration.sampling_target, sampling_target_period)
sampling_target = configuration.sampling_target
# If span reduction is enabled double the transaction reservoir size.
if configuration.distributed_tracing.drop_inprocess_spans.enabled or configuration.distributed_tracing.unique_spans.enabled:
sampling_target = configuration.sampling_target*2
self.adaptive_sampler = AdaptiveSampler(sampling_target, sampling_target_period)

active_session.connect_span_stream(self._stats_engine.span_stream, self.record_custom_metric)

Expand Down Expand Up @@ -1367,11 +1371,14 @@ def harvest(self, shutdown=False, flexible=False):
span_stream = stats.span_stream
# Only merge stats as part of default harvest
if span_stream is not None and not flexible:
spans_seen, spans_dropped = span_stream.stats()
spans_seen, spans_dropped, _bytes, ct_processing_time = span_stream.stats()
spans_sent = spans_seen - spans_dropped

internal_count_metric("Supportability/InfiniteTracing/Span/Seen", spans_seen)
internal_count_metric("Supportability/InfiniteTracing/Span/Sent", spans_sent)
print(f"spans sent: {spans_sent}")
internal_count_metric("Supportability/InfiniteTracing/Bytes/Seen", _bytes)
internal_count_metric("Supportability/CoreTracing/TotalTime", ct_processing_time*1000) # Time in ms.
else:
spans = stats.span_events
if spans:
Expand All @@ -1388,6 +1395,9 @@ def harvest(self, shutdown=False, flexible=False):
spans_sampled = spans.num_samples
internal_count_metric("Supportability/SpanEvent/TotalEventsSeen", spans_seen)
internal_count_metric("Supportability/SpanEvent/TotalEventsSent", spans_sampled)
print(f"spans sent: {spans_sampled}")
internal_count_metric("Supportability/DistributedTracing/Bytes/Seen", spans.bytes)
internal_count_metric("Supportability/SpanEvent/TotalCoreTracingTime", spans.ct_processing_time*1000) # Time in ms.

stats.reset_span_events()

Expand Down
17 changes: 17 additions & 0 deletions newrelic/core/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,23 @@
"server.address",
}

# Span attribute names used by the New Relic backend to establish entity
# relationships (e.g. service maps / cross-entity linking).
# NOTE(review): presumably these are preserved when
# distributed_tracing.minimize_attributes.enabled drops other span
# attributes — confirm against the attribute-minimization code path.
SPAN_ENTITY_RELATIONSHIP_ATTRIBUTES = {
    "cloud.account.id",
    "cloud.platform",
    "cloud.region",
    "cloud.resource_id",
    "db.instance",
    "db.system",
    "http.url",
    "messaging.destination.name",
    "messaging.system",
    "peer.hostname",
    "server.address",
    "server.port",
    "span.kind",
}


MAX_NUM_USER_ATTRIBUTES = 128
MAX_ATTRIBUTE_LENGTH = 255
MAX_NUM_ML_USER_ATTRIBUTES = 64
Expand Down
18 changes: 18 additions & 0 deletions newrelic/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,18 @@ class DistributedTracingSettings(Settings):
pass


class DistributedTracingDropInprocessSpansSettings(Settings):
    """Nested settings namespace for ``distributed_tracing.drop_inprocess_spans``."""
    pass


class DistributedTracingUniqueSpansSettings(Settings):
    """Nested settings namespace for ``distributed_tracing.unique_spans``."""
    pass


class DistributedTracingMinimizeAttributesSettings(Settings):
    """Nested settings namespace for ``distributed_tracing.minimize_attributes``."""
    pass


class ServerlessModeSettings(Settings):
pass

Expand Down Expand Up @@ -493,6 +505,9 @@ class EventHarvestConfigHarvestLimitSettings(Settings):
_settings.datastore_tracer.instance_reporting = DatastoreTracerInstanceReportingSettings()
_settings.debug = DebugSettings()
_settings.distributed_tracing = DistributedTracingSettings()
_settings.distributed_tracing.drop_inprocess_spans = DistributedTracingDropInprocessSpansSettings()
_settings.distributed_tracing.unique_spans = DistributedTracingUniqueSpansSettings()
_settings.distributed_tracing.minimize_attributes = DistributedTracingMinimizeAttributesSettings()
_settings.error_collector = ErrorCollectorSettings()
_settings.error_collector.attributes = ErrorCollectorAttributesSettings()
_settings.event_harvest_config = EventHarvestConfigSettings()
Expand Down Expand Up @@ -814,6 +829,9 @@ def default_otlp_host(host):
_settings.ml_insights_events.enabled = False

_settings.distributed_tracing.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_ENABLED", default=True)
_settings.distributed_tracing.drop_inprocess_spans.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_DROP_INPROCESS_SPANS_ENABLED", default=False)
_settings.distributed_tracing.unique_spans.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_UNIQUE_SPANS_ENABLED", default=False)
_settings.distributed_tracing.minimize_attributes.enabled = _environ_as_bool("NEW_RELIC_DISTRIBUTED_TRACING_MINIMIZE_ATTRIBUTES_ENABLED", default=False)
_settings.distributed_tracing.exclude_newrelic_header = False
_settings.span_events.enabled = _environ_as_bool("NEW_RELIC_SPAN_EVENTS_ENABLED", default=True)
_settings.span_events.attributes.enabled = True
Expand Down
7 changes: 3 additions & 4 deletions newrelic/core/external_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,15 @@
start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=None
)

def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, *args, **kwargs):
    """Create the span event for this external (outbound HTTP) node.

    Records the request URL as an agent attribute, seeds the intrinsic
    attributes with the HTTP category/kind/component (and method, when
    present), then delegates to the base implementation.
    """
    self.agent_attributes["http.url"] = self.http_url

    # Copy so the caller's base_attrs mapping is never mutated.
    i_attrs = base_attrs.copy() if base_attrs else attr_class()
    i_attrs["category"] = "http"
    i_attrs["span.kind"] = "client"
    _, i_attrs["component"] = attribute.process_user_attribute("component", self.library)

    if self.method:
        _, i_attrs["http.method"] = attribute.process_user_attribute("http.method", self.method)

    # Positional *args precede keyword arguments (fixes Ruff B026).
    return super().span_event(settings, *args, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class, **kwargs)

Check failure on line 183 in newrelic/core/external_node.py

View workflow job for this annotation

GitHub Actions / MegaLinter

Ruff (B026)

newrelic/core/external_node.py:183:113: B026 Star-arg unpacking after a keyword argument is strongly discouraged
8 changes: 3 additions & 5 deletions newrelic/core/function_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,8 @@
start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=self.label
)

def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, *args, **kwargs):
    """Create the span event for this function node.

    Seeds the intrinsic attributes with the node's metric name
    ("<group>/<name>") and delegates to the base implementation.
    """
    # Copy so the caller's base_attrs mapping is never mutated.
    i_attrs = base_attrs.copy() if base_attrs else attr_class()
    i_attrs["name"] = f"{self.group}/{self.name}"

    # Positional *args precede keyword arguments (fixes Ruff B026).
    return super().span_event(settings, *args, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class, **kwargs)

Check failure on line 121 in newrelic/core/function_node.py

View workflow job for this annotation

GitHub Actions / MegaLinter

Ruff (B026)

newrelic/core/function_node.py:121:113: B026 Star-arg unpacking after a keyword argument is strongly discouraged
8 changes: 3 additions & 5 deletions newrelic/core/loop_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,8 @@
start_time=start_time, end_time=end_time, name=name, params=params, children=children, label=None
)

def span_event(self, settings, base_attrs=None, parent_guid=None, attr_class=dict, *args, **kwargs):
    """Create the span event for this event-loop wait node.

    Seeds the intrinsic attributes with the "EventLoop/Wait/<name>" span
    name and delegates to the base implementation.
    """
    # Copy so the caller's base_attrs mapping is never mutated.
    i_attrs = base_attrs.copy() if base_attrs else attr_class()
    i_attrs["name"] = f"EventLoop/Wait/{self.name}"

    # Positional *args precede keyword arguments (fixes Ruff B026).
    return super().span_event(settings, *args, base_attrs=i_attrs, parent_guid=parent_guid, attr_class=attr_class, **kwargs)

Check failure on line 86 in newrelic/core/loop_node.py

View workflow job for this annotation

GitHub Actions / MegaLinter

Ruff (B026)

newrelic/core/loop_node.py:86:113: B026 Star-arg unpacking after a keyword argument is strongly discouraged
Loading
Loading