From 996a6075147f57e5619326522e251eeb90a55483 Mon Sep 17 00:00:00 2001 From: Brian Phillips <28457+brianphillips@users.noreply.github.com> Date: Wed, 12 Mar 2025 14:20:52 -0500 Subject: [PATCH 1/7] feat(instrumentation-kafkajs): update semantic conventions Signed-off-by: Brian Phillips <28457+brianphillips@users.noreply.github.com> --- package-lock.json | 5 +- .../node/instrumentation-kafkajs/README.md | 30 ++- .../node/instrumentation-kafkajs/package.json | 2 +- .../src/instrumentation.ts | 156 ++++++++++---- .../instrumentation-kafkajs/src/semconv.ts | 138 +++++++++++++ .../test/kafkajs.test.ts | 192 ++++++++++++++---- 6 files changed, 430 insertions(+), 93 deletions(-) create mode 100644 plugins/node/instrumentation-kafkajs/src/semconv.ts diff --git a/package-lock.json b/package-lock.json index 0f23d64e6b..bef692546e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10478,6 +10478,7 @@ "version": "1.30.0", "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.30.0.tgz", "integrity": "sha512-4VlGgo32k2EQ2wcCY3vEU28A0O13aOtHz3Xt2/2U5FAh9EfhD6t6DqL5Z6yAnRCntbTFDU4YfbpyzSlHNWycPw==", + "license": "Apache-2.0", "engines": { "node": ">=14" } @@ -36311,7 +36312,7 @@ "license": "Apache-2.0", "dependencies": { "@opentelemetry/instrumentation": "^0.57.2", - "@opentelemetry/semantic-conventions": "^1.27.0" + "@opentelemetry/semantic-conventions": "^1.30.0" }, "devDependencies": { "@opentelemetry/api": "^1.3.0", @@ -47428,7 +47429,7 @@ "@opentelemetry/contrib-test-utils": "^0.45.1", "@opentelemetry/instrumentation": "^0.57.2", "@opentelemetry/sdk-trace-base": "^1.24.0", - "@opentelemetry/semantic-conventions": "^1.27.0", + "@opentelemetry/semantic-conventions": "^1.30.0", "@types/mocha": "10.0.10", "@types/node": "18.18.14", "@types/sinon": "^17.0.0", diff --git a/plugins/node/instrumentation-kafkajs/README.md b/plugins/node/instrumentation-kafkajs/README.md index a5f54ee2ee..6d955cf576 100644 --- 
a/plugins/node/instrumentation-kafkajs/README.md +++ b/plugins/node/instrumentation-kafkajs/README.md @@ -23,7 +23,9 @@ npm install --save @opentelemetry/instrumentation-kafkajs ```js const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node'); -const { KafkaJsInstrumentation } = require('@opentelemetry/instrumentation-kafkajs'); +const { + KafkaJsInstrumentation, +} = require('@opentelemetry/instrumentation-kafkajs'); const { registerInstrumentations } = require('@opentelemetry/instrumentation'); const provider = new NodeTracerProvider(); @@ -42,22 +44,28 @@ registerInstrumentations({ You can set the following: -| Options | Type | Description | -| ---------------------------- | -------------------------------------- | -------------------------------------------------------------------------------------------------------------------- | -| `producerHook` | `KafkaProducerCustomAttributeFunction` | Function called before a producer message is sent. Allows for adding custom attributes to the span. | -| `consumerHook` | `KafkaConsumerCustomAttributeFunction` | Function called before a consumer message is processed. Allows for adding custom attributes to the span. | +| Options | Type | Description | +| -------------- | -------------------------------------- | -------------------------------------------------------------------------------------------------------- | +| `producerHook` | `KafkaProducerCustomAttributeFunction` | Function called before a producer message is sent. Allows for adding custom attributes to the span. | +| `consumerHook` | `KafkaConsumerCustomAttributeFunction` | Function called before a consumer message is processed. Allows for adding custom attributes to the span. 
| ## Semantic Conventions -This package uses `@opentelemetry/semantic-conventions` version `1.24+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md) +This package uses `@opentelemetry/semantic-conventions` version `1.30+`, which implements Semantic Convention [Version 1.30.0](https://github.com/open-telemetry/semantic-conventions/blob/v1.30.0/docs/README.md) Attributes collected: -| Attribute | Short Description | -| -----------------------------| ----------------------------------------------------- | -| `messaging.system` | An identifier for the messaging system being used. | -| `messaging.destination` | The message destination name. | -| `messaging.operation` | A string identifying the kind of messaging operation. | +| Attribute | Short Description | +| ------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| `messaging.system` | An identifier for the messaging system being used (i.e. `"kafka"`). | +| `messaging.destination.name` | The message destination name. | +| `messaging.operation.type` | A string identifying the type of messaging operation. | +| `messaging.operation.name` | The system-specific name of the messaging operation. | +| `messaging.batch.message.count` | The number of messages sent, received, or processed in the scope of the batching operation. | +| `messaging.kafka.message.key` | A stringified value representing the key of the Kafka message (if present). | +| `messaging.kafka.message.tombstone` | A boolean that is true if the message is a tombstone. | +| `messaging.kafka.offset` | The offset of a record in the corresponding Kafka partition. | +| `messaging.destination.partition.id` | The identifier of the partition messages are sent to or received from, unique within the `messaging.destination.name`. 
**Note:** only available on producer spans. | ## Useful links diff --git a/plugins/node/instrumentation-kafkajs/package.json b/plugins/node/instrumentation-kafkajs/package.json index 2ff0f5ce3a..122512dbb0 100644 --- a/plugins/node/instrumentation-kafkajs/package.json +++ b/plugins/node/instrumentation-kafkajs/package.json @@ -56,7 +56,7 @@ }, "dependencies": { "@opentelemetry/instrumentation": "^0.57.2", - "@opentelemetry/semantic-conventions": "^1.27.0" + "@opentelemetry/semantic-conventions": "^1.30.0" }, "homepage": "https://github.com/open-telemetry/opentelemetry-js-contrib/tree/main/plugins/node/instrumentation-kafkajs#readme" } diff --git a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts index 1dacb4351c..2138c33470 100644 --- a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts +++ b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts @@ -24,14 +24,24 @@ import { trace, context, ROOT_CONTEXT, + Attributes, } from '@opentelemetry/api'; +import { ATTR_ERROR_TYPE } from '@opentelemetry/semantic-conventions'; import { - MESSAGINGOPERATIONVALUES_PROCESS, - MESSAGINGOPERATIONVALUES_RECEIVE, - SEMATTRS_MESSAGING_SYSTEM, - SEMATTRS_MESSAGING_DESTINATION, - SEMATTRS_MESSAGING_OPERATION, -} from '@opentelemetry/semantic-conventions'; + ATTR_MESSAGING_BATCH_MESSAGE_COUNT, + ATTR_MESSAGING_DESTINATION_NAME, + ATTR_MESSAGING_DESTINATION_PARTITION_ID, + ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE, + ATTR_MESSAGING_KAFKA_OFFSET, + ATTR_MESSAGING_KAFKA_MESSAGE_KEY, + ATTR_MESSAGING_OPERATION_NAME, + ATTR_MESSAGING_OPERATION_TYPE, + ATTR_MESSAGING_SYSTEM, + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + MESSAGING_OPERATION_TYPE_VALUE_RECEIVE, + MESSAGING_OPERATION_TYPE_VALUE_SEND, + MESSAGING_SYSTEM_VALUE_KAFKA, +} from './semconv'; import type * as kafkaJs from 'kafkajs'; import type { EachBatchHandler, @@ -54,6 +64,15 @@ import { isWrapped, } from '@opentelemetry/instrumentation'; +interface 
ConsumerSpanOptions { + topic: string; + message: KafkaMessage | undefined; + operationType: string; + attributes?: Attributes; + context?: Context | undefined; + link?: Link; +} + export class KafkaJsInstrumentation extends InstrumentationBase { constructor(config: KafkaJsInstrumentationConfig = {}) { super(PACKAGE_NAME, PACKAGE_VERSION, config); @@ -194,12 +213,17 @@ export class KafkaJsInstrumentation extends InstrumentationBase { const payload = args[0]; // https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers - const receivingSpan = instrumentation._startConsumerSpan( - payload.batch.topic, - undefined, - MESSAGINGOPERATIONVALUES_RECEIVE, - ROOT_CONTEXT - ); + const receivingSpan = instrumentation._startConsumerSpan({ + topic: payload.batch.topic, + message: undefined, + operationType: MESSAGING_OPERATION_TYPE_VALUE_RECEIVE, + context: ROOT_CONTEXT, + attributes: { + [ATTR_MESSAGING_BATCH_MESSAGE_COUNT]: payload.batch.messages.length, + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( + payload.batch.partition + ), + }, + }); return context.with( trace.setSpan(context.active(), receivingSpan), () => { @@ -246,13 +276,17 @@ export class KafkaJsInstrumentation extends InstrumentationBase = original!.apply( @@ -324,19 +358,24 @@ export class KafkaJsInstrumentation extends InstrumentationBase { let errorMessage: string; - if (typeof reason === 'string') errorMessage = reason; - else if ( + let errorType: string | undefined; + if (typeof reason === 'string') { + errorMessage = reason; + } else if ( typeof reason === 'object' && Object.prototype.hasOwnProperty.call(reason, 'message') - ) + ) { errorMessage = reason.message; + errorType = reason.constructor.name; + } - spans.forEach(span => + spans.forEach(span => { + if (errorType) span.setAttribute(ATTR_ERROR_TYPE, errorType); span.setStatus({ code: SpanStatusCode.ERROR, message: errorMessage, - }) - ); + }); + 
}); throw reason; }) @@ -345,21 +384,38 @@ export class KafkaJsInstrumentation extends InstrumentationBase { topic: 'topic-name-1', messages: [ { + partition: 42, + key: 'message-key-0', value: 'testing message content', }, ], @@ -161,13 +168,25 @@ describe('instrumentation-kafkajs', () => { assert.strictEqual(spans.length, 1); const span = spans[0]; assert.strictEqual(span.kind, SpanKind.PRODUCER); - assert.strictEqual(span.name, 'topic-name-1'); + assert.strictEqual(span.name, 'send topic-name-1'); assert.strictEqual(span.status.code, SpanStatusCode.UNSET); - assert.strictEqual(span.attributes[SEMATTRS_MESSAGING_SYSTEM], 'kafka'); + assert.strictEqual(span.attributes[ATTR_MESSAGING_SYSTEM], 'kafka'); assert.strictEqual( - span.attributes[SEMATTRS_MESSAGING_DESTINATION], + span.attributes[ATTR_MESSAGING_DESTINATION_NAME], 'topic-name-1' ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_DESTINATION_PARTITION_ID], + '42' + ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE], + undefined + ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_KEY], + 'message-key-0' + ); assert.strictEqual(messagesSent.length, 1); expectKafkaHeadersToMatchSpanContext( @@ -176,6 +195,29 @@ describe('instrumentation-kafkajs', () => { ); }); + it('simple send create span with tombstone attribute', async () => { + await producer.send({ + topic: 'topic-name-2', + messages: [ + { + partition: 42, + key: 'message-key-1', + value: null, + }, + ], + }); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.kind, SpanKind.PRODUCER); + assert.strictEqual(span.name, 'send topic-name-2'); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE], + true + ); + }); + it('send two messages', async () => { await producer.send({ topic: 'topic-name-1', @@ -191,8 +233,8 @@ describe('instrumentation-kafkajs', () => { const spans = getTestSpans(); 
assert.strictEqual(spans.length, 2); - assert.strictEqual(spans[0].name, 'topic-name-1'); - assert.strictEqual(spans[1].name, 'topic-name-1'); + assert.strictEqual(spans[0].name, 'send topic-name-1'); + assert.strictEqual(spans[1].name, 'send topic-name-1'); assert.strictEqual(messagesSent.length, 2); expectKafkaHeadersToMatchSpanContext( @@ -232,9 +274,9 @@ describe('instrumentation-kafkajs', () => { const spans = getTestSpans(); assert.strictEqual(spans.length, 3); - assert.strictEqual(spans[0].name, 'topic-name-1'); - assert.strictEqual(spans[1].name, 'topic-name-1'); - assert.strictEqual(spans[2].name, 'topic-name-2'); + assert.strictEqual(spans[0].name, 'send topic-name-1'); + assert.strictEqual(spans[1].name, 'send topic-name-1'); + assert.strictEqual(spans[2].name, 'send topic-name-2'); assert.strictEqual(messagesSent.length, 3); for (let i = 0; i < 3; i++) { @@ -417,22 +459,34 @@ describe('instrumentation-kafkajs', () => { }); describe('consumer', () => { - const createKafkaMessage = (offset: string): KafkaMessage => { + interface CreateMessageParams { + offset: string; + key?: string | null; + tombstone?: boolean; + } + + const createKafkaMessage = ({ + offset, + key = 'message-key', + tombstone = false, + }: CreateMessageParams): KafkaMessage => { return { - key: Buffer.from('message-key', 'utf8'), - value: Buffer.from('message content', 'utf8'), + key: typeof key === 'string' ? Buffer.from(key, 'utf8') : key, + value: tombstone ? 
null : Buffer.from('message content', 'utf8'), timestamp: '1234', size: 10, attributes: 1, - offset: offset, + offset, }; }; - const createEachMessagePayload = (): EachMessagePayload => { + const createEachMessagePayload = ( + params: Partial = {} + ): EachMessagePayload => { return { topic: 'topic-name-1', - partition: 0, - message: createKafkaMessage('123'), + partition: 1, + message: createKafkaMessage({ offset: '123', ...params }), heartbeat: async () => {}, pause: () => () => {}, }; @@ -444,7 +498,10 @@ describe('instrumentation-kafkajs', () => { topic: 'topic-name-1', partition: 1234, highWatermark: '4567', - messages: [createKafkaMessage('124'), createKafkaMessage('125')], + messages: [ + createKafkaMessage({ offset: '124' }), + createKafkaMessage({ offset: '125' }), + ], }, } as EachBatchPayload; }; @@ -468,25 +525,80 @@ describe('instrumentation-kafkajs', () => { _payload: EachMessagePayload ): Promise => {}, }); - const payload: EachMessagePayload = createEachMessagePayload(); + const payload = createEachMessagePayload(); await runConfig?.eachMessage!(payload); const spans = getTestSpans(); assert.strictEqual(spans.length, 1); const span = spans[0]; - assert.strictEqual(span.name, 'topic-name-1'); + assert.strictEqual(span.name, 'process topic-name-1'); assert.strictEqual(span.parentSpanId, undefined); assert.strictEqual(span.kind, SpanKind.CONSUMER); assert.strictEqual(span.status.code, SpanStatusCode.UNSET); - assert.strictEqual(span.attributes[SEMATTRS_MESSAGING_SYSTEM], 'kafka'); + assert.strictEqual(span.attributes[ATTR_MESSAGING_SYSTEM], 'kafka'); assert.strictEqual( - span.attributes[SEMATTRS_MESSAGING_DESTINATION], + span.attributes[ATTR_MESSAGING_DESTINATION_NAME], 'topic-name-1' ); assert.strictEqual( - span.attributes[SEMATTRS_MESSAGING_OPERATION], + span.attributes[ATTR_MESSAGING_OPERATION_TYPE], 'process' ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_DESTINATION_PARTITION_ID], + '1' + ); + assert.strictEqual( + 
span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_KEY], + 'message-key' + ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE], + undefined + ); + assert.strictEqual(span.attributes[ATTR_MESSAGING_KAFKA_OFFSET], '123'); + }); + + it('consume eachMessage tombstone', async () => { + consumer.run({ + eachMessage: async ( + _payload: EachMessagePayload + ): Promise => {}, + }); + const payload = createEachMessagePayload({ tombstone: true }); + await runConfig?.eachMessage!(payload); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.name, 'process topic-name-1'); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_KEY], + 'message-key' + ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE], + true + ); + }); + + it('consume eachMessage with null key', async () => { + consumer.run({ + eachMessage: async ( + _payload: EachMessagePayload + ): Promise => {}, + }); + const payload = createEachMessagePayload({ key: null }); + await runConfig?.eachMessage!(payload); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.name, 'process topic-name-1'); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_KEY], + undefined + ); }); it('consumer eachMessage with non promise return value', async () => { @@ -678,32 +790,40 @@ describe('instrumentation-kafkajs', () => { const spans = getTestSpans(); assert.strictEqual(spans.length, 3); spans.forEach(span => { - assert.strictEqual(span.name, 'topic-name-1'); assert.strictEqual(span.status.code, SpanStatusCode.UNSET); + assert.strictEqual(span.attributes[ATTR_MESSAGING_SYSTEM], 'kafka'); assert.strictEqual( - span.attributes[SEMATTRS_MESSAGING_SYSTEM], - 'kafka' - ); - assert.strictEqual( - span.attributes[SEMATTRS_MESSAGING_DESTINATION], + span.attributes[ATTR_MESSAGING_DESTINATION_NAME], 'topic-name-1' ); 
}); const [recvSpan, msg1Span, msg2Span] = spans; + assert.strictEqual(recvSpan.kind, SpanKind.CLIENT); + assert.strictEqual(recvSpan.name, 'poll topic-name-1'); assert.strictEqual(recvSpan.parentSpanId, undefined); assert.strictEqual( - recvSpan.attributes[SEMATTRS_MESSAGING_OPERATION], + recvSpan.attributes[ATTR_MESSAGING_OPERATION_TYPE], 'receive' ); + assert.strictEqual( + recvSpan.attributes[ATTR_MESSAGING_OPERATION_NAME], + 'poll' + ); + assert.strictEqual(msg1Span.kind, SpanKind.CONSUMER); + assert.strictEqual(msg1Span.name, 'process topic-name-1'); assert.strictEqual( msg1Span.parentSpanId, recvSpan.spanContext().spanId ); assert.strictEqual( - msg1Span.attributes[SEMATTRS_MESSAGING_OPERATION], + msg1Span.attributes[ATTR_MESSAGING_OPERATION_TYPE], + 'process' + ); + assert.strictEqual( + msg1Span.attributes[ATTR_MESSAGING_OPERATION_NAME], 'process' ); @@ -712,7 +832,11 @@ describe('instrumentation-kafkajs', () => { recvSpan.spanContext().spanId ); assert.strictEqual( - msg2Span.attributes[SEMATTRS_MESSAGING_OPERATION], + msg2Span.attributes[ATTR_MESSAGING_OPERATION_TYPE], + 'process' + ); + assert.strictEqual( + msg2Span.attributes[ATTR_MESSAGING_OPERATION_NAME], 'process' ); }); From 45047b4c6de4b6db49754cd91512c5d06939f6b1 Mon Sep 17 00:00:00 2001 From: Brian Phillips <28457+brianphillips@users.noreply.github.com> Date: Thu, 13 Mar 2025 15:08:01 -0500 Subject: [PATCH 2/7] feat(instrumentation-kafkajs): add metrics from messaging semantic conventions --- .../node/instrumentation-kafkajs/README.md | 25 +- .../src/instrumentation.ts | 353 ++++++++++++++---- .../src/internal-types.ts | 29 ++ .../instrumentation-kafkajs/src/semconv.ts | 41 ++ .../test/kafkajs.test.ts | 316 +++++++++++++++- 5 files changed, 681 insertions(+), 83 deletions(-) create mode 100644 plugins/node/instrumentation-kafkajs/src/internal-types.ts diff --git a/plugins/node/instrumentation-kafkajs/README.md b/plugins/node/instrumentation-kafkajs/README.md index 6d955cf576..34684e7be7 100644 
--- a/plugins/node/instrumentation-kafkajs/README.md +++ b/plugins/node/instrumentation-kafkajs/README.md @@ -53,7 +53,30 @@ You can set the following: This package uses `@opentelemetry/semantic-conventions` version `1.30+`, which implements Semantic Convention [Version 1.30.0](https://github.com/open-telemetry/semantic-conventions/blob/v1.30.0/docs/README.md) -Attributes collected: +### Spans Emitted + +| KafkaJS Object | Action | Span Kind | Span Name | Operation Type / Name | +| -------------- | -------------------------- | --------- | -------------------------- | --------------------- | +| Consumer | `eachBatch` | Client | `poll <topic>` | `receive` / `poll` | +| Consumer | `eachBatch`, `eachMessage` | Consumer | `process <topic>` [1] | `process` / `process` | +| Producer | `send` | Producer | `send <topic>` | `send` / `send` | + +**[1] `process <topic>`:** In the context of `eachBatch`, this span will be emitted for each message in the batch but the timing (start, end, duration) will reflect the timing of the batch. + +### Metrics Emitted + +| KafkaJS Object | Metric Name | Short Description | +| --------------------- | ------------------------------------- | --------------------------------------------------------------------------- | +| Consumer | `messaging.process.duration` | Duration of processing operation. [1] | +| Consumer | `messaging.client.consumed.messages` | Number of messages that were delivered to the application. | +| Consumer and Producer | `messaging.client.operation.duration` | Duration of a messaging operation initiated by a producer or consumer client. | +| Producer | `messaging.client.sent.messages` | Number of messages producer attempted to send to the broker. | + +**[1] `messaging.process.duration`:** In the context of `eachBatch`, this metric will be emitted once for each message but the value reflects the duration of the entire batch. + +### Attributes Collected + +These attributes are added to both spans and metrics, where possible. 
| Attribute | Short Description | | ------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------ | diff --git a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts index 2138c33470..9ba1454115 100644 --- a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts +++ b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts @@ -15,25 +15,55 @@ */ import { - SpanKind, - Span, - SpanStatusCode, + Attributes, Context, - propagation, - Link, - trace, context, + Counter, + Histogram, + Link, + propagation, ROOT_CONTEXT, - Attributes, + Span, + SpanKind, + SpanStatusCode, + trace, } from '@opentelemetry/api'; -import { ATTR_ERROR_TYPE } from '@opentelemetry/semantic-conventions'; +import { + InstrumentationBase, + InstrumentationNodeModuleDefinition, + isWrapped, + safeExecuteInTheMiddle, +} from '@opentelemetry/instrumentation'; +import { + ATTR_ERROR_TYPE, + ATTR_SERVER_ADDRESS, + ATTR_SERVER_PORT, + ERROR_TYPE_VALUE_OTHER, +} from '@opentelemetry/semantic-conventions'; +import type * as kafkaJs from 'kafkajs'; +import type { + Consumer, + ConsumerRunConfig, + EachBatchHandler, + EachMessageHandler, + KafkaMessage, + Message, + Producer, + RecordMetadata, +} from 'kafkajs'; +import { + ConsumerExtended, + EVENT_LISTENERS_SET, + ProducerExtended, +} from './internal-types'; +import { bufferTextMapGetter } from './propagator'; import { ATTR_MESSAGING_BATCH_MESSAGE_COUNT, ATTR_MESSAGING_DESTINATION_NAME, ATTR_MESSAGING_DESTINATION_PARTITION_ID, + ATTR_MESSAGING_KAFKA_MESSAGE_KEY, ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE, ATTR_MESSAGING_KAFKA_OFFSET, - ATTR_MESSAGING_KAFKA_MESSAGE_KEY, ATTR_MESSAGING_OPERATION_NAME, ATTR_MESSAGING_OPERATION_TYPE, ATTR_MESSAGING_SYSTEM, @@ -41,43 +71,112 @@ import { MESSAGING_OPERATION_TYPE_VALUE_RECEIVE, 
MESSAGING_OPERATION_TYPE_VALUE_SEND, MESSAGING_SYSTEM_VALUE_KAFKA, + METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES, + METRIC_MESSAGING_CLIENT_OPERATION_DURATION, + METRIC_MESSAGING_CLIENT_SENT_MESSAGES, + METRIC_MESSAGING_PROCESS_DURATION, } from './semconv'; -import type * as kafkaJs from 'kafkajs'; -import type { - EachBatchHandler, - EachMessageHandler, - Producer, - RecordMetadata, - Message, - ConsumerRunConfig, - KafkaMessage, - Consumer, -} from 'kafkajs'; import { KafkaJsInstrumentationConfig } from './types'; /** @knipignore */ import { PACKAGE_NAME, PACKAGE_VERSION } from './version'; -import { bufferTextMapGetter } from './propagator'; -import { - InstrumentationBase, - InstrumentationNodeModuleDefinition, - safeExecuteInTheMiddle, - isWrapped, -} from '@opentelemetry/instrumentation'; interface ConsumerSpanOptions { topic: string; message: KafkaMessage | undefined; operationType: string; attributes?: Attributes; - context?: Context | undefined; + ctx?: Context | undefined; link?: Link; } +interface StandardAttributes extends Attributes { + [ATTR_MESSAGING_SYSTEM]: string; + [ATTR_MESSAGING_OPERATION_NAME]: OP; + [ATTR_ERROR_TYPE]?: string; +} +interface TopicAttributes { + [ATTR_MESSAGING_DESTINATION_NAME]: string; + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]?: string; +} + +interface ClientDurationAttributes + extends StandardAttributes, + Partial { + [ATTR_SERVER_ADDRESS]: string; + [ATTR_SERVER_PORT]: number; + [ATTR_MESSAGING_OPERATION_TYPE]?: string; +} +interface SentMessagesAttributes + extends StandardAttributes<'send'>, + TopicAttributes { + [ATTR_ERROR_TYPE]?: string; +} +type ConsumedMessagesAttributes = StandardAttributes<'receive' | 'process'>; +interface MessageProcessDurationAttributes + extends StandardAttributes<'process'>, + TopicAttributes { + [ATTR_MESSAGING_SYSTEM]: string; + [ATTR_MESSAGING_OPERATION_NAME]: 'process'; + [ATTR_ERROR_TYPE]?: string; +} +type RecordPendingMetric = (errorType?: string | undefined) => void; + +function 
prepareCounter( + meter: Counter, + value: number, + attributes: T +): RecordPendingMetric { + return (errorType?: string | undefined) => { + meter.add(value, { + ...attributes, + ...(errorType ? { [ATTR_ERROR_TYPE]: errorType } : {}), + }); + }; +} + +function prepareDurationHistogram( + meter: Histogram, + value: number, + attributes: T +): RecordPendingMetric { + return (errorType?: string | undefined) => { + meter.record((Date.now() - value) / 1000, { + ...attributes, + ...(errorType ? { [ATTR_ERROR_TYPE]: errorType } : {}), + }); + }; +} + +const HISTOGRAM_BUCKET_BOUNDARIES = [ + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, +]; export class KafkaJsInstrumentation extends InstrumentationBase { + private _clientDuration!: Histogram; + private _sentMessages!: Counter; + private _consumedMessages!: Counter; + private _processDuration!: Histogram; + constructor(config: KafkaJsInstrumentationConfig = {}) { super(PACKAGE_NAME, PACKAGE_VERSION, config); } + override _updateMetricInstruments() { + this._clientDuration = this.meter.createHistogram( + METRIC_MESSAGING_CLIENT_OPERATION_DURATION, + { advice: { explicitBucketBoundaries: HISTOGRAM_BUCKET_BOUNDARIES } } + ); + this._sentMessages = this.meter.createCounter( + METRIC_MESSAGING_CLIENT_SENT_MESSAGES + ); + this._consumedMessages = this.meter.createCounter( + METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES + ); + this._processDuration = this.meter.createHistogram( + METRIC_MESSAGING_PROCESS_DURATION, + { advice: { explicitBucketBoundaries: HISTOGRAM_BUCKET_BOUNDARIES } } + ); + } + protected init() { const unpatch = (moduleExports: typeof kafkaJs) => { if (isWrapped(moduleExports?.Kafka?.prototype.producer)) { @@ -130,11 +229,39 @@ export class KafkaJsInstrumentation extends InstrumentationBase + this._consumedMessages.add(event.payload.batchSize, { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: 'receive', + [ATTR_MESSAGING_DESTINATION_NAME]: 
event.payload.topic, + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( + event.payload.partition + ), + }) + ); + consumer.on(consumer.events.REQUEST, event => { + const [address, port] = event.payload.broker.split(':'); + this._clientDuration.record(event.payload.duration / 1000, { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @${event.payload.apiVersion}? + [ATTR_SERVER_ADDRESS]: address, + [ATTR_SERVER_PORT]: Number.parseInt(port, 10), + }); + }); + + consumer[EVENT_LISTENERS_SET] = true; + } + private _getProducerPatch() { const instrumentation = this; return (original: kafkaJs.Kafka['producer']) => { @@ -162,11 +289,29 @@ export class KafkaJsInstrumentation extends InstrumentationBase { + const [address, port] = event.payload.broker.split(':'); + this._clientDuration.record(event.payload.duration / 1000, { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @${event.payload.apiVersion}? 
+ [ATTR_SERVER_ADDRESS]: address, + [ATTR_SERVER_PORT]: Number.parseInt(port), + }); + }); + + producer[EVENT_LISTENERS_SET] = true; + } + private _getConsumerRunPatch() { const instrumentation = this; return (original: Consumer['run']) => { @@ -217,7 +362,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase { return original!.apply(this, args); } ); - return instrumentation._endSpansOnPromise([span], eachMessagePromise); + return instrumentation._endSpansOnPromise( + [span], + pendingMetrics, + eachMessagePromise + ); }; }; } @@ -249,7 +413,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase { - const spans = payload.batch.messages.map( - (message: KafkaMessage) => { - const propagatedContext: Context = propagation.extract( - ROOT_CONTEXT, - message.headers, - bufferTextMapGetter - ); - const spanContext = trace - .getSpan(propagatedContext) - ?.spanContext(); - let origSpanLink: Link | undefined; - if (spanContext) { - origSpanLink = { - context: spanContext, - }; - } - return instrumentation._startConsumerSpan({ + const startTime = Date.now(); + const spans: Span[] = []; + const pendingMetrics: RecordPendingMetric[] = []; + payload.batch.messages.forEach(message => { + const propagatedContext: Context = propagation.extract( + ROOT_CONTEXT, + message.headers, + bufferTextMapGetter + ); + const spanContext = trace + .getSpan(propagatedContext) + ?.spanContext(); + let origSpanLink: Link | undefined; + if (spanContext) { + origSpanLink = { + context: spanContext, + }; + } + spans.push( + instrumentation._startConsumerSpan({ topic: payload.batch.topic, message, operationType: MESSAGING_OPERATION_TYPE_VALUE_PROCESS, @@ -286,9 +453,23 @@ export class KafkaJsInstrumentation extends InstrumentationBase = original!.apply( this, args @@ -296,6 +477,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase { const batch = args[0]; const messages = batch.topicMessages || []; - const spans: Span[] = messages - .map(topicMessage => 
- topicMessage.messages.map(message => - instrumentation._startProducerSpan(topicMessage.topic, message) - ) - ) - .reduce((acc, val) => acc.concat(val), []); + const spans: Span[] = []; + const pendingMetrics: RecordPendingMetric[] = []; + + messages.forEach(topicMessage => { + topicMessage.messages.forEach(message => { + spans.push( + instrumentation._startProducerSpan(topicMessage.topic, message) + ); + pendingMetrics.push( + prepareCounter(instrumentation._sentMessages, 1, { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_MESSAGING_DESTINATION_NAME]: topicMessage.topic, + ...(message.partition + ? { + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( + message.partition + ), + } + : {}), + }) + ); + }); + }); const origSendResult: Promise = original.apply( this, args ); - return instrumentation._endSpansOnPromise(spans, origSendResult); + return instrumentation._endSpansOnPromise( + spans, + pendingMetrics, + origSendResult + ); }; }; } @@ -342,23 +545,46 @@ export class KafkaJsInstrumentation extends InstrumentationBase + prepareCounter(instrumentation._sentMessages, 1, { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_MESSAGING_DESTINATION_NAME]: record.topic, + ...(m.partition + ? 
{ + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( + m.partition + ), + } + : {}), + }) + ); const origSendResult: Promise = original.apply( this, args ); - return instrumentation._endSpansOnPromise(spans, origSendResult); + return instrumentation._endSpansOnPromise( + spans, + pendingMetrics, + origSendResult + ); }; }; } private _endSpansOnPromise( spans: Span[], + pendingMetrics: RecordPendingMetric[], sendPromise: Promise ): Promise { return Promise.resolve(sendPromise) + .then(result => { + pendingMetrics.forEach(m => m()); + return result; + }) .catch(reason => { let errorMessage: string; - let errorType: string | undefined; + let errorType = ERROR_TYPE_VALUE_OTHER; if (typeof reason === 'string') { errorMessage = reason; } else if ( @@ -368,6 +594,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase m(errorType)); spans.forEach(span => { if (errorType) span.setAttribute(ATTR_ERROR_TYPE, errorType); @@ -388,7 +615,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase +) { + assert.strictEqual(errors.length, 0); + const { metrics } = resourceMetrics.scopeMetrics[0]; + assert.strictEqual( + Object.keys(expected).length, + metrics.length, + 'A different number of metrics were found than expected' + ); + Object.entries(expected).forEach(([name, values]) => { + const match = metrics.find(metric => metric.descriptor.name === name); + assert.ok(match, `metric ${name} not found`); + + if (match.dataPointType === DataPointType.HISTOGRAM) { + assert.deepStrictEqual( + match.dataPoints.map(d => d.value.count), + values.map(v => v.count), + 'histogram datapoints do not have the same count' + ); + } else { + assert.deepStrictEqual( + match.dataPoints.map(d => d.value), + values.map(v => v.value), + 'counter datapoints do not match' + ); + } + assert.deepStrictEqual( + match.dataPoints.map(d => d.attributes), + values.map(v => v.attributes) + ); + }); +} describe('instrumentation-kafkajs', () => { propagation.setGlobalPropagator( @@ 
-112,8 +158,10 @@ describe('instrumentation-kafkajs', () => { }; }; + let metricReader: TestMetricReader; beforeEach(() => { messagesSent = []; + metricReader = initMeterProvider(instrumentation); }); describe('producer', () => { @@ -132,21 +180,29 @@ describe('instrumentation-kafkajs', () => { }; describe('successful send', () => { - beforeEach(async () => { - patchProducerSend(async (): Promise => { - return [ - { - topicName: 'topic-name-1', - partition: 0, - errorCode: 123, - offset: '18', - timestamp: '123456', - }, - ]; - }); + const defaultRecordMetadata = [ + { + topicName: 'topic-name-1', + partition: 0, + errorCode: 123, + offset: '18', + timestamp: '123456', + }, + ]; + function initializeProducer( + recordMetadata: RecordMetadata[] = defaultRecordMetadata + ) { + patchProducerSend( + async (): Promise => recordMetadata + ); instrumentation.disable(); instrumentation.enable(); - producer = kafka.producer(); + producer = kafka.producer({ + createPartitioner: kafkajs.Partitioners.LegacyPartitioner, + }); + } + beforeEach(() => { + initializeProducer(); }); it('simple send create span with right attributes, pass return value correctly and propagate context', async () => { @@ -193,11 +249,25 @@ describe('instrumentation-kafkajs', () => { messagesSent[0], span as ReadableSpan ); + + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '42', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + }, + }, + ], + }); }); it('simple send create span with tombstone attribute', async () => { await producer.send({ - topic: 'topic-name-2', + topic: 'topic-name-1', messages: [ { partition: 42, @@ -211,14 +281,43 @@ describe('instrumentation-kafkajs', () => { assert.strictEqual(spans.length, 1); const span = spans[0]; assert.strictEqual(span.kind, 
SpanKind.PRODUCER); - assert.strictEqual(span.name, 'send topic-name-2'); + assert.strictEqual(span.name, 'send topic-name-1'); assert.strictEqual( span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE], true ); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '42', + }, + }, + ], + }); }); it('send two messages', async () => { + initializeProducer([ + { + topicName: 'topic-name-1', + partition: 0, + errorCode: 123, + offset: '18', + timestamp: '123456', + }, + { + topicName: 'topic-name-1', + partition: 0, + errorCode: 123, + offset: '19', + timestamp: '123456', + }, + ]); await producer.send({ topic: 'topic-name-1', messages: [ @@ -245,9 +344,44 @@ describe('instrumentation-kafkajs', () => { messagesSent[1], spans[1] as ReadableSpan ); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 2, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + }, + }, + ], + }); }); it('send batch', async () => { + initializeProducer([ + { + topicName: 'topic-name-1', + partition: 0, + errorCode: 123, + offset: '18', + timestamp: '123456', + }, + { + topicName: 'topic-name-1', + partition: 0, + errorCode: 123, + offset: '19', + timestamp: '123456', + }, + { + topicName: 'topic-name-2', + partition: 1, + errorCode: 123, + offset: '19', + timestamp: '123456', + }, + ]); await producer.sendBatch({ topicMessages: [ { @@ -285,6 +419,26 @@ describe('instrumentation-kafkajs', () => { spans[i] as ReadableSpan ); } + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 2, + 
attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + }, + }, + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-2', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + }, + }, + ], + }); }); }); @@ -297,7 +451,9 @@ describe('instrumentation-kafkajs', () => { }); instrumentation.disable(); instrumentation.enable(); - producer = kafka.producer(); + producer = kafka.producer({ + createPartitioner: kafkajs.Partitioners.LegacyPartitioner, + }); }); it('error in send create failed span', async () => { @@ -320,6 +476,19 @@ describe('instrumentation-kafkajs', () => { span.status.message, 'error thrown from kafka client send' ); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + ], + }); }); it('error in send with multiple messages create failed spans', async () => { @@ -346,6 +515,19 @@ describe('instrumentation-kafkajs', () => { 'error thrown from kafka client send' ); }); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 2, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + ], + }); }); it('error in sendBatch should set error to all spans', async () => { @@ -384,6 +566,28 @@ describe('instrumentation-kafkajs', () => { 'error thrown from kafka client send' ); }); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 2, + 
attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-2', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + ], + }); }); }); @@ -402,7 +606,9 @@ describe('instrumentation-kafkajs', () => { instrumentation.disable(); instrumentation.setConfig(config); instrumentation.enable(); - producer = kafka.producer(); + producer = kafka.producer({ + createPartitioner: kafkajs.Partitioners.LegacyPartitioner, + }); }); it('producer hook add span attribute with value from message', async () => { @@ -437,7 +643,9 @@ describe('instrumentation-kafkajs', () => { instrumentation.disable(); instrumentation.setConfig(config); instrumentation.enable(); - producer = kafka.producer(); + producer = kafka.producer({ + createPartitioner: kafkajs.Partitioners.LegacyPartitioner, + }); }); it('producer hook add span attribute with value from message', async () => { @@ -557,6 +765,19 @@ describe('instrumentation-kafkajs', () => { undefined ); assert.strictEqual(span.attributes[ATTR_MESSAGING_KAFKA_OFFSET], '123'); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_PROCESS_DURATION]: [ + { + count: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + }, + }, + ], + }); }); it('consume eachMessage tombstone', async () => { @@ -719,6 +940,20 @@ describe('instrumentation-kafkajs', () => { span.status.message, 'error thrown from eachMessage callback' ); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_PROCESS_DURATION]: [ + { + count: 1, + attributes: { + 
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + ], + }); }); it('throwing object with no message', async () => { @@ -745,6 +980,20 @@ describe('instrumentation-kafkajs', () => { const span = spans[0]; assert.strictEqual(span.status.code, SpanStatusCode.ERROR); assert.strictEqual(span.status.message, undefined); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_PROCESS_DURATION]: [ + { + count: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: '_OTHER', + }, + }, + ], + }); }); it('throwing non object', async () => { @@ -768,6 +1017,20 @@ describe('instrumentation-kafkajs', () => { const span = spans[0]; assert.strictEqual(span.status.code, SpanStatusCode.ERROR); assert.strictEqual(span.status.message, undefined); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_PROCESS_DURATION]: [ + { + count: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: '_OTHER', + }, + }, + ], + }); }); }); @@ -839,6 +1102,19 @@ describe('instrumentation-kafkajs', () => { msg2Span.attributes[ATTR_MESSAGING_OPERATION_NAME], 'process' ); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_PROCESS_DURATION]: [ + { + count: 2, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1234', + [ATTR_MESSAGING_OPERATION_NAME]: 
'process', + }, + }, + ], + }); }); it('consumer eachBatch with non promise return value', async () => { @@ -865,7 +1141,9 @@ describe('instrumentation-kafkajs', () => { storeRunConfig(); instrumentation.disable(); instrumentation.enable(); - producer = kafka.producer(); + producer = kafka.producer({ + createPartitioner: kafkajs.Partitioners.LegacyPartitioner, + }); consumer = kafka.consumer({ groupId: 'testing-group-id' }); }); From 00e0787f2f2757e074e392c025881dcc1ade0454 Mon Sep 17 00:00:00 2001 From: Brian Phillips <28457+brianphillips@users.noreply.github.com> Date: Thu, 13 Mar 2025 20:35:00 -0500 Subject: [PATCH 3/7] chore(instrumentation-kafkajs): enable test coverage reporting --- plugins/node/instrumentation-kafkajs/package.json | 2 +- .../instrumentation-kafkajs/src/instrumentation.ts | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/plugins/node/instrumentation-kafkajs/package.json b/plugins/node/instrumentation-kafkajs/package.json index 122512dbb0..3bca581970 100644 --- a/plugins/node/instrumentation-kafkajs/package.json +++ b/plugins/node/instrumentation-kafkajs/package.json @@ -6,7 +6,7 @@ "types": "build/src/index.d.ts", "repository": "open-telemetry/opentelemetry-js-contrib", "scripts": { - "test": "mocha --require @opentelemetry/contrib-test-utils 'test/**/*.test.ts'", + "test": "nyc mocha --require @opentelemetry/contrib-test-utils 'test/**/*.test.ts'", "tdd": "npm run test -- --watch-extensions ts --watch", "clean": "rimraf build/*", "lint": "eslint . 
--ext .ts", diff --git a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts index 9ba1454115..7eb515d355 100644 --- a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts +++ b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts @@ -84,7 +84,7 @@ interface ConsumerSpanOptions { topic: string; message: KafkaMessage | undefined; operationType: string; - attributes?: Attributes; + attributes: Attributes; ctx?: Context | undefined; link?: Link; } @@ -583,9 +583,9 @@ export class KafkaJsInstrumentation extends InstrumentationBase { - let errorMessage: string; - let errorType = ERROR_TYPE_VALUE_OTHER; - if (typeof reason === 'string') { + let errorMessage: string | undefined; + let errorType: string = ERROR_TYPE_VALUE_OTHER; + if (typeof reason === 'string' || reason === undefined) { errorMessage = reason; } else if ( typeof reason === 'object' && @@ -597,7 +597,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase m(errorType)); spans.forEach(span => { - if (errorType) span.setAttribute(ATTR_ERROR_TYPE, errorType); + span.setAttribute(ATTR_ERROR_TYPE, errorType); span.setStatus({ code: SpanStatusCode.ERROR, message: errorMessage, @@ -617,7 +617,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase Date: Fri, 14 Mar 2025 08:14:49 -0500 Subject: [PATCH 4/7] refactor(instrumentation-kafkajs): rearrange consumed message metric collection for testability --- .../src/instrumentation.ts | 85 ++++++++++--------- .../test/kafkajs.test.ts | 69 ++++++++++++++- 2 files changed, 113 insertions(+), 41 deletions(-) diff --git a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts index 7eb515d355..292bda0983 100644 --- a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts +++ b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts @@ -51,11 +51,7 @@ import type { Producer, RecordMetadata, 
} from 'kafkajs'; -import { - ConsumerExtended, - EVENT_LISTENERS_SET, - ProducerExtended, -} from './internal-types'; +import { EVENT_LISTENERS_SET } from './internal-types'; import { bufferTextMapGetter } from './propagator'; import { ATTR_MESSAGING_BATCH_MESSAGE_COUNT, @@ -88,6 +84,22 @@ interface ConsumerSpanOptions { ctx?: Context | undefined; link?: Link; } +// This interface acts as a strict subset of the KafkaJS Consumer and +// Producer interfaces (just for the event we're needing) +interface KafkaEventEmitter { + on( + eventName: + | kafkaJs.ConsumerEvents['REQUEST'] + | kafkaJs.ProducerEvents['REQUEST'], + listener: (event: kafkaJs.RequestEvent) => void + ): void; + events: { + REQUEST: + | kafkaJs.ConsumerEvents['REQUEST'] + | kafkaJs.ProducerEvents['REQUEST']; + }; + [EVENT_LISTENERS_SET]?: boolean; +} interface StandardAttributes extends Attributes { [ATTR_MESSAGING_SYSTEM]: string; @@ -229,37 +241,27 @@ export class KafkaJsInstrumentation extends InstrumentationBase - this._consumedMessages.add(event.payload.batchSize, { - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_OPERATION_NAME]: 'receive', - [ATTR_MESSAGING_DESTINATION_NAME]: event.payload.topic, - [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( - event.payload.partition - ), - }) - ); - consumer.on(consumer.events.REQUEST, event => { + kafkaObj.on(kafkaObj.events.REQUEST, event => { const [address, port] = event.payload.broker.split(':'); this._clientDuration.record(event.payload.duration / 1000, { [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @${event.payload.apiVersion}? + [ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @v${event.payload.apiVersion}? 
[ATTR_SERVER_ADDRESS]: address, [ATTR_SERVER_PORT]: Number.parseInt(port, 10), }); }); - consumer[EVENT_LISTENERS_SET] = true; + kafkaObj[EVENT_LISTENERS_SET] = true; } private _getProducerPatch() { @@ -289,29 +291,13 @@ export class KafkaJsInstrumentation extends InstrumentationBase { - const [address, port] = event.payload.broker.split(':'); - this._clientDuration.record(event.payload.duration / 1000, { - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @${event.payload.apiVersion}? - [ATTR_SERVER_ADDRESS]: address, - [ATTR_SERVER_PORT]: Number.parseInt(port), - }); - }); - - producer[EVENT_LISTENERS_SET] = true; - } - private _getConsumerRunPatch() { const instrumentation = this; return (original: Consumer['run']) => { @@ -383,6 +369,14 @@ export class KafkaJsInstrumentation extends InstrumentationBase { const startTime = Date.now(); const spans: Span[] = []; - const pendingMetrics: RecordPendingMetric[] = []; + const pendingMetrics: RecordPendingMetric[] = [ + prepareCounter( + instrumentation._consumedMessages, + payload.batch.messages.length, + { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_MESSAGING_DESTINATION_NAME]: payload.batch.topic, + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( + payload.batch.partition + ), + } + ), + ]; payload.batch.messages.forEach(message => { const propagatedContext: Context = propagation.extract( ROOT_CONTEXT, diff --git a/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts b/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts index c7c49b3a71..264205b9ca 100644 --- a/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts +++ b/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts @@ -37,6 +37,7 @@ import { METRIC_MESSAGING_CLIENT_SENT_MESSAGES, MESSAGING_SYSTEM_VALUE_KAFKA, METRIC_MESSAGING_PROCESS_DURATION, + 
METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES, } from '../src/semconv'; import { getTestSpans, @@ -91,13 +92,13 @@ function assertMetricCollection( assert.deepStrictEqual( match.dataPoints.map(d => d.value.count), values.map(v => v.count), - 'histogram datapoints do not have the same count' + `${name} datapoints do not have the same count` ); } else { assert.deepStrictEqual( match.dataPoints.map(d => d.value), values.map(v => v.value), - 'counter datapoints do not match' + `${name} datapoint values do not match` ); } assert.deepStrictEqual( @@ -777,6 +778,17 @@ describe('instrumentation-kafkajs', () => { }, }, ], + [METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + }, + }, + ], }); }); @@ -953,6 +965,18 @@ describe('instrumentation-kafkajs', () => { }, }, ], + [METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + ], }); }); @@ -993,6 +1017,18 @@ describe('instrumentation-kafkajs', () => { }, }, ], + [METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: '_OTHER', + }, + }, + ], }); }); @@ -1030,6 +1066,18 @@ describe('instrumentation-kafkajs', () => { }, }, ], + [METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + 
[ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: '_OTHER', + }, + }, + ], }); }); }); @@ -1114,6 +1162,17 @@ describe('instrumentation-kafkajs', () => { }, }, ], + [METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [ + { + value: 2, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1234', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + }, + }, + ], }); }); @@ -1283,5 +1342,11 @@ describe('instrumentation-kafkajs', () => { '123' ); }); + it('exposes a keys method', () => { + assert.deepStrictEqual(bufferTextMapGetter.keys({ a: 1, b: 2 }), [ + 'a', + 'b', + ]); + }); }); }); From a51552cf72ee12539fe5024a1008a50657e16a38 Mon Sep 17 00:00:00 2001 From: Brian Phillips <28457+brianphillips@users.noreply.github.com> Date: Fri, 14 Mar 2025 08:47:41 -0500 Subject: [PATCH 5/7] fix(instrumentation-kafkajs): check for definedness, not truthiness on partition IDs a message with a partition ID of `0` should now have the proper attribute set --- .../instrumentation-kafkajs/src/instrumentation.ts | 11 ++++++----- .../node/instrumentation-kafkajs/test/kafkajs.test.ts | 5 +++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts index 292bda0983..cc80515109 100644 --- a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts +++ b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts @@ -516,7 +516,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase { topic: 'topic-name-1', messages: [ { + partition: 0, value: 'message1', }, { + partition: 0, value: 'message2', }, ], @@ -352,6 +354,7 @@ describe('instrumentation-kafkajs', () => { attributes: { [ATTR_MESSAGING_SYSTEM]: 
MESSAGING_SYSTEM_VALUE_KAFKA, [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '0', [ATTR_MESSAGING_OPERATION_NAME]: 'send', }, }, @@ -400,6 +403,7 @@ describe('instrumentation-kafkajs', () => { topic: 'topic-name-2', messages: [ { + partition: 1, value: 'message2-1', }, ], @@ -435,6 +439,7 @@ describe('instrumentation-kafkajs', () => { attributes: { [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-2', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', [ATTR_MESSAGING_OPERATION_NAME]: 'send', }, }, From 7db5038453e7ef9de32b96a76a0ed64ea8a82a12 Mon Sep 17 00:00:00 2001 From: Brian Phillips <28457+brianphillips@users.noreply.github.com> Date: Fri, 14 Mar 2025 09:11:55 -0500 Subject: [PATCH 6/7] refactor(instrumentation-kafkajs): move client duration metric to separate method for testability --- .../src/instrumentation.ts | 25 +++++++----- .../test/kafkajs.test.ts | 40 ++++++++++++++++++- 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts index cc80515109..b8b442b7bb 100644 --- a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts +++ b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts @@ -251,19 +251,26 @@ export class KafkaJsInstrumentation extends InstrumentationBase { - const [address, port] = event.payload.broker.split(':'); - this._clientDuration.record(event.payload.duration / 1000, { - [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, - [ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @v${event.payload.apiVersion}? 
- [ATTR_SERVER_ADDRESS]: address, - [ATTR_SERVER_PORT]: Number.parseInt(port, 10), - }); - }); + kafkaObj.on( + kafkaObj.events.REQUEST, + this._recordClientDurationMetric.bind(this) + ); kafkaObj[EVENT_LISTENERS_SET] = true; } + private _recordClientDurationMetric( + event: Pick + ) { + const [address, port] = event.payload.broker.split(':'); + this._clientDuration.record(event.payload.duration / 1000, { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @v${event.payload.apiVersion}? + [ATTR_SERVER_ADDRESS]: address, + [ATTR_SERVER_PORT]: Number.parseInt(port, 10), + }); + } + private _getProducerPatch() { const instrumentation = this; return (original: kafkaJs.Kafka['producer']) => { diff --git a/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts b/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts index d233dbc514..1d13a7d6d1 100644 --- a/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts +++ b/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts @@ -38,6 +38,7 @@ import { MESSAGING_SYSTEM_VALUE_KAFKA, METRIC_MESSAGING_PROCESS_DURATION, METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES, + METRIC_MESSAGING_CLIENT_OPERATION_DURATION, } from '../src/semconv'; import { getTestSpans, @@ -68,7 +69,11 @@ import { } from 'kafkajs'; import { DummyPropagation } from './DummyPropagation'; import { bufferTextMapGetter } from '../src/propagator'; -import { ATTR_ERROR_TYPE } from '@opentelemetry/semantic-conventions'; +import { + ATTR_ERROR_TYPE, + ATTR_SERVER_ADDRESS, + ATTR_SERVER_PORT, +} from '@opentelemetry/semantic-conventions'; function assertMetricCollection( { errors, resourceMetrics }: CollectionResult, @@ -1334,6 +1339,39 @@ describe('instrumentation-kafkajs', () => { ); }); }); + describe('client duration metric', () => { + it('records the metric', async () => { + instrumentation['_recordClientDurationMetric']({ + payload: { + broker: 
'kafka.host:4789', + duration: 242, + apiName: 'some-operation', + apiKey: 123, + apiVersion: 1, + clientId: 'client-id', + correlationId: 456, + createdAt: Date.now(), + pendingDuration: 0, + sentAt: Date.now(), + size: 1024, + }, + }); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_OPERATION_DURATION]: [ + { + count: 1, + value: 0.232, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: 'some-operation', + [ATTR_SERVER_ADDRESS]: 'kafka.host', + [ATTR_SERVER_PORT]: 4789, + }, + }, + ], + }); + }); + }); describe('bufferTextMapGetter', () => { it('is possible to retrieve keys case insensitively', () => { From 65bd1b72f81467b816803053d2dc94a32c209eab Mon Sep 17 00:00:00 2001 From: Brian Phillips <28457+brianphillips@users.noreply.github.com> Date: Fri, 14 Mar 2025 09:51:33 -0500 Subject: [PATCH 7/7] chore(instrumentation-kafkajs): better assertions on histogram values --- .../test/kafkajs.test.ts | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts b/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts index 1d13a7d6d1..7e2ca03906 100644 --- a/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts +++ b/plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts @@ -79,7 +79,12 @@ function assertMetricCollection( { errors, resourceMetrics }: CollectionResult, expected: Record< string, - { count?: number; value?: number; attributes: Attributes }[] + { + count?: number; + value?: number; + buckets?: Record; + attributes: Attributes; + }[] > ) { assert.strictEqual(errors.length, 0); @@ -99,6 +104,18 @@ function assertMetricCollection( values.map(v => v.count), `${name} datapoints do not have the same count` ); + values.forEach(({ buckets }, i) => { + if (buckets) { + const { boundaries, counts } = match.dataPoints[i].value.buckets; + const actualBuckets = counts.reduce((acc, n, j) 
=> { + if (n > 0) { + acc[boundaries[j]] = n; + } + return acc; + }, {} as Record); + assert.deepStrictEqual(actualBuckets, buckets); + } + }); } else { assert.deepStrictEqual( match.dataPoints.map(d => d.value), @@ -1344,7 +1361,7 @@ describe('instrumentation-kafkajs', () => { instrumentation['_recordClientDurationMetric']({ payload: { broker: 'kafka.host:4789', - duration: 242, + duration: 250, apiName: 'some-operation', apiKey: 123, apiVersion: 1, @@ -1360,7 +1377,7 @@ describe('instrumentation-kafkajs', () => { [METRIC_MESSAGING_CLIENT_OPERATION_DURATION]: [ { count: 1, - value: 0.232, + buckets: { '0.25': 1 }, attributes: { [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, [ATTR_MESSAGING_OPERATION_NAME]: 'some-operation',