1515import collections
1616import logging
1717import threading
18+ import sys
1819
1920try :
2021 from newrelic .core .infinite_tracing_pb2 import AttributeValue , SpanBatch
2526_logger = logging .getLogger (__name__ )
2627
2728
29+ def get_deep_size (obj , seen = None ):
30+ """Recursively calculates the size of an object including nested lists and dicts."""
31+ if seen is None :
32+ seen = set ()
33+ size = - 8 * 3 # Subtract 8 for each of the 3 attribute lists as those don't count.
34+ else :
35+ size = 0
36+
37+ # Avoid recursion for already seen objects (handle circular references)
38+ obj_id = id (obj )
39+ if obj_id in seen :
40+ return 0
41+ seen .add (obj_id )
42+
43+ if isinstance (obj , str ):
44+ size += len (obj )
45+ return size
46+ elif isinstance (obj , float ) or isinstance (obj , int ):
47+ size += 8
48+ return size
49+ elif isinstance (obj , bool ):
50+ size += 1
51+ return size
52+ elif isinstance (obj , dict ):
53+ size += sum (get_deep_size (k , seen ) + get_deep_size (v , seen ) for k , v in obj .items ())
54+ elif isinstance (obj , (list , tuple , set , frozenset )):
55+ size += 8 + sum (get_deep_size (i , seen ) for i in obj )
56+ else :
57+ size += 8
58+
59+ return size
60+
61+
62+ def get_deep_size_protobuf (obj ):
63+ """Recursively calculates the size of an object including nested lists and dicts."""
64+ size = 0
65+ if hasattr (obj , "string_value" ):
66+ size += len (obj .string_value )
67+ return size
68+ elif hasattr (obj , "double_value" ):
69+ size += 8
70+ return size
71+ elif hasattr (obj , "int_value" ):
72+ size += 8
73+ return size
74+ elif hasattr (obj , "bool_value" ):
75+ size += 1
76+ return size
77+
78+ if hasattr (obj , "agent_attributes" ):
79+ size += sum (len (k ) + get_deep_size_protobuf (v ) for k , v in obj .agent_attributes .items ())
80+ if hasattr (obj , "user_attributes" ):
81+ size += sum (len (k ) + get_deep_size_protobuf (v ) for k , v in obj .user_attributes .items ())
82+ if hasattr (obj , "intrinsics" ):
83+ size += sum (len (k ) + get_deep_size_protobuf (v ) for k , v in obj .intrinsics .items ())
84+ else :
85+ size += 8
86+
87+ return size
88+
89+
2890class StreamBuffer :
2991 def __init__ (self , maxlen , batching = False ):
3092 self ._queue = collections .deque (maxlen = maxlen )
3193 self ._notify = self .condition ()
3294 self ._shutdown = False
3395 self ._seen = 0
3496 self ._dropped = 0
97+ self ._bytes = 0
98+ self ._ct_processing_time = 0
3599 self ._settings = None
36100
37101 self .batching = batching
@@ -51,6 +115,8 @@ def put(self, item):
51115 return
52116
53117 self ._seen += 1
118+ _logger .debug (f"{ item .intrinsics ['name' ]} [{ len (item .intrinsics )} , { len (item .user_attributes )} , { len (item .agent_attributes )} ] { get_deep_size_protobuf (item )} " )
119+ self ._bytes += get_deep_size_protobuf (item )
54120
55121 # NOTE: dropped can be over-counted as the queue approaches
56122 # capacity while data is still being transmitted.
@@ -67,8 +133,10 @@ def stats(self):
67133 with self ._notify :
68134 seen , dropped = self ._seen , self ._dropped
69135 self ._seen , self ._dropped = 0 , 0
136+ _bytes , ct_processing_time = self ._bytes , self ._ct_processing_time
137+ self ._bytes , self ._ct_processing_time = 0 , 0
70138
71- return seen , dropped
139+ return seen , dropped , _bytes , ct_processing_time
72140
73141 def __bool__ (self ):
74142 return bool (self ._queue )
0 commit comments