Skip to content

Commit c0593e6

Browse files
authored
feat(kafkajs): instrument transaction send and sendBatch (#2939)
1 parent 9d90781 commit c0593e6

File tree

3 files changed

+786
-199
lines changed

3 files changed

+786
-199
lines changed

packages/instrumentation-kafkajs/src/instrumentation.ts

Lines changed: 119 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
289289
instrumentation._wrap(
290290
newProducer,
291291
'sendBatch',
292-
instrumentation._getProducerSendBatchPatch()
292+
instrumentation._getSendBatchPatch()
293293
);
294294

295295
if (isWrapped(newProducer.send)) {
@@ -298,7 +298,16 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
298298
instrumentation._wrap(
299299
newProducer,
300300
'send',
301-
instrumentation._getProducerSendPatch()
301+
instrumentation._getSendPatch()
302+
);
303+
304+
if (isWrapped(newProducer.transaction)) {
305+
instrumentation._unwrap(newProducer, 'transaction');
306+
}
307+
instrumentation._wrap(
308+
newProducer,
309+
'transaction',
310+
instrumentation._getProducerTransactionPatch()
302311
);
303312

304313
instrumentation._setKafkaEventListeners(newProducer);
@@ -503,11 +512,113 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
503512
};
504513
}
505514

506-
private _getProducerSendBatchPatch() {
515+
private _getProducerTransactionPatch() {
507516
const instrumentation = this;
508-
return (original: Producer['sendBatch']) => {
509-
return function sendBatch(
517+
return (original: Producer['transaction']) => {
518+
return function transaction(
510519
this: Producer,
520+
...args: Parameters<Producer['transaction']>
521+
): ReturnType<Producer['transaction']> {
522+
const transactionSpan = instrumentation.tracer.startSpan('transaction');
523+
524+
const transactionPromise = original.apply(this, args);
525+
526+
transactionPromise
527+
.then((transaction: kafkaJs.Transaction) => {
528+
const originalSend = transaction.send;
529+
transaction.send = function send(
530+
this: kafkaJs.Transaction,
531+
...args
532+
) {
533+
return context.with(
534+
trace.setSpan(context.active(), transactionSpan),
535+
() => {
536+
const patched = instrumentation._getSendPatch()(originalSend);
537+
return patched.apply(this, args).catch(err => {
538+
transactionSpan.setStatus({
539+
code: SpanStatusCode.ERROR,
540+
message: err?.message,
541+
});
542+
transactionSpan.recordException(err);
543+
throw err;
544+
});
545+
}
546+
);
547+
};
548+
549+
const originalSendBatch = transaction.sendBatch;
550+
transaction.sendBatch = function sendBatch(
551+
this: kafkaJs.Transaction,
552+
...args
553+
) {
554+
return context.with(
555+
trace.setSpan(context.active(), transactionSpan),
556+
() => {
557+
const patched =
558+
instrumentation._getSendBatchPatch()(originalSendBatch);
559+
return patched.apply(this, args).catch(err => {
560+
transactionSpan.setStatus({
561+
code: SpanStatusCode.ERROR,
562+
message: err?.message,
563+
});
564+
transactionSpan.recordException(err);
565+
throw err;
566+
});
567+
}
568+
);
569+
};
570+
571+
const originalCommit = transaction.commit;
572+
transaction.commit = function commit(
573+
this: kafkaJs.Transaction,
574+
...args
575+
) {
576+
const originCommitPromise = originalCommit
577+
.apply(this, args)
578+
.then(() => {
579+
transactionSpan.setStatus({ code: SpanStatusCode.OK });
580+
});
581+
return instrumentation._endSpansOnPromise(
582+
[transactionSpan],
583+
[],
584+
originCommitPromise
585+
);
586+
};
587+
588+
const originalAbort = transaction.abort;
589+
transaction.abort = function abort(
590+
this: kafkaJs.Transaction,
591+
...args
592+
) {
593+
const originAbortPromise = originalAbort.apply(this, args);
594+
return instrumentation._endSpansOnPromise(
595+
[transactionSpan],
596+
[],
597+
originAbortPromise
598+
);
599+
};
600+
})
601+
.catch(err => {
602+
transactionSpan.setStatus({
603+
code: SpanStatusCode.ERROR,
604+
message: err?.message,
605+
});
606+
transactionSpan.recordException(err);
607+
transactionSpan.end();
608+
});
609+
610+
return transactionPromise;
611+
};
612+
};
613+
}
614+
615+
private _getSendBatchPatch() {
616+
const instrumentation = this;
617+
return (
618+
original: Producer['sendBatch'] | kafkaJs.Transaction['sendBatch']
619+
) => {
620+
return function sendBatch(
621+
this: kafkaJs.Producer | kafkaJs.Transaction,
511622
...args: Parameters<Producer['sendBatch']>
512623
): ReturnType<Producer['sendBatch']> {
513624
const batch = args[0];
@@ -550,11 +661,11 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
550661
};
551662
}
552663

553-
private _getProducerSendPatch() {
664+
private _getSendPatch() {
554665
const instrumentation = this;
555-
return (original: Producer['send']) => {
666+
return (original: Producer['send'] | kafkaJs.Transaction['send']) => {
556667
return function send(
557-
this: Producer,
668+
this: Producer | kafkaJs.Transaction,
558669
...args: Parameters<Producer['send']>
559670
): ReturnType<Producer['send']> {
560671
const record = args[0];

0 commit comments

Comments
 (0)