1- import { SpanKind , Span , SpanStatusCode , Context , propagation , Link , getSpan , setSpan , context , diag } from '@opentelemetry/api' ;
1+ import {
2+ SpanKind ,
3+ Span ,
4+ SpanStatusCode ,
5+ Context ,
6+ propagation ,
7+ Link ,
8+ getSpan ,
9+ setSpan ,
10+ context ,
11+ diag ,
12+ } from '@opentelemetry/api' ;
213import { ROOT_CONTEXT } from '@opentelemetry/context-base' ;
314import { MessagingAttribute , MessagingOperationName } from '@opentelemetry/semantic-conventions' ;
415import * as kafkaJs from 'kafkajs' ;
@@ -50,7 +61,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase<typeof kafkaJs>
5061
5162 protected patch ( moduleExports : typeof kafkaJs , moduleVersion : string ) {
5263 diag . debug ( 'kafkajs instrumentation: applying patch' ) ;
53- this . moduleVersion = moduleVersion
64+ this . moduleVersion = moduleVersion ;
5465
5566 this . unpatch ( moduleExports ) ;
5667 this . _wrap ( moduleExports ?. Kafka ?. prototype , 'producer' , this . _getProducerPatch . bind ( this ) ) ;
@@ -77,11 +88,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase<typeof kafkaJs>
7788 if ( isWrapped ( newConsumer . run ) ) {
7889 self . _unwrap ( newConsumer , 'run' ) ;
7990 }
80- self . _wrap (
81- newConsumer ,
82- 'run' ,
83- self . _getConsumerRunPatch . bind ( self )
84- ) ;
91+ self . _wrap ( newConsumer , 'run' , self . _getConsumerRunPatch . bind ( self ) ) ;
8592
8693 return newConsumer ;
8794 } ;
@@ -95,20 +102,12 @@ export class KafkaJsInstrumentation extends InstrumentationBase<typeof kafkaJs>
95102 if ( isWrapped ( newProducer . sendBatch ) ) {
96103 self . _unwrap ( newProducer , 'sendBatch' ) ;
97104 }
98- self . _wrap (
99- newProducer ,
100- 'sendBatch' ,
101- self . _getProducerSendBatchPatch . bind ( self )
102- ) ;
105+ self . _wrap ( newProducer , 'sendBatch' , self . _getProducerSendBatchPatch . bind ( self ) ) ;
103106
104107 if ( isWrapped ( newProducer . send ) ) {
105108 self . _unwrap ( newProducer , 'send' ) ;
106109 }
107- self . _wrap (
108- newProducer ,
109- 'send' ,
110- self . _getProducerSendPatch . bind ( self )
111- ) ;
110+ self . _wrap ( newProducer , 'send' , self . _getProducerSendPatch . bind ( self ) ) ;
112111
113112 return newProducer ;
114113 } ;
@@ -121,21 +120,13 @@ export class KafkaJsInstrumentation extends InstrumentationBase<typeof kafkaJs>
121120 if ( isWrapped ( config . eachMessage ) ) {
122121 self . _unwrap ( config , 'eachMessage' ) ;
123122 }
124- self . _wrap (
125- config ,
126- 'eachMessage' ,
127- self . _getConsumerEachMessagePatch . bind ( self )
128- ) ;
123+ self . _wrap ( config , 'eachMessage' , self . _getConsumerEachMessagePatch . bind ( self ) ) ;
129124 }
130125 if ( config ?. eachBatch ) {
131126 if ( isWrapped ( config . eachBatch ) ) {
132127 self . _unwrap ( config , 'eachBatch' ) ;
133128 }
134- self . _wrap (
135- config ,
136- 'eachBatch' ,
137- self . _getConsumerEachBatchPatch . bind ( self )
138- ) ;
129+ self . _wrap ( config , 'eachBatch' , self . _getConsumerEachBatchPatch . bind ( self ) ) ;
139130 }
140131 return original . call ( this , config ) ;
141132 } ;
@@ -206,9 +197,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase<typeof kafkaJs>
206197 const self = this ;
207198 return function ( batch : ProducerBatch ) : Promise < RecordMetadata [ ] > {
208199 const spans : Span [ ] = batch . topicMessages . flatMap ( ( topicMessage ) =>
209- topicMessage . messages . map ( ( message ) =>
210- self . _startProducerSpan ( topicMessage . topic , message )
211- )
200+ topicMessage . messages . map ( ( message ) => self . _startProducerSpan ( topicMessage . topic , message ) )
212201 ) ;
213202
214203 const origSendResult : Promise < RecordMetadata [ ] > = original . apply ( this , arguments ) ;
0 commit comments