3636from elasticapm .conf import constants
3737from elasticapm .instrumentation .packages .base import AbstractInstrumentedModule
3838from elasticapm .traces import DroppedSpan , capture_span , execution_context
39- from elasticapm .utils .disttracing import TraceParent
39+ from elasticapm .utils .disttracing import TraceParent , TracingOptions
4040
4141
4242class KafkaInstrumentation (AbstractInstrumentedModule ):
@@ -48,6 +48,7 @@ class KafkaInstrumentation(AbstractInstrumentedModule):
4848 ]
4949 provider_name = "kafka"
5050 name = "kafka"
51+ creates_transactions = True
5152
5253 def _trace_send (self , instance , wrapped , * args , destination_info = None , ** kwargs ):
5354 topic = args [0 ] if args else kwargs ["topic" ]
@@ -68,7 +69,10 @@ def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs)
6869 ) as span :
6970 transaction = execution_context .get_transaction ()
7071 if transaction :
71- tp = transaction .trace_parent .copy_from (span_id = span .id )
72+ tp = transaction .trace_parent .copy_from (
73+ span_id = span .id if span else transaction .id ,
74+ trace_options = None if span else TracingOptions (recorded = False ),
75+ )
7276 if headers :
7377 headers .append ((constants .TRACEPARENT_BINARY_HEADER_NAME , tp .to_binary ()))
7478 else :
@@ -79,22 +83,17 @@ def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs)
7983 else :
8084 kwargs ["headers" ] = headers
8185 result = wrapped (* args , ** kwargs )
82- if instance and instance ._metadata .controller and not isinstance (span , DroppedSpan ):
86+ if span and instance and instance ._metadata .controller and not isinstance (span , DroppedSpan ):
8387 address = instance ._metadata .controller [1 ]
8488 port = instance ._metadata .controller [2 ]
8589 span .context ["destination" ]["address" ] = address
8690 span .context ["destination" ]["port" ] = port
8791 return result
8892
89- def call_if_sampling (self , module , method , wrapped , instance , args , kwargs ):
90- # Contrasting to the superclass implementation, we *always* want to
91- # return a proxied connection, even if there is no ongoing elasticapm
92- # transaction yet. This ensures that we instrument the cursor once
93- # the transaction started.
94- return self .call (module , method , wrapped , instance , args , kwargs )
95-
9693 def call (self , module , method , wrapped , instance , args , kwargs ):
9794 client = get_client ()
95+ if client is None :
96+ return wrapped (* args , ** kwargs )
9897 destination_info = {
9998 "service" : {"name" : "kafka" , "resource" : "kafka/" , "type" : "messaging" },
10099 }
@@ -118,7 +117,7 @@ def call(self, module, method, wrapped, instance, args, kwargs):
118117 "destination" : destination_info ,
119118 },
120119 ) as span :
121- if not isinstance (span , DroppedSpan ) and instance ._subscription .subscription :
120+ if span and not isinstance (span , DroppedSpan ) and instance ._subscription .subscription :
122121 span .name += " from " + ", " .join (sorted (instance ._subscription .subscription ))
123122 results = wrapped (* args , ** kwargs )
124123 return results
@@ -146,7 +145,7 @@ def call(self, module, method, wrapped, instance, args, kwargs):
146145 except StopIteration :
147146 span .cancel ()
148147 raise
149- if not isinstance (span , DroppedSpan ):
148+ if span and not isinstance (span , DroppedSpan ):
150149 topic = result [0 ]
151150 if client .should_ignore_topic (topic ):
152151 span .cancel ()
0 commit comments