Skip to content

Commit c6606a1

Browse files
committed
refactor(instrumentation-kafkajs): rearrange consumed message metric collection for testability
1 parent 00e0787 commit c6606a1

File tree

2 files changed

+113
-41
lines changed

2 files changed

+113
-41
lines changed

plugins/node/instrumentation-kafkajs/src/instrumentation.ts

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,7 @@ import type {
5151
Producer,
5252
RecordMetadata,
5353
} from 'kafkajs';
54-
import {
55-
ConsumerExtended,
56-
EVENT_LISTENERS_SET,
57-
ProducerExtended,
58-
} from './internal-types';
54+
import { EVENT_LISTENERS_SET } from './internal-types';
5955
import { bufferTextMapGetter } from './propagator';
6056
import {
6157
ATTR_MESSAGING_BATCH_MESSAGE_COUNT,
@@ -88,6 +84,22 @@ interface ConsumerSpanOptions {
8884
ctx?: Context | undefined;
8985
link?: Link;
9086
}
87+
// This interface acts as a strict subset of the KafkaJS Consumer and
88+
// Producer interfaces (just for the event we're needing)
89+
interface KafkaEventEmitter {
90+
on(
91+
eventName:
92+
| kafkaJs.ConsumerEvents['REQUEST']
93+
| kafkaJs.ProducerEvents['REQUEST'],
94+
listener: (event: kafkaJs.RequestEvent) => void
95+
): void;
96+
events: {
97+
REQUEST:
98+
| kafkaJs.ConsumerEvents['REQUEST']
99+
| kafkaJs.ProducerEvents['REQUEST'];
100+
};
101+
[EVENT_LISTENERS_SET]?: boolean;
102+
}
91103

92104
interface StandardAttributes<OP extends string = string> extends Attributes {
93105
[ATTR_MESSAGING_SYSTEM]: string;
@@ -229,37 +241,27 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
229241
instrumentation._getConsumerRunPatch()
230242
);
231243

232-
instrumentation._setConsumerEventListeners(newConsumer);
244+
instrumentation._setKafkaEventListeners(newConsumer);
233245

234246
return newConsumer;
235247
};
236248
};
237249
}
238250

