1313# limitations under the License.
1414import sys
1515
16+ from kafka .serializer import Serializer
17+
1618from newrelic .api .application import application_instance
19+ from newrelic .api .function_trace import FunctionTraceWrapper
1720from newrelic .api .message_trace import MessageTrace
1821from newrelic .api .message_transaction import MessageTransaction
19- from newrelic .api .time_trace import notice_error
22+ from newrelic .api .time_trace import current_trace , notice_error
2023from newrelic .api .transaction import current_transaction
21- from newrelic .common .object_wrapper import wrap_function_wrapper
24+ from newrelic .common .object_wrapper import (
25+ ObjectProxy ,
26+ function_wrapper ,
27+ wrap_function_wrapper ,
28+ )
2229
2330HEARTBEAT_POLL = "MessageBroker/Kafka/Heartbeat/Poll"
2431HEARTBEAT_SENT = "MessageBroker/Kafka/Heartbeat/Sent"
@@ -47,6 +54,7 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
4754 destination_type = "Topic" ,
4855 destination_name = topic or "Default" ,
4956 source = wrapped ,
57+ terminal = False ,
5058 ) as trace :
5159 dt_headers = [(k , v .encode ("utf-8" )) for k , v in trace .generate_request_headers (transaction )]
5260 headers .extend (dt_headers )
@@ -57,49 +65,6 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
5765 raise
5866
5967
60- def metric_wrapper (metric_name , check_result = False ):
61- def _metric_wrapper (wrapped , instance , args , kwargs ):
62- result = wrapped (* args , ** kwargs )
63-
64- application = application_instance (activate = False )
65- if application :
66- if not check_result or check_result and result :
67- # If the result does not need validated, send metric.
68- # If the result does need validated, ensure it is True.
69- application .record_custom_metric (metric_name , 1 )
70-
71- return result
72-
73- return _metric_wrapper
74-
75-
76- def instrument_kafka_heartbeat (module ):
77- if hasattr (module , "Heartbeat" ):
78- if hasattr (module .Heartbeat , "poll" ):
79- wrap_function_wrapper (module , "Heartbeat.poll" , metric_wrapper (HEARTBEAT_POLL ))
80-
81- if hasattr (module .Heartbeat , "fail_heartbeat" ):
82- wrap_function_wrapper (module , "Heartbeat.fail_heartbeat" , metric_wrapper (HEARTBEAT_FAIL ))
83-
84- if hasattr (module .Heartbeat , "sent_heartbeat" ):
85- wrap_function_wrapper (module , "Heartbeat.sent_heartbeat" , metric_wrapper (HEARTBEAT_SENT ))
86-
87- if hasattr (module .Heartbeat , "received_heartbeat" ):
88- wrap_function_wrapper (module , "Heartbeat.received_heartbeat" , metric_wrapper (HEARTBEAT_RECEIVE ))
89-
90- if hasattr (module .Heartbeat , "session_timeout_expired" ):
91- wrap_function_wrapper (
92- module ,
93- "Heartbeat.session_timeout_expired" ,
94- metric_wrapper (HEARTBEAT_SESSION_TIMEOUT , check_result = True ),
95- )
96-
97- if hasattr (module .Heartbeat , "poll_timeout_expired" ):
98- wrap_function_wrapper (
99- module , "Heartbeat.poll_timeout_expired" , metric_wrapper (HEARTBEAT_POLL_TIMEOUT , check_result = True )
100- )
101-
102-
10368def wrap_kafkaconsumer_next (wrapped , instance , args , kwargs ):
10469 if hasattr (instance , "_nr_transaction" ) and not instance ._nr_transaction .stopped :
10570 instance ._nr_transaction .__exit__ (* sys .exc_info ())
@@ -110,7 +75,12 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
11075 # StopIteration is an expected error, indicating the end of an iterable,
11176 # that should not be captured.
11277 if not isinstance (e , StopIteration ):
113- notice_error ()
78+ if current_transaction ():
79+ # Report error on existing transaction if there is one
80+ notice_error ()
81+ else :
82+ # Report error on application
83+ notice_error (application = application_instance (activate = False ))
11484 raise
11585
11686 if record :
@@ -177,11 +147,126 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
177147 return record
178148
179149
150+ def wrap_KafkaProducer_init (wrapped , instance , args , kwargs ):
151+ get_config_key = lambda key : kwargs .get (key , instance .DEFAULT_CONFIG [key ]) # noqa: E731
152+
153+ kwargs ["key_serializer" ] = wrap_serializer (
154+ instance , "Serialization/Key" , "MessageBroker" , get_config_key ("key_serializer" )
155+ )
156+ kwargs ["value_serializer" ] = wrap_serializer (
157+ instance , "Serialization/Value" , "MessageBroker" , get_config_key ("value_serializer" )
158+ )
159+
160+ return wrapped (* args , ** kwargs )
161+
162+
163+ class NewRelicSerializerWrapper (ObjectProxy ):
164+ def __init__ (self , wrapped , serializer_name , group_prefix ):
165+ ObjectProxy .__init__ .__get__ (self )(wrapped )
166+
167+ self ._nr_serializer_name = serializer_name
168+ self ._nr_group_prefix = group_prefix
169+
170+ def serialize (self , topic , object ):
171+ wrapped = self .__wrapped__ .serialize
172+ args = (topic , object )
173+ kwargs = {}
174+
175+ if not current_transaction ():
176+ return wrapped (* args , ** kwargs )
177+
178+ group = "%s/Kafka/Topic" % self ._nr_group_prefix
179+ name = "Named/%s/%s" % (topic , self ._nr_serializer_name )
180+
181+ return FunctionTraceWrapper (wrapped , name = name , group = group )(* args , ** kwargs )
182+
183+
184+ def wrap_serializer (client , serializer_name , group_prefix , serializer ):
185+ @function_wrapper
186+ def _wrap_serializer (wrapped , instance , args , kwargs ):
187+ transaction = current_transaction ()
188+ if not transaction :
189+ return wrapped (* args , ** kwargs )
190+
191+ topic = "Unknown"
192+ if isinstance (transaction , MessageTransaction ):
193+ topic = transaction .destination_name
194+ else :
195+ # Find parent message trace to retrieve topic
196+ message_trace = current_trace ()
197+ while message_trace is not None and not isinstance (message_trace , MessageTrace ):
198+ message_trace = message_trace .parent
199+ if message_trace :
200+ topic = message_trace .destination_name
201+
202+ group = "%s/Kafka/Topic" % group_prefix
203+ name = "Named/%s/%s" % (topic , serializer_name )
204+
205+ return FunctionTraceWrapper (wrapped , name = name , group = group )(* args , ** kwargs )
206+
207+ try :
208+ # Apply wrapper to serializer
209+ if serializer is None :
210+ # Do nothing
211+ return serializer
212+ elif isinstance (serializer , Serializer ):
213+ return NewRelicSerializerWrapper (serializer , group_prefix = group_prefix , serializer_name = serializer_name )
214+ else :
215+ # Wrap callable in wrapper
216+ return _wrap_serializer (serializer )
217+ except Exception :
218+ return serializer # Avoid crashes from immutable serializers
219+
220+
221+ def metric_wrapper (metric_name , check_result = False ):
222+ def _metric_wrapper (wrapped , instance , args , kwargs ):
223+ result = wrapped (* args , ** kwargs )
224+
225+ application = application_instance (activate = False )
226+ if application :
227+ if not check_result or check_result and result :
228+ # If the result does not need validated, send metric.
229+ # If the result does need validated, ensure it is True.
230+ application .record_custom_metric (metric_name , 1 )
231+
232+ return result
233+
234+ return _metric_wrapper
235+
236+
180237def instrument_kafka_producer (module ):
181238 if hasattr (module , "KafkaProducer" ):
239+ wrap_function_wrapper (module , "KafkaProducer.__init__" , wrap_KafkaProducer_init )
182240 wrap_function_wrapper (module , "KafkaProducer.send" , wrap_KafkaProducer_send )
183241
184242
185243def instrument_kafka_consumer_group (module ):
186244 if hasattr (module , "KafkaConsumer" ):
187- wrap_function_wrapper (module .KafkaConsumer , "__next__" , wrap_kafkaconsumer_next )
245+ wrap_function_wrapper (module , "KafkaConsumer.__next__" , wrap_kafkaconsumer_next )
246+
247+
248+ def instrument_kafka_heartbeat (module ):
249+ if hasattr (module , "Heartbeat" ):
250+ if hasattr (module .Heartbeat , "poll" ):
251+ wrap_function_wrapper (module , "Heartbeat.poll" , metric_wrapper (HEARTBEAT_POLL ))
252+
253+ if hasattr (module .Heartbeat , "fail_heartbeat" ):
254+ wrap_function_wrapper (module , "Heartbeat.fail_heartbeat" , metric_wrapper (HEARTBEAT_FAIL ))
255+
256+ if hasattr (module .Heartbeat , "sent_heartbeat" ):
257+ wrap_function_wrapper (module , "Heartbeat.sent_heartbeat" , metric_wrapper (HEARTBEAT_SENT ))
258+
259+ if hasattr (module .Heartbeat , "received_heartbeat" ):
260+ wrap_function_wrapper (module , "Heartbeat.received_heartbeat" , metric_wrapper (HEARTBEAT_RECEIVE ))
261+
262+ if hasattr (module .Heartbeat , "session_timeout_expired" ):
263+ wrap_function_wrapper (
264+ module ,
265+ "Heartbeat.session_timeout_expired" ,
266+ metric_wrapper (HEARTBEAT_SESSION_TIMEOUT , check_result = True ),
267+ )
268+
269+ if hasattr (module .Heartbeat , "poll_timeout_expired" ):
270+ wrap_function_wrapper (
271+ module , "Heartbeat.poll_timeout_expired" , metric_wrapper (HEARTBEAT_POLL_TIMEOUT , check_result = True )
272+ )
0 commit comments