Skip to content

Commit 3424a4b

Browse files
committed
feat(instrumentation-kafkajs): instrument producer.transaction() and wrap transaction send and sendBatch methods
1 parent de22600 commit 3424a4b

File tree

1 file changed

+104
-4
lines changed

1 file changed

+104
-4
lines changed

packages/instrumentation-kafkajs/src/instrumentation.ts

Lines changed: 104 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,15 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
301301
instrumentation._getProducerSendPatch()
302302
);
303303

304+
if (isWrapped(newProducer.transaction)) {
305+
instrumentation._unwrap(newProducer, 'transaction');
306+
}
307+
instrumentation._wrap(
308+
newProducer,
309+
'transaction',
310+
instrumentation._getProducerTransactionPatch()
311+
);
312+
304313
instrumentation._setKafkaEventListeners(newProducer);
305314

306315
return newProducer;
@@ -503,11 +512,102 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
503512
};
504513
}
505514

515+
private _getProducerTransactionPatch() {
516+
const instrumentation = this;
517+
return (original: Producer['transaction']) => {
518+
return function transaction(
519+
this: Producer,
520+
...args: Parameters<Producer['transaction']>
521+
): ReturnType<Producer['transaction']> {
522+
const transactionSpan = instrumentation.tracer.startSpan(
523+
'transaction',
524+
{
525+
kind: SpanKind.INTERNAL,
526+
}
527+
);
528+
529+
const transactionPromise = original.apply(this, args);
530+
531+
transactionPromise
532+
.then((transaction: kafkaJs.Transaction) => {
533+
const originalSend = transaction.send.bind(transaction);
534+
transaction.send = function send(
535+
this: kafkaJs.Transaction,
536+
...args
537+
) {
538+
return context.with(
539+
trace.setSpan(context.active(), transactionSpan),
540+
() => {
541+
const patched =
542+
instrumentation._getProducerSendPatch()(originalSend);
543+
return patched.apply(this, args);
544+
}
545+
);
546+
};
547+
548+
const originalSendBatch = transaction.sendBatch.bind(transaction);
549+
transaction.sendBatch = function sendBatch(
550+
this: kafkaJs.Transaction,
551+
...args
552+
) {
553+
return context.with(
554+
trace.setSpan(context.active(), transactionSpan),
555+
() => {
556+
const patched =
557+
instrumentation._getProducerSendBatchPatch()(
558+
originalSendBatch
559+
);
560+
return patched.apply(this, args);
561+
}
562+
);
563+
};
564+
565+
const originalCommit = transaction.commit.bind(transaction);
566+
transaction.commit = function commit(
567+
this: kafkaJs.Transaction,
568+
...args
569+
) {
570+
const originCommitPromise = originalCommit.apply(this, args);
571+
return instrumentation._endSpansOnPromise(
572+
[transactionSpan],
573+
[],
574+
originCommitPromise
575+
);
576+
};
577+
578+
const originalAbort = transaction.abort.bind(transaction);
579+
transaction.abort = function abort(
580+
this: kafkaJs.Transaction,
581+
...args
582+
) {
583+
const originAbortPromise = originalAbort.apply(this, args);
584+
return instrumentation._endSpansOnPromise(
585+
[transactionSpan],
586+
[],
587+
originAbortPromise
588+
);
589+
};
590+
})
591+
.catch(err => {
592+
transactionSpan.setStatus({
593+
code: SpanStatusCode.ERROR,
594+
});
595+
transactionSpan.recordException(err);
596+
transactionSpan.end();
597+
});
598+
599+
return transactionPromise;
600+
};
601+
};
602+
}
603+
506604
private _getProducerSendBatchPatch() {
507605
const instrumentation = this;
508-
return (original: Producer['sendBatch']) => {
606+
return (
607+
original: Producer['sendBatch'] | kafkaJs.Transaction['sendBatch']
608+
) => {
509609
return function sendBatch(
510-
this: Producer,
610+
this: kafkaJs.Producer | kafkaJs.Transaction,
511611
...args: Parameters<Producer['sendBatch']>
512612
): ReturnType<Producer['sendBatch']> {
513613
const batch = args[0];
@@ -552,9 +652,9 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
552652

553653
private _getProducerSendPatch() {
554654
const instrumentation = this;
555-
return (original: Producer['send']) => {
655+
return (original: Producer['send'] | kafkaJs.Transaction['send']) => {
556656
return function send(
557-
this: Producer,
657+
this: Producer | kafkaJs.Transaction,
558658
...args: Parameters<Producer['send']>
559659
): ReturnType<Producer['send']> {
560660
const record = args[0];

0 commit comments

Comments
 (0)