Skip to content

Commit 7db5038

Browse files
committed
refactor(instrumentation-kafkajs): move client duration metric to separate method for testability
1 parent a51552c commit 7db5038

File tree

2 files changed

+55
-10
lines changed

2 files changed

+55
-10
lines changed

plugins/node/instrumentation-kafkajs/src/instrumentation.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -251,19 +251,26 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
251251
private _setKafkaEventListeners(kafkaObj: KafkaEventEmitter) {
252252
if (kafkaObj[EVENT_LISTENERS_SET]) return;
253253

254-
kafkaObj.on(kafkaObj.events.REQUEST, event => {
255-
const [address, port] = event.payload.broker.split(':');
256-
this._clientDuration.record(event.payload.duration / 1000, {
257-
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
258-
[ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @v${event.payload.apiVersion}?
259-
[ATTR_SERVER_ADDRESS]: address,
260-
[ATTR_SERVER_PORT]: Number.parseInt(port, 10),
261-
});
262-
});
254+
kafkaObj.on(
255+
kafkaObj.events.REQUEST,
256+
this._recordClientDurationMetric.bind(this)
257+
);
263258

264259
kafkaObj[EVENT_LISTENERS_SET] = true;
265260
}
266261

262+
private _recordClientDurationMetric(
263+
event: Pick<kafkaJs.RequestEvent, 'payload'>
264+
) {
265+
const [address, port] = event.payload.broker.split(':');
266+
this._clientDuration.record(event.payload.duration / 1000, {
267+
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
268+
[ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @v${event.payload.apiVersion}?
269+
[ATTR_SERVER_ADDRESS]: address,
270+
[ATTR_SERVER_PORT]: Number.parseInt(port, 10),
271+
});
272+
}
273+
267274
private _getProducerPatch() {
268275
const instrumentation = this;
269276
return (original: kafkaJs.Kafka['producer']) => {

plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import {
3838
MESSAGING_SYSTEM_VALUE_KAFKA,
3939
METRIC_MESSAGING_PROCESS_DURATION,
4040
METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES,
41+
METRIC_MESSAGING_CLIENT_OPERATION_DURATION,
4142
} from '../src/semconv';
4243
import {
4344
getTestSpans,
@@ -68,7 +69,11 @@ import {
6869
} from 'kafkajs';
6970
import { DummyPropagation } from './DummyPropagation';
7071
import { bufferTextMapGetter } from '../src/propagator';
71-
import { ATTR_ERROR_TYPE } from '@opentelemetry/semantic-conventions';
72+
import {
73+
ATTR_ERROR_TYPE,
74+
ATTR_SERVER_ADDRESS,
75+
ATTR_SERVER_PORT,
76+
} from '@opentelemetry/semantic-conventions';
7277

7378
function assertMetricCollection(
7479
{ errors, resourceMetrics }: CollectionResult,
@@ -1334,6 +1339,39 @@ describe('instrumentation-kafkajs', () => {
13341339
);
13351340
});
13361341
});
1342+
describe('client duration metric', () => {
1343+
it('records the metric', async () => {
1344+
instrumentation['_recordClientDurationMetric']({
1345+
payload: {
1346+
broker: 'kafka.host:4789',
1347+
duration: 242,
1348+
apiName: 'some-operation',
1349+
apiKey: 123,
1350+
apiVersion: 1,
1351+
clientId: 'client-id',
1352+
correlationId: 456,
1353+
createdAt: Date.now(),
1354+
pendingDuration: 0,
1355+
sentAt: Date.now(),
1356+
size: 1024,
1357+
},
1358+
});
1359+
assertMetricCollection(await metricReader.collect(), {
1360+
[METRIC_MESSAGING_CLIENT_OPERATION_DURATION]: [
1361+
{
1362+
count: 1,
1363+
value: 0.232,
1364+
attributes: {
1365+
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
1366+
[ATTR_MESSAGING_OPERATION_NAME]: 'some-operation',
1367+
[ATTR_SERVER_ADDRESS]: 'kafka.host',
1368+
[ATTR_SERVER_PORT]: 4789,
1369+
},
1370+
},
1371+
],
1372+
});
1373+
});
1374+
});
13371375

13381376
describe('bufferTextMapGetter', () => {
13391377
it('is possible to retrieve keys case insensitively', () => {

0 commit comments

Comments
 (0)