239-
private _setConsumerEventListeners(consumer: ConsumerExtended) {
240-
if (consumer[EVENT_LISTENERS_SET]) return;
251+
private _setKafkaEventListeners(kafkaObj: KafkaEventEmitter) {
252+
if (kafkaObj[EVENT_LISTENERS_SET]) return;
241253

242-
consumer.on(consumer.events.END_BATCH_PROCESS, event =>
243-
this._consumedMessages.add(event.payload.batchSize, {
244-
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
245-
[ATTR_MESSAGING_OPERATION_NAME]: 'receive',
246-
[ATTR_MESSAGING_DESTINATION_NAME]: event.payload.topic,
247-
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(
248-
event.payload.partition
249-
),
250-
})
251-
);
252-
consumer.on(consumer.events.REQUEST, event => {
254+
kafkaObj.on(kafkaObj.events.REQUEST, event => {
253255
const [address, port] = event.payload.broker.split(':');
254256
this._clientDuration.record(event.payload.duration / 1000, {
255257
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
256-
[ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @${event.payload.apiVersion}?
258+
[ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @v${event.payload.apiVersion}?
257259
[ATTR_SERVER_ADDRESS]: address,
258260
[ATTR_SERVER_PORT]: Number.parseInt(port, 10),
259261
});
260262
});
261263

262-
consumer[EVENT_LISTENERS_SET] = true;
264+
kafkaObj[EVENT_LISTENERS_SET] = true;
263265
}
264266

265267
private _getProducerPatch() {
@@ -289,29 +291,13 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
289291
instrumentation._getProducerSendPatch()
290292
);
291293

292-
instrumentation._setProducerEventListeners(newProducer);
294+
instrumentation._setKafkaEventListeners(newProducer);
293295

294296
return newProducer;
295297
};
296298
};
297299
}
298300

299-
private _setProducerEventListeners(producer: ProducerExtended) {
300-
if (producer[EVENT_LISTENERS_SET]) return;
301-
302-
producer.on(producer.events.REQUEST, event => {
303-
const [address, port] = event.payload.broker.split(':');
304-
this._clientDuration.record(event.payload.duration / 1000, {
305-
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
306-
[ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @${event.payload.apiVersion}?
307-
[ATTR_SERVER_ADDRESS]: address,
308-
[ATTR_SERVER_PORT]: Number.parseInt(port),
309-
});
310-
});
311-
312-
producer[EVENT_LISTENERS_SET] = true;
313-
}
314-
315301
private _getConsumerRunPatch() {
316302
const instrumentation = this;
317303
return (original: Consumer['run']) => {
@@ -383,6 +369,14 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
383369
),
384370
}
385371
),
372+
prepareCounter(instrumentation._consumedMessages, 1, {
373+
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
374+
[ATTR_MESSAGING_OPERATION_NAME]: 'process',
375+
[ATTR_MESSAGING_DESTINATION_NAME]: payload.topic,
376+
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(
377+
payload.partition
378+
),
379+
}),
386380
];
387381

388382
const eachMessagePromise = context.with(
@@ -426,7 +420,20 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
426420
() => {
427421
const startTime = Date.now();
428422
const spans: Span[] = [];
429-
const pendingMetrics: RecordPendingMetric[] = [];
423+
const pendingMetrics: RecordPendingMetric[] = [
424+
prepareCounter(
425+
instrumentation._consumedMessages,
426+
payload.batch.messages.length,
427+
{
428+
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
429+
[ATTR_MESSAGING_OPERATION_NAME]: 'process',
430+
[ATTR_MESSAGING_DESTINATION_NAME]: payload.batch.topic,
431+
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(
432+
payload.batch.partition
433+
),
434+
}
435+
),
436+
];
430437
payload.batch.messages.forEach(message => {
431438
const propagatedContext: Context = propagation.extract(
432439
ROOT_CONTEXT,

plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import {
3737
METRIC_MESSAGING_CLIENT_SENT_MESSAGES,
3838
MESSAGING_SYSTEM_VALUE_KAFKA,
3939
METRIC_MESSAGING_PROCESS_DURATION,
40+
METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES,
4041
} from '../src/semconv';
4142
import {
4243
getTestSpans,
@@ -91,13 +92,13 @@ function assertMetricCollection(
9192
assert.deepStrictEqual(
9293
match.dataPoints.map(d => d.value.count),
9394
values.map(v => v.count),
94-
'histogram datapoints do not have the same count'
95+
`${name} datapoints do not have the same count`
9596
);
9697
} else {
9798
assert.deepStrictEqual(
9899
match.dataPoints.map(d => d.value),
99100
values.map(v => v.value),
100-
'counter datapoints do not match'
101+
`${name} datapoint values do not match`
101102
);
102103
}
103104
assert.deepStrictEqual(
@@ -777,6 +778,17 @@ describe('instrumentation-kafkajs', () => {
777778
},
778779
},
779780
],
781+
[METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [
782+
{
783+
value: 1,
784+
attributes: {
785+
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
786+
[ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1',
787+
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1',
788+
[ATTR_MESSAGING_OPERATION_NAME]: 'process',
789+
},
790+
},
791+
],
780792
});
781793
});
782794

@@ -953,6 +965,18 @@ describe('instrumentation-kafkajs', () => {
953965
},
954966
},
955967
],
968+
[METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [
969+
{
970+
value: 1,
971+
attributes: {
972+
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
973+
[ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1',
974+
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1',
975+
[ATTR_MESSAGING_OPERATION_NAME]: 'process',
976+
[ATTR_ERROR_TYPE]: 'Error',
977+
},
978+
},
979+
],
956980
});
957981
});
958982

@@ -993,6 +1017,18 @@ describe('instrumentation-kafkajs', () => {
9931017
},
9941018
},
9951019
],
1020+
[METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [
1021+
{
1022+
value: 1,
1023+
attributes: {
1024+
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
1025+
[ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1',
1026+
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1',
1027+
[ATTR_MESSAGING_OPERATION_NAME]: 'process',
1028+
[ATTR_ERROR_TYPE]: '_OTHER',
1029+
},
1030+
},
1031+
],
9961032
});
9971033
});
9981034

@@ -1030,6 +1066,18 @@ describe('instrumentation-kafkajs', () => {
10301066
},
10311067
},
10321068
],
1069+
[METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [
1070+
{
1071+
value: 1,
1072+
attributes: {
1073+
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
1074+
[ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1',
1075+
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1',
1076+
[ATTR_MESSAGING_OPERATION_NAME]: 'process',
1077+
[ATTR_ERROR_TYPE]: '_OTHER',
1078+
},
1079+
},
1080+
],
10331081
});
10341082
});
10351083
});
@@ -1114,6 +1162,17 @@ describe('instrumentation-kafkajs', () => {
11141162
},
11151163
},
11161164
],
1165+
[METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [
1166+
{
1167+
value: 2,
1168+
attributes: {
1169+
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
1170+
[ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1',
1171+
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1234',
1172+
[ATTR_MESSAGING_OPERATION_NAME]: 'process',
1173+
},
1174+
},
1175+
],
11171176
});
11181177
});
11191178

@@ -1283,5 +1342,11 @@ describe('instrumentation-kafkajs', () => {
12831342
'123'
12841343
);
12851344
});
1345+
it('exposes a keys method', () => {
1346+
assert.deepStrictEqual(bufferTextMapGetter.keys({ a: 1, b: 2 }), [
1347+
'a',
1348+
'b',
1349+
]);
1350+
});
12861351
});
12871352
});

0 commit comments

Comments
 (0)