Skip to content

Commit 3862b63

Browse files
committed
test: add test for kafkajs tracing
1 parent c3daa4e commit 3862b63

File tree

15 files changed

+545
-61
lines changed

15 files changed

+545
-61
lines changed

eslint.config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ module.exports = tsEsLint.config(
4545
2,
4646
],
4747
'mocha/no-exclusive-tests': 'error',
48+
'no-case-declarations': 'off',
4849
'simpleImportSort/exports': 'error',
4950
'unusedImports/no-unused-imports': 'error',
5051
},

src/init.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
1313
import { NodeSDK, NodeSDKConfiguration } from '@opentelemetry/sdk-node';
1414
import { BatchSpanProcessor, SpanProcessor } from '@opentelemetry/sdk-trace-base';
1515
import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-node';
16-
import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs';
1716

1817
import PodUidDetector from './detectors/node/opentelemetry-resource-detector-kubernetes-pod';
1918
import ServiceNameFallbackDetector from './detectors/node/opentelemetry-resource-detector-service-name-fallback';
2019
import { FileSpanExporter } from './util/FileSpanExporter';
2120
import { hasOptedIn, hasOptedOut, parseNumericEnvironmentVariableWithDefault } from './util/environment';
21+
import { kafkaJsInstrumentation } from './util/kafkajs';
2222

2323
const logPrefix = 'Dash0 OpenTelemetry distribution for Node.js:';
2424
const debugOutput = hasOptedIn('DASH0_DEBUG');
@@ -67,7 +67,7 @@ const configuration: Partial<NodeSDKConfiguration> = {
6767
instrumentations: [
6868
//
6969
getNodeAutoInstrumentations(createInstrumentationConfig()),
70-
new KafkaJsInstrumentation(),
70+
kafkaJsInstrumentation,
7171
],
7272
resource: resource(),
7373
resourceDetectors: resourceDetectors(),

src/util/kafkajs.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs';
5+
6+
export const kafkaJsInstrumentation = new KafkaJsInstrumentation();

test/apps/empty-event-loop/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"name": "dash0-app-under-test-empty-event-loop",
33
"version": "1.0.0",
4+
"private": true,
45
"description": "",
56
"main": "app.ts",
67
"scripts": {

test/apps/express-typescript/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"name": "dash0-app-under-test-express-typescript",
33
"version": "1.0.0",
4+
"private": true,
45
"description": "",
56
"main": "app.ts",
67
"scripts": {

test/apps/kafkajs/app.ts

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
import { inspect } from 'node:util';
5+
import * as kafkajs from 'kafkajs';
6+
import {
7+
Consumer,
8+
ConsumerRunConfig,
9+
EachMessagePayload,
10+
Kafka,
11+
KafkaJSError,
12+
KafkaMessage,
13+
Producer,
14+
RecordMetadata,
15+
} from 'kafkajs';
16+
import { IpcRequest } from '../../util/ipc';
17+
import sendToParentProcess, { sendReadyToParentProcess } from '../../util/sendToParentProcess';
18+
19+
// for testing purposes, we need a reference to the kafakjs instrumentation
20+
import { kafkaJsInstrumentation } from '../../../src/util/kafkajs';
21+
22+
const kafka = new Kafka({
23+
clientId: 'dash0-kafkajs-tests',
24+
brokers: ['dummy:1302'],
25+
});
26+
27+
let producer: Producer;
28+
let consumer: Consumer;
29+
let runConfig: ConsumerRunConfig | undefined;
30+
31+
process.on('message', async message => {
32+
const ipcRequest = <IpcRequest>message;
33+
const command = ipcRequest.command;
34+
const id = ipcRequest.id;
35+
switch (command) {
36+
case 'produce-message':
37+
await produceMessage(id);
38+
break;
39+
case 'consume-message':
40+
await consumeMessage(id);
41+
break;
42+
default:
43+
const errorMsg = `Unknown message: ${inspect(message)}`;
44+
sendToParentProcess({ id, error: errorMsg });
45+
console.error(errorMsg);
46+
}
47+
});
48+
49+
(function initKafka() {
50+
producer = kafka.producer();
51+
52+
getRunConfig();
53+
54+
// Since we patch the consumer, we need to disable/enable the instrumentation to make sure it the instrumentation is
55+
// still applied, see
56+
// https://github.com/open-telemetry/opentelemetry-js-contrib/blob/2c32e5869ef9b6d582ba4da02623a030309bcaf3/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts#L142-L143.
57+
kafkaJsInstrumentation.disable();
58+
kafkaJsInstrumentation.enable();
59+
60+
consumer = kafka.consumer({
61+
groupId: 'testing-group-id',
62+
});
63+
64+
sendReadyToParentProcess();
65+
})();
66+
67+
function getRunConfig() {
68+
const origConsumerFactory = kafkajs.Kafka.prototype.consumer;
69+
kafkajs.Kafka.prototype.consumer = function (...args): Consumer {
70+
const consumer: Consumer = origConsumerFactory.apply(this, args);
71+
consumer.run = function (config?: ConsumerRunConfig): Promise<void> {
72+
runConfig = config;
73+
return Promise.resolve();
74+
};
75+
return consumer;
76+
};
77+
}
78+
79+
async function produceMessage(id: number) {
80+
try {
81+
const res: RecordMetadata[] = await producer.send({
82+
topic: 'test-topic',
83+
messages: [
84+
{
85+
value: 'testing message content',
86+
},
87+
],
88+
});
89+
sendToParentProcess({ id, ok: true, res });
90+
} catch (err) {
91+
if (err instanceof KafkaJSError && err.message === 'The producer is disconnected') {
92+
// This is expected, since we are not starting an actual Kafka broker for the integration tests, hence we are
93+
// also not connected to any broker.
94+
sendToParentProcess({ id, ok: true });
95+
} else {
96+
sendToParentProcess({ id, error: err });
97+
}
98+
}
99+
}
100+
101+
async function consumeMessage(id: number) {
102+
consumer.run({
103+
eachMessage: async (): Promise<void> => {},
104+
});
105+
const payload = createPayload();
106+
await runConfig?.eachMessage!(payload);
107+
sendToParentProcess({ id, ok: true });
108+
}
109+
110+
function createPayload(): EachMessagePayload {
111+
return {
112+
topic: 'test-topic',
113+
partition: 0,
114+
message: createMessage('456'),
115+
heartbeat: async () => {},
116+
pause: () => () => {},
117+
};
118+
}
119+
120+
function createMessage(offset: string): KafkaMessage {
121+
return {
122+
key: Buffer.from('message-key', 'utf8'),
123+
value: Buffer.from('message content', 'utf8'),
124+
timestamp: '1234',
125+
size: 10,
126+
attributes: 1,
127+
offset: offset,
128+
};
129+
}

test/apps/kafkajs/package-lock.json

Lines changed: 205 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)