Skip to content

Commit 0192178

Browse files
committed
feat(kafkajs-instrumentation): propagate send/sendBatch errors to transaction span, rename assert helper, and improve test structure
1 parent e060f37 commit 0192178

File tree

3 files changed

+112
-81
lines changed

3 files changed

+112
-81
lines changed

packages/instrumentation-kafkajs/src/instrumentation.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,14 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
540540
() => {
541541
const patched =
542542
instrumentation._getProducerSendPatch()(originalSend);
543-
return patched.apply(this, args);
543+
return patched.apply(this, args).catch(err => {
544+
transactionSpan.setStatus({
545+
code: SpanStatusCode.ERROR,
546+
message: err?.message,
547+
});
548+
transactionSpan.recordException(err);
549+
throw err;
550+
});
544551
}
545552
);
546553
};
@@ -557,7 +564,14 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
557564
instrumentation._getProducerSendBatchPatch()(
558565
originalSendBatch
559566
);
560-
return patched.apply(this, args);
567+
return patched.apply(this, args).catch(err => {
568+
transactionSpan.setStatus({
569+
code: SpanStatusCode.ERROR,
570+
message: err?.message,
571+
});
572+
transactionSpan.recordException(err);
573+
throw err;
574+
});
561575
}
562576
);
563577
};
@@ -591,6 +605,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
591605
.catch(err => {
592606
transactionSpan.setStatus({
593607
code: SpanStatusCode.ERROR,
608+
message: err?.message,
594609
});
595610
transactionSpan.recordException(err);
596611
transactionSpan.end();

packages/instrumentation-kafkajs/test/kafkajs.test.ts

Lines changed: 93 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ import {
7373
ATTR_SERVER_PORT,
7474
} from '@opentelemetry/semantic-conventions';
7575
import {
76-
assertFailedSendSpansGroupedByTopic,
76+
assertFailedSendSpans,
7777
assertMetricCollection,
78-
assertSuccessfulSendSpansGroupedByTopic,
78+
assertSuccessfulSendSpans,
7979
haveSameTraceId,
8080
} from './utils';
8181

@@ -255,7 +255,7 @@ describe('instrumentation-kafkajs', () => {
255255
assert.strictEqual(spans.length, 1);
256256
const span = spans[0];
257257
assert.strictEqual(span.name, 'send topic-name-1');
258-
await assertSuccessfulSendSpansGroupedByTopic({
258+
await assertSuccessfulSendSpans({
259259
spans: [span],
260260
metricReader,
261261
expectedMetrics: [
@@ -295,7 +295,7 @@ describe('instrumentation-kafkajs', () => {
295295
const span = spans[0];
296296
assert.strictEqual(span.name, 'send topic-name-1');
297297

298-
await assertSuccessfulSendSpansGroupedByTopic({
298+
await assertSuccessfulSendSpans({
299299
spans: [span],
300300
metricReader,
301301
expectedMetrics: [
@@ -431,25 +431,12 @@ describe('instrumentation-kafkajs', () => {
431431
spans[i] as ReadableSpan
432432
);
433433
}
434-
assertMetricCollection(await metricReader.collect(), {
435-
[METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [
436-
{
437-
value: 2,
438-
attributes: {
439-
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
440-
[ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1',
441-
[ATTR_MESSAGING_OPERATION_NAME]: 'send',
442-
},
443-
},
444-
{
445-
value: 1,
446-
attributes: {
447-
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
448-
[ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-2',
449-
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1',
450-
[ATTR_MESSAGING_OPERATION_NAME]: 'send',
451-
},
452-
},
434+
assertSuccessfulSendSpans({
435+
spans,
436+
metricReader,
437+
expectedMetrics: [
438+
{ topic: 'topic-name-1', value: 2 },
439+
{ topic: 'topic-name-2', value: 1, partitionId: '1' },
453440
],
454441
});
455442
});
@@ -482,7 +469,7 @@ describe('instrumentation-kafkajs', () => {
482469
const spans = getTestSpans();
483470
assert.strictEqual(spans.length, 1);
484471
const span = spans[0];
485-
await assertFailedSendSpansGroupedByTopic({
472+
await assertFailedSendSpans({
486473
spans: [span],
487474
metricReader,
488475
errorMessage: 'error thrown from kafka client send',
@@ -509,7 +496,7 @@ describe('instrumentation-kafkajs', () => {
509496

510497
const spans = getTestSpans();
511498
assert.strictEqual(spans.length, 2);
512-
await assertFailedSendSpansGroupedByTopic({
499+
await assertFailedSendSpans({
513500
spans: spans,
514501
metricReader,
515502
errorMessage: 'error thrown from kafka client send',
@@ -548,7 +535,7 @@ describe('instrumentation-kafkajs', () => {
548535

549536
const spans = getTestSpans();
550537
assert.strictEqual(spans.length, 3);
551-
await assertFailedSendSpansGroupedByTopic({
538+
await assertFailedSendSpans({
552539
spans,
553540
metricReader,
554541
errorMessage: 'error thrown from kafka client send',
@@ -666,13 +653,17 @@ describe('instrumentation-kafkajs', () => {
666653
await tx.commit();
667654

668655
const spans = getTestSpans();
669-
const transactionSpan = spans.find(s => s.name === 'transaction')!;
656+
const transactionSpan = spans.find(s => s.name === 'transaction');
670657
const sendSpans = spans.filter(s => s.name === 'send topic-name-1');
671-
658+
assert.ok(transactionSpan)
672659
assert.strictEqual(spans.length, 3);
673660
assert.strictEqual(transactionSpan.kind, SpanKind.INTERNAL);
674661
assert.strictEqual(transactionSpan.status.code, SpanStatusCode.UNSET);
675-
await assertSuccessfulSendSpansGroupedByTopic({
662+
663+
assert.strictEqual(sendSpans.length, 2);
664+
assert.strictEqual(sendSpans[0].name, 'send topic-name-1')
665+
assert.strictEqual(sendSpans[1].name, 'send topic-name-1')
666+
await assertSuccessfulSendSpans({
676667
spans: sendSpans,
677668
metricReader,
678669
expectedMetrics: [{ topic: 'topic-name-1', value: 2 }],
@@ -707,8 +698,10 @@ describe('instrumentation-kafkajs', () => {
707698
await assert.rejects(tx.commit());
708699

709700
const spans = getTestSpans();
710-
const transactionSpan = spans.find(s => s.name === 'transaction')!;
711-
const sendSpan = spans.find(s => s.name.startsWith('send'))!;
701+
const transactionSpan = spans.find(s => s.name === 'transaction');
702+
const sendSpan = spans.find(s => s.name.startsWith('send'));
703+
assert.ok(transactionSpan)
704+
assert.ok(sendSpan)
712705
assert.strictEqual(transactionSpan.kind, SpanKind.INTERNAL);
713706
assertSpanHasParent(transactionSpan, sendSpan);
714707
assert.strictEqual(transactionSpan.status.code, SpanStatusCode.ERROR);
@@ -717,7 +710,7 @@ describe('instrumentation-kafkajs', () => {
717710
'error thrown from kafka client transaction commit'
718711
);
719712
assert.strictEqual(sendSpan.status.code, SpanStatusCode.UNSET);
720-
713+
assert.strictEqual(sendSpan.name, 'send topic-name-1')
721714
expectKafkaHeadersToMatchSpanContext(
722715
messagesSent[0],
723716
sendSpan as ReadableSpan
@@ -727,26 +720,30 @@ describe('instrumentation-kafkajs', () => {
727720
});
728721

729722
describe('transaction abort', () => {
730-
it('sets both spans to unset when abort succeeds', async () => {
723+
it('spans remain unset when abort succeeds', async () => {
731724
const producer = prepareTestProducer();
732725
const tx = await producer.transaction();
733726
await tx.send({ topic: 'topic-name-1', messages: [{ value: 'a' }] });
734727

735728
await tx.abort();
736729

737730
const spans = getTestSpans();
738-
const [transactionSpan, send] = [
739-
spans.find(s => s.name === 'transaction')!,
740-
spans.find(s => s.name === 'send topic-name-1')!,
731+
const [transactionSpan, sendSpan] = [
732+
spans.find(s => s.name === 'transaction'),
733+
spans.find(s => s.name === 'send topic-name-1'),
741734
];
742-
assertSpanHasParent(transactionSpan, send);
735+
assert.ok(transactionSpan)
736+
assert.ok(sendSpan)
737+
assertSpanHasParent(transactionSpan, sendSpan);
743738
expectKafkaHeadersToMatchSpanContext(
744739
messagesSent[0],
745-
send as ReadableSpan
740+
sendSpan as ReadableSpan
746741
);
747742
assert.strictEqual(transactionSpan.kind, SpanKind.INTERNAL);
748743
assert.strictEqual(transactionSpan.status.code, SpanStatusCode.UNSET);
749-
assert.strictEqual(send.status.code, SpanStatusCode.UNSET);
744+
assert.strictEqual(sendSpan.status.code, SpanStatusCode.UNSET);
745+
assert.strictEqual(sendSpan.name, 'send topic-name-1');
746+
750747
assert.ok(haveSameTraceId(spans));
751748
});
752749

@@ -763,18 +760,20 @@ describe('instrumentation-kafkajs', () => {
763760
await assert.rejects(tx.abort());
764761

765762
const spans = getTestSpans();
766-
const [transactionSpan, send] = [
767-
spans.find(s => s.name === 'transaction')!,
768-
spans.find(s => s.name.startsWith('send'))!,
763+
const [transactionSpan, sendSpan] = [
764+
spans.find(s => s.name === 'transaction'),
765+
spans.find(s => s.name.startsWith('send')),
769766
];
770-
assertSpanHasParent(transactionSpan, send);
767+
assert.ok(transactionSpan)
768+
assert.ok(sendSpan)
769+
assertSpanHasParent(transactionSpan, sendSpan);
771770
assert.strictEqual(transactionSpan.kind, SpanKind.INTERNAL);
772771
assert.strictEqual(transactionSpan.status.code, SpanStatusCode.ERROR);
773772
assert.strictEqual(
774773
transactionSpan.status.message,
775774
'error thrown from kafka client transaction abort'
776775
);
777-
assert.strictEqual(send.status.code, SpanStatusCode.UNSET);
776+
assert.strictEqual(sendSpan.status.code, SpanStatusCode.UNSET);
778777
assert.ok(haveSameTraceId(spans));
779778
});
780779
});
@@ -797,10 +796,11 @@ describe('instrumentation-kafkajs', () => {
797796
await tx.commit();
798797

799798
const spans = getTestSpans();
800-
const parent = spans.find(s => s.name === 'transaction')!;
801-
const sends = spans.filter(s => s.name.startsWith('send'));
802-
803-
sends.forEach(s => assertSpanHasParent(parent, s));
799+
const transactionSpan = spans.find(s => s.name === 'transaction');
800+
const sendSpans = spans.filter(s => s.name.startsWith('send'));
801+
assert.ok(transactionSpan)
802+
assert.ok(sendSpans)
803+
sendSpans.forEach(s => assertSpanHasParent(transactionSpan, s));
804804
assert.ok(haveSameTraceId(spans));
805805
});
806806

@@ -816,19 +816,23 @@ describe('instrumentation-kafkajs', () => {
816816
],
817817
});
818818
await tx.commit();
819-
820819
const spans = getTestSpans();
821-
const parent = spans.find(s => s.name === 'transaction')!;
822-
const sends = spans.filter(s => s.name.startsWith('send'));
823-
assertSuccessfulSendSpansGroupedByTopic({
824-
spans: sends,
820+
const transactionSpan = spans.find(s => s.name === 'transaction');
821+
const sendSpans = spans.filter(s => s.name.startsWith('send'));
822+
assert.ok(transactionSpan)
823+
assert.strictEqual(sendSpans.length, 3)
824+
assert.strictEqual(spans[0].name, 'send topic-name-1');
825+
assert.strictEqual(spans[1].name, 'send topic-name-1');
826+
assert.strictEqual(spans[2].name, 'send topic-name-2');
827+
assertSuccessfulSendSpans({
828+
spans: sendSpans,
825829
metricReader,
826830
expectedMetrics: [
827831
{ topic: 'topic-name-1', value: 2 },
828832
{ topic: 'topic-name-2', value: 1 },
829833
],
830834
});
831-
sends.forEach(s => assertSpanHasParent(parent, s));
835+
sendSpans.forEach(s => assertSpanHasParent(transactionSpan, s));
832836
assert.ok(haveSameTraceId(spans));
833837
});
834838
});
@@ -844,32 +848,37 @@ describe('instrumentation-kafkajs', () => {
844848
producer = kafka.producer();
845849
});
846850

847-
it('sets send span to error when send fails', async () => {
851+
it('sets send span and transaction span to error when send fails', async () => {
848852
const tx = await producer.transaction();
849853
await assert.rejects(
850854
tx.send({ topic: 'topic-name-1', messages: [{ value: 'oops' }] })
851855
);
852856
await tx.abort();
853857

854858
const spans = getTestSpans();
855-
const [transactionSpan, send] = [
856-
spans.find(s => s.name === 'transaction')!,
857-
spans.find(s => s.name.startsWith('send'))!,
859+
const [transactionSpan, sendSpan] = [
860+
spans.find(s => s.name === 'transaction'),
861+
spans.find(s => s.name === 'send topic-name-1'),
858862
];
859-
assertSpanHasParent(transactionSpan, send);
860-
await assertFailedSendSpansGroupedByTopic({
861-
spans: [send],
863+
assert.ok(transactionSpan)
864+
assert.ok(sendSpan)
865+
const errorMessage =
866+
'error thrown from kafka client transaction send';
867+
assertSpanHasParent(transactionSpan, sendSpan);
868+
await assertFailedSendSpans({
869+
spans: [sendSpan],
862870
metricReader,
863-
errorMessage: 'error thrown from kafka client transaction send',
871+
errorMessage,
864872
expectedTopicCounts: {
865873
'topic-name-1': 1,
866874
},
867875
});
868-
assert.strictEqual(transactionSpan.status.code, SpanStatusCode.UNSET);
876+
assert.strictEqual(transactionSpan.status.code, SpanStatusCode.ERROR);
877+
assert.strictEqual(transactionSpan.status.message, errorMessage);
869878
assert.ok(haveSameTraceId(spans));
870879
});
871880

872-
it('sets all sendBatch spans to error when sendBatch fails', async () => {
881+
it('sets all sendBatch spans and transaction span to error when sendBatch fails', async () => {
873882
const tx = await producer.transaction();
874883
await assert.rejects(
875884
tx.sendBatch({
@@ -884,20 +893,26 @@ describe('instrumentation-kafkajs', () => {
884893
await tx.abort();
885894

886895
const spans = getTestSpans();
887-
const parent = spans.find(s => s.name === 'transaction')!;
888-
const sends = spans.filter(s => s.name === 'send topic-name-1');
889-
await assertFailedSendSpansGroupedByTopic({
890-
spans: sends,
896+
const transactionSpan = spans.find(s => s.name === 'transaction');
897+
const sendSpans = spans.filter(s => s.name === 'send topic-name-1');
898+
assert.ok(transactionSpan)
899+
assert.ok(sendSpans)
900+
await assertFailedSendSpans({
901+
spans: sendSpans,
891902
metricReader,
892903
errorMessage:
893904
'error thrown from kafka client transaction sendBatch',
894905
expectedTopicCounts: {
895906
'topic-name-1': 2,
896907
},
897908
});
898-
sends.forEach(s => {
899-
assertSpanHasParent(parent, s);
909+
const errorMessage =
910+
'error thrown from kafka client transaction sendBatch';
911+
sendSpans.forEach(s => {
912+
assertSpanHasParent(transactionSpan, s);
900913
});
914+
assert.strictEqual(transactionSpan.status.code, SpanStatusCode.ERROR);
915+
assert.strictEqual(transactionSpan.status.message, errorMessage);
901916
assert.ok(haveSameTraceId(spans));
902917
});
903918
});
@@ -919,19 +934,22 @@ describe('instrumentation-kafkajs', () => {
919934
);
920935

921936
const spans = getTestSpans();
922-
assert.equal(spans.length, 1);
937+
assert.strictEqual(spans.length, 1);
923938

924-
const transactionSpan = spans.find(s => s.name === 'transaction')!;
939+
const transactionSpan = spans.find(s => s.name === 'transaction');
925940
assert.ok(transactionSpan);
926-
assert.equal(transactionSpan.kind, SpanKind.INTERNAL);
927-
assert.equal(transactionSpan.status.code, SpanStatusCode.ERROR);
941+
assert.strictEqual(transactionSpan.kind, SpanKind.INTERNAL);
942+
assert.strictEqual(transactionSpan.status.code, SpanStatusCode.ERROR);
928943

929944
assert.strictEqual(
930945
transactionSpan.status.code,
931946
SpanStatusCode.ERROR,
932947
`Expected transactionSpan status.code to be ERROR`
933948
);
934-
949+
assert.strictEqual(
950+
transactionSpan.status.message,
951+
'error thrown from kafka client transaction'
952+
);
935953
const exceptionEvent = transactionSpan.events.find(
936954
e => e.name === 'exception'
937955
);

0 commit comments

Comments
 (0)