Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit 88a9cae

Browse files
author
Amir Blum
authored
fix(instrumentation-kafkajs): support non promise return value from consumer callbacks (#99)
1 parent 12cfb28 commit 88a9cae

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

packages/instrumentation-kafkajs/src/kafkajs.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase<typeof kafkaJs>
218218
}
219219

220220
private _endSpansOnPromise<T>(spans: Span[], sendPromise: Promise<T>): Promise<T> {
221-
return sendPromise
221+
return Promise.resolve(sendPromise)
222222
.catch((reason) => {
223223
let errorMessage;
224224
if (typeof reason === 'string') errorMessage = reason;

packages/instrumentation-kafkajs/test/kafkajs.spec.ts

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -422,12 +422,12 @@ describe('instrumentation-kafkajs', () => {
422422
consumer = kafka.consumer({
423423
groupId: 'testing-group-id',
424424
});
425-
consumer.run({
426-
eachMessage: async (payload: EachMessagePayload): Promise<void> => {},
427-
});
428425
});
429426

430427
it('consume eachMessage create span with expected attributes', async () => {
428+
consumer.run({
429+
eachMessage: async (_payload: EachMessagePayload): Promise<void> => {},
430+
});
431431
const payload: EachMessagePayload = createEachMessagePayload();
432432
await runConfig.eachMessage(payload);
433433

@@ -443,6 +443,21 @@ describe('instrumentation-kafkajs', () => {
443443
expect(span.attributes[MessagingAttribute.MESSAGING_DESTINATION]).toStrictEqual('topic-name-1');
444444
expect(span.attributes[MessagingAttribute.MESSAGING_OPERATION]).toStrictEqual('process');
445445
});
446+
447+
it('consumer eachMessage with non promise return value', async () => {
448+
consumer.run({
449+
// the usecase of kafkajs callback not returning promise
450+
// is not typescript valid, but it might (and is) implemented in real life (nestjs)
451+
// and does not break the library.
452+
// @ts-ignore
453+
eachMessage: (_payload: EachMessagePayload) => {},
454+
});
455+
const payload: EachMessagePayload = createEachMessagePayload();
456+
await runConfig.eachMessage(payload);
457+
458+
const spans = memoryExporter.getFinishedSpans();
459+
expect(spans.length).toBe(1);
460+
});
446461
});
447462

448463
describe('successful consumer hook', () => {
@@ -592,12 +607,12 @@ describe('instrumentation-kafkajs', () => {
592607
consumer = kafka.consumer({
593608
groupId: 'testing-group-id',
594609
});
595-
consumer.run({
596-
eachBatch: async (payload: EachBatchPayload): Promise<void> => {},
597-
});
598610
});
599611

600612
it('consume eachBatch create span with expected attributes', async () => {
613+
consumer.run({
614+
eachBatch: async (payload: EachBatchPayload): Promise<void> => {},
615+
});
601616
const payload: EachBatchPayload = createEachBatchPayload();
602617
await runConfig.eachBatch(payload);
603618

@@ -622,6 +637,23 @@ describe('instrumentation-kafkajs', () => {
622637
expect(msg2Span.parentSpanId).toStrictEqual(recvSpan.spanContext.spanId);
623638
expect(msg2Span.attributes[MessagingAttribute.MESSAGING_OPERATION]).toStrictEqual('process');
624639
});
640+
641+
it('consumer eachBatch with non promise return value', async () => {
642+
consumer.run({
643+
// the usecase of kafkajs callback not returning promise
644+
// is not typescript valid, but it might (and is) implemented in real life (nestjs)
645+
// and does not break the library.
646+
// @ts-ignore
647+
eachBatch: async (_payload: EachBatchPayload) => {
648+
return;
649+
},
650+
});
651+
const payload: EachBatchPayload = createEachBatchPayload();
652+
await runConfig.eachBatch(payload);
653+
654+
const spans = memoryExporter.getFinishedSpans();
655+
expect(spans.length).toBe(3);
656+
});
625657
});
626658

627659
describe('moduleVersionAttributeName config', () => {

0 commit comments

Comments
 (0)