diff --git a/package-lock.json b/package-lock.json index 3ae425c382..ba14253a7a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9608,6 +9608,7 @@ "version": "1.30.0", "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.30.0.tgz", "integrity": "sha512-4VlGgo32k2EQ2wcCY3vEU28A0O13aOtHz3Xt2/2U5FAh9EfhD6t6DqL5Z6yAnRCntbTFDU4YfbpyzSlHNWycPw==", + "license": "Apache-2.0", "engines": { "node": ">=14" } @@ -35305,7 +35306,7 @@ "license": "Apache-2.0", "dependencies": { "@opentelemetry/instrumentation": "^0.200.0", - "@opentelemetry/semantic-conventions": "^1.27.0" + "@opentelemetry/semantic-conventions": "^1.30.0" }, "devDependencies": { "@opentelemetry/api": "^1.3.0", @@ -45783,7 +45784,7 @@ "@opentelemetry/contrib-test-utils": "^0.46.0", "@opentelemetry/instrumentation": "^0.200.0", "@opentelemetry/sdk-trace-base": "^2.0.0", - "@opentelemetry/semantic-conventions": "^1.27.0", + "@opentelemetry/semantic-conventions": "^1.30.0", "@types/mocha": "10.0.10", "@types/node": "18.18.14", "@types/sinon": "^17.0.0", diff --git a/plugins/node/instrumentation-kafkajs/README.md b/plugins/node/instrumentation-kafkajs/README.md index 68b0319a71..6bf6185505 100644 --- a/plugins/node/instrumentation-kafkajs/README.md +++ b/plugins/node/instrumentation-kafkajs/README.md @@ -23,7 +23,9 @@ npm install --save @opentelemetry/instrumentation-kafkajs ```js const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node'); -const { KafkaJsInstrumentation } = require('@opentelemetry/instrumentation-kafkajs'); +const { + KafkaJsInstrumentation, +} = require('@opentelemetry/instrumentation-kafkajs'); const { registerInstrumentations } = require('@opentelemetry/instrumentation'); const provider = new NodeTracerProvider(); @@ -42,22 +44,51 @@ registerInstrumentations({ You can set the following: -| Options | Type | Description | -| ---------------------------- | -------------------------------------- | -------------------------------------------------------------------------------------------------------------------- | -| `producerHook` | `KafkaProducerCustomAttributeFunction` | Function called before a producer message is sent. Allows for adding custom attributes to the span. | -| `consumerHook` | `KafkaConsumerCustomAttributeFunction` | Function called before a consumer message is processed. Allows for adding custom attributes to the span. | +| Options | Type | Description | +| -------------- | -------------------------------------- | -------------------------------------------------------------------------------------------------------- | +| `producerHook` | `KafkaProducerCustomAttributeFunction` | Function called before a producer message is sent. Allows for adding custom attributes to the span. | +| `consumerHook` | `KafkaConsumerCustomAttributeFunction` | Function called before a consumer message is processed. Allows for adding custom attributes to the span. | ## Semantic Conventions -This package uses `@opentelemetry/semantic-conventions` version `1.24+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md) +This package uses `@opentelemetry/semantic-conventions` version `1.30+`, which implements Semantic Convention [Version 1.30.0](https://github.com/open-telemetry/semantic-conventions/blob/v1.30.0/docs/README.md) -Attributes collected: +### Spans Emitted -| Attribute | Short Description | -| -----------------------------| ----------------------------------------------------- | -| `messaging.system` | An identifier for the messaging system being used. | -| `messaging.destination` | The message destination name. | -| `messaging.operation` | A string identifying the kind of messaging operation. | +| KafkaJS Object | Action | Span Kind | Span Name | Operation Type / Name | +| -------------- | -------------------------- | --------- | -------------------------- | --------------------- | +| Consumer | `eachBatch` | Client | `poll ` | `receive` / `poll` | +| Consumer | `eachBatch`, `eachMessage` | Consumer | `process ` [1] | `process` / `process` | +| Producer | `send` | Producer | `send ` | `send` / `send` | + +**[1] `process `:** In the context of `eachBatch`, this span will be emitted for each message in the batch but the timing (start, end, duration) will reflect the timing of the batch. + +### Metrics Emitted + +| KafkaJS Object | Metric Name | Short Description | +| --------------------- | ------------------------------------- | ------------------------------------------------------------ | +| Consumer | `messaging.process.duration` | Duration of processing operation. [1] | +| Consumer | `messaging.client.consumed.messages` | Number of messages that were delivered to the application. | +| Consumer and Producer | `messaging.client.operation.duration` | Number of messages that were delivered to the application. | +| Producer | `messaging.client.sent.messages` | Number of messages producer attempted to send to the broker. | + +**[1] `messaging.process.duration`:** In the context of `eachBatch`, this metric will be emitted once for each message but the value reflects the duration of the entire batch. + +### Attributes Collected + +These attributes are added to both spans and metrics, where possible. + +| Attribute | Short Description | +| ------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| `messaging.system` | An identifier for the messaging system being used (i.e. `"kafka"`). | +| `messaging.destination.name` | The message destination name. | +| `messaging.operation.type` | A string identifying the type of messaging operation. | +| `messaging.operation.name` | The system-specific name of the messaging operation. | +| `messaging.operation.name` | The system-specific name of the messaging operation. | +| `messaging.kafka.message.key` | A stringified value representing the key of the Kafka message (if present). | +| `messaging.kafka.message.tombstone` | A boolean that is true if the message is a tombstone. | +| `messaging.kafka.offset` | The offset of a record in the corresponding Kafka partition. | +| `messaging.destination.partition.id` | The identifier of the partition messages are sent to or received from, unique within the `messaging.destination.name`. **Note:** only available on producer spans. | ## Useful links diff --git a/plugins/node/instrumentation-kafkajs/package.json b/plugins/node/instrumentation-kafkajs/package.json index 01143a416a..57c618173c 100644 --- a/plugins/node/instrumentation-kafkajs/package.json +++ b/plugins/node/instrumentation-kafkajs/package.json @@ -6,7 +6,7 @@ "types": "build/src/index.d.ts", "repository": "open-telemetry/opentelemetry-js-contrib", "scripts": { - "test": "mocha --require @opentelemetry/contrib-test-utils 'test/**/*.test.ts'", + "test": "nyc mocha --require @opentelemetry/contrib-test-utils 'test/**/*.test.ts'", "test-all-versions": "tav", "tdd": "npm run test -- --watch-extensions ts --watch", "clean": "rimraf build/*", @@ -58,7 +58,7 @@ }, "dependencies": { "@opentelemetry/instrumentation": "^0.200.0", - "@opentelemetry/semantic-conventions": "^1.27.0" + "@opentelemetry/semantic-conventions": "^1.30.0" }, "homepage": "https://github.com/open-telemetry/opentelemetry-js-contrib/tree/main/plugins/node/instrumentation-kafkajs#readme" } diff --git a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts index 66f333ef8a..4ea12bd502 100644 --- a/plugins/node/instrumentation-kafkajs/src/instrumentation.ts +++ b/plugins/node/instrumentation-kafkajs/src/instrumentation.ts @@ -15,50 +15,180 @@ */ import { - SpanKind, - Span, - SpanStatusCode, + Attributes, Context, - propagation, - Link, - trace, context, + Counter, + Histogram, + Link, + propagation, ROOT_CONTEXT, + Span, + SpanKind, + SpanStatusCode, + trace, } from '@opentelemetry/api'; import { - MESSAGINGOPERATIONVALUES_PROCESS, - MESSAGINGOPERATIONVALUES_RECEIVE, - SEMATTRS_MESSAGING_SYSTEM, - SEMATTRS_MESSAGING_DESTINATION, - SEMATTRS_MESSAGING_OPERATION, + InstrumentationBase, + InstrumentationNodeModuleDefinition, + isWrapped, + safeExecuteInTheMiddle, +} from '@opentelemetry/instrumentation'; +import { + ATTR_ERROR_TYPE, + ATTR_SERVER_ADDRESS, + ATTR_SERVER_PORT, + ERROR_TYPE_VALUE_OTHER, } from '@opentelemetry/semantic-conventions'; import type * as kafkaJs from 'kafkajs'; import type { + Consumer, + ConsumerRunConfig, EachBatchHandler, EachMessageHandler, + KafkaMessage, + Message, Producer, RecordMetadata, - Message, - ConsumerRunConfig, - KafkaMessage, - Consumer, } from 'kafkajs'; +import { EVENT_LISTENERS_SET } from './internal-types'; +import { bufferTextMapGetter } from './propagator'; +import { + ATTR_MESSAGING_BATCH_MESSAGE_COUNT, + ATTR_MESSAGING_DESTINATION_NAME, + ATTR_MESSAGING_DESTINATION_PARTITION_ID, + ATTR_MESSAGING_KAFKA_MESSAGE_KEY, + ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE, + ATTR_MESSAGING_KAFKA_OFFSET, + ATTR_MESSAGING_OPERATION_NAME, + ATTR_MESSAGING_OPERATION_TYPE, + ATTR_MESSAGING_SYSTEM, + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + MESSAGING_OPERATION_TYPE_VALUE_RECEIVE, + MESSAGING_OPERATION_TYPE_VALUE_SEND, + MESSAGING_SYSTEM_VALUE_KAFKA, + METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES, + METRIC_MESSAGING_CLIENT_OPERATION_DURATION, + METRIC_MESSAGING_CLIENT_SENT_MESSAGES, + METRIC_MESSAGING_PROCESS_DURATION, +} from './semconv'; import { KafkaJsInstrumentationConfig } from './types'; /** @knipignore */ import { PACKAGE_NAME, PACKAGE_VERSION } from './version'; -import { bufferTextMapGetter } from './propagator'; -import { - InstrumentationBase, - InstrumentationNodeModuleDefinition, - safeExecuteInTheMiddle, - isWrapped, -} from '@opentelemetry/instrumentation'; +interface ConsumerSpanOptions { + topic: string; + message: KafkaMessage | undefined; + operationType: string; + attributes: Attributes; + ctx?: Context | undefined; + link?: Link; +} +// This interface acts as a strict subset of the KafkaJS Consumer and +// Producer interfaces (just for the event we're needing) +interface KafkaEventEmitter { + on( + eventName: + | kafkaJs.ConsumerEvents['REQUEST'] + | kafkaJs.ProducerEvents['REQUEST'], + listener: (event: kafkaJs.RequestEvent) => void + ): void; + events: { + REQUEST: + | kafkaJs.ConsumerEvents['REQUEST'] + | kafkaJs.ProducerEvents['REQUEST']; + }; + [EVENT_LISTENERS_SET]?: boolean; +} + +interface StandardAttributes extends Attributes { + [ATTR_MESSAGING_SYSTEM]: string; + [ATTR_MESSAGING_OPERATION_NAME]: OP; + [ATTR_ERROR_TYPE]?: string; +} +interface TopicAttributes { + [ATTR_MESSAGING_DESTINATION_NAME]: string; + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]?: string; +} + +interface ClientDurationAttributes + extends StandardAttributes, + Partial { + [ATTR_SERVER_ADDRESS]: string; + [ATTR_SERVER_PORT]: number; + [ATTR_MESSAGING_OPERATION_TYPE]?: string; +} +interface SentMessagesAttributes + extends StandardAttributes<'send'>, + TopicAttributes { + [ATTR_ERROR_TYPE]?: string; +} +type ConsumedMessagesAttributes = StandardAttributes<'receive' | 'process'>; +interface MessageProcessDurationAttributes + extends StandardAttributes<'process'>, + TopicAttributes { + [ATTR_MESSAGING_SYSTEM]: string; + [ATTR_MESSAGING_OPERATION_NAME]: 'process'; + [ATTR_ERROR_TYPE]?: string; +} +type RecordPendingMetric = (errorType?: string | undefined) => void; + +function prepareCounter( + meter: Counter, + value: number, + attributes: T +): RecordPendingMetric { + return (errorType?: string | undefined) => { + meter.add(value, { + ...attributes, + ...(errorType ? { [ATTR_ERROR_TYPE]: errorType } : {}), + }); + }; +} + +function prepareDurationHistogram( + meter: Histogram, + value: number, + attributes: T +): RecordPendingMetric { + return (errorType?: string | undefined) => { + meter.record((Date.now() - value) / 1000, { + ...attributes, + ...(errorType ? { [ATTR_ERROR_TYPE]: errorType } : {}), + }); + }; +} + +const HISTOGRAM_BUCKET_BOUNDARIES = [ + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, +]; export class KafkaJsInstrumentation extends InstrumentationBase { + private _clientDuration!: Histogram; + private _sentMessages!: Counter; + private _consumedMessages!: Counter; + private _processDuration!: Histogram; + constructor(config: KafkaJsInstrumentationConfig = {}) { super(PACKAGE_NAME, PACKAGE_VERSION, config); } + override _updateMetricInstruments() { + this._clientDuration = this.meter.createHistogram( + METRIC_MESSAGING_CLIENT_OPERATION_DURATION, + { advice: { explicitBucketBoundaries: HISTOGRAM_BUCKET_BOUNDARIES } } + ); + this._sentMessages = this.meter.createCounter( + METRIC_MESSAGING_CLIENT_SENT_MESSAGES + ); + this._consumedMessages = this.meter.createCounter( + METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES + ); + this._processDuration = this.meter.createHistogram( + METRIC_MESSAGING_PROCESS_DURATION, + { advice: { explicitBucketBoundaries: HISTOGRAM_BUCKET_BOUNDARIES } } + ); + } + protected init() { const unpatch = (moduleExports: typeof kafkaJs) => { if (isWrapped(moduleExports?.Kafka?.prototype.producer)) { @@ -111,11 +241,36 @@ export class KafkaJsInstrumentation extends InstrumentationBase + ) { + const [address, port] = event.payload.broker.split(':'); + this._clientDuration.record(event.payload.duration / 1000, { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: `${event.payload.apiName}`, // potentially suffix with @v${event.payload.apiVersion}? + [ATTR_SERVER_ADDRESS]: address, + [ATTR_SERVER_PORT]: Number.parseInt(port, 10), + }); + } + private _getProducerPatch() { const instrumentation = this; return (original: kafkaJs.Kafka['producer']) => { @@ -143,6 +298,8 @@ export class KafkaJsInstrumentation extends InstrumentationBase { const payload = args[0]; // https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers - const receivingSpan = instrumentation._startConsumerSpan( - payload.batch.topic, - undefined, - MESSAGINGOPERATIONVALUES_RECEIVE, - ROOT_CONTEXT - ); + const receivingSpan = instrumentation._startConsumerSpan({ + topic: payload.batch.topic, + message: undefined, + operationType: MESSAGING_OPERATION_TYPE_VALUE_RECEIVE, + ctx: ROOT_CONTEXT, + attributes: { + [ATTR_MESSAGING_BATCH_MESSAGE_COUNT]: payload.batch.messages.length, + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( + payload.batch.partition + ), + }, + }); return context.with( trace.setSpan(context.active(), receivingSpan), () => { - const spans = payload.batch.messages.map( - (message: KafkaMessage) => { - const propagatedContext: Context = propagation.extract( - ROOT_CONTEXT, - message.headers, - bufferTextMapGetter - ); - const spanContext = trace - .getSpan(propagatedContext) - ?.spanContext(); - let origSpanLink: Link | undefined; - if (spanContext) { - origSpanLink = { - context: spanContext, - }; + const startTime = Date.now(); + const spans: Span[] = []; + const pendingMetrics: RecordPendingMetric[] = [ + prepareCounter( + instrumentation._consumedMessages, + payload.batch.messages.length, + { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_MESSAGING_DESTINATION_NAME]: payload.batch.topic, + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( + payload.batch.partition + ), } - return instrumentation._startConsumerSpan( - payload.batch.topic, - message, - MESSAGINGOPERATIONVALUES_PROCESS, - undefined, - origSpanLink - ); + ), + ]; + payload.batch.messages.forEach(message => { + const propagatedContext: Context = propagation.extract( + ROOT_CONTEXT, + message.headers, + bufferTextMapGetter + ); + const spanContext = trace + .getSpan(propagatedContext) + ?.spanContext(); + let origSpanLink: Link | undefined; + if (spanContext) { + origSpanLink = { + context: spanContext, + }; } - ); + spans.push( + instrumentation._startConsumerSpan({ + topic: payload.batch.topic, + message, + operationType: MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + link: origSpanLink, + attributes: { + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( + payload.batch.partition + ), + }, + }) + ); + pendingMetrics.push( + prepareDurationHistogram( + instrumentation._processDuration, + startTime, + { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_MESSAGING_DESTINATION_NAME]: payload.batch.topic, + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( + payload.batch.partition + ), + } + ) + ); + }); const batchMessagePromise: Promise = original!.apply( this, args @@ -262,6 +491,7 @@ export class KafkaJsInstrumentation extends InstrumentationBase { const batch = args[0]; const messages = batch.topicMessages || []; - const spans: Span[] = messages - .map(topicMessage => - topicMessage.messages.map(message => - instrumentation._startProducerSpan(topicMessage.topic, message) - ) - ) - .reduce((acc, val) => acc.concat(val), []); + const spans: Span[] = []; + const pendingMetrics: RecordPendingMetric[] = []; + + messages.forEach(topicMessage => { + topicMessage.messages.forEach(message => { + spans.push( + instrumentation._startProducerSpan(topicMessage.topic, message) + ); + pendingMetrics.push( + prepareCounter(instrumentation._sentMessages, 1, { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_MESSAGING_DESTINATION_NAME]: topicMessage.topic, + ...(message.partition !== undefined + ? { + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( + message.partition + ), + } + : {}), + }) + ); + }); + }); const origSendResult: Promise = original.apply( this, args ); - return instrumentation._endSpansOnPromise(spans, origSendResult); + return instrumentation._endSpansOnPromise( + spans, + pendingMetrics, + origSendResult + ); }; }; } @@ -308,35 +559,64 @@ export class KafkaJsInstrumentation extends InstrumentationBase + prepareCounter(instrumentation._sentMessages, 1, { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_MESSAGING_DESTINATION_NAME]: record.topic, + ...(m.partition !== undefined + ? { + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String( + m.partition + ), + } + : {}), + }) + ); const origSendResult: Promise = original.apply( this, args ); - return instrumentation._endSpansOnPromise(spans, origSendResult); + return instrumentation._endSpansOnPromise( + spans, + pendingMetrics, + origSendResult + ); }; }; } private _endSpansOnPromise( spans: Span[], + pendingMetrics: RecordPendingMetric[], sendPromise: Promise ): Promise { return Promise.resolve(sendPromise) + .then(result => { + pendingMetrics.forEach(m => m()); + return result; + }) .catch(reason => { - let errorMessage: string; - if (typeof reason === 'string') errorMessage = reason; - else if ( + let errorMessage: string | undefined; + let errorType: string = ERROR_TYPE_VALUE_OTHER; + if (typeof reason === 'string' || reason === undefined) { + errorMessage = reason; + } else if ( typeof reason === 'object' && Object.prototype.hasOwnProperty.call(reason, 'message') - ) + ) { errorMessage = reason.message; + errorType = reason.constructor.name; + } + pendingMetrics.forEach(m => m(errorType)); - spans.forEach(span => + spans.forEach(span => { + span.setAttribute(ATTR_ERROR_TYPE, errorType); span.setStatus({ code: SpanStatusCode.ERROR, message: errorMessage, - }) - ); + }); + }); throw reason; }) @@ -345,25 +625,42 @@ export class KafkaJsInstrumentation extends InstrumentationBase; + attributes: Attributes; + }[] + > +) { + assert.strictEqual(errors.length, 0); + const { metrics } = resourceMetrics.scopeMetrics[0]; + assert.strictEqual( + Object.keys(expected).length, + metrics.length, + 'A different number of metrics were found than expected' + ); + Object.entries(expected).forEach(([name, values]) => { + const match = metrics.find(metric => metric.descriptor.name === name); + assert.ok(match, `metric ${name} not found`); + + if (match.dataPointType === DataPointType.HISTOGRAM) { + assert.deepStrictEqual( + match.dataPoints.map(d => d.value.count), + values.map(v => v.count), + `${name} datapoints do not have the same count` + ); + values.forEach(({ buckets }, i) => { + if (buckets) { + const { boundaries, counts } = match.dataPoints[i].value.buckets; + const actualBuckets = counts.reduce((acc, n, j) => { + if (n > 0) { + acc[boundaries[j]] = n; + } + return acc; + }, {} as Record); + assert.deepStrictEqual(actualBuckets, buckets); + } + }); + } else { + assert.deepStrictEqual( + match.dataPoints.map(d => d.value), + values.map(v => v.value), + `${name} datapoint values do not match` + ); + } + assert.deepStrictEqual( + match.dataPoints.map(d => d.attributes), + values.map(v => v.attributes) + ); + }); +} describe('instrumentation-kafkajs', () => { propagation.setGlobalPropagator( @@ -107,8 +181,10 @@ describe('instrumentation-kafkajs', () => { }; }; + let metricReader: TestMetricReader; beforeEach(() => { messagesSent = []; + metricReader = initMeterProvider(instrumentation); }); describe('producer', () => { @@ -127,21 +203,29 @@ describe('instrumentation-kafkajs', () => { }; describe('successful send', () => { - beforeEach(async () => { - patchProducerSend(async (): Promise => { - return [ - { - topicName: 'topic-name-1', - partition: 0, - errorCode: 123, - offset: '18', - timestamp: '123456', - }, - ]; - }); + const defaultRecordMetadata = [ + { + topicName: 'topic-name-1', + partition: 0, + errorCode: 123, + offset: '18', + timestamp: '123456', + }, + ]; + function initializeProducer( + recordMetadata: RecordMetadata[] = defaultRecordMetadata + ) { + patchProducerSend( + async (): Promise => recordMetadata + ); instrumentation.disable(); instrumentation.enable(); - producer = kafka.producer(); + producer = kafka.producer({ + createPartitioner: kafkajs.Partitioners.LegacyPartitioner, + }); + } + beforeEach(() => { + initializeProducer(); }); it('simple send create span with right attributes, pass return value correctly and propagate context', async () => { @@ -149,6 +233,8 @@ describe('instrumentation-kafkajs', () => { topic: 'topic-name-1', messages: [ { + partition: 42, + key: 'message-key-0', value: 'testing message content', }, ], @@ -161,29 +247,109 @@ describe('instrumentation-kafkajs', () => { assert.strictEqual(spans.length, 1); const span = spans[0]; assert.strictEqual(span.kind, SpanKind.PRODUCER); - assert.strictEqual(span.name, 'topic-name-1'); + assert.strictEqual(span.name, 'send topic-name-1'); assert.strictEqual(span.status.code, SpanStatusCode.UNSET); - assert.strictEqual(span.attributes[SEMATTRS_MESSAGING_SYSTEM], 'kafka'); + assert.strictEqual(span.attributes[ATTR_MESSAGING_SYSTEM], 'kafka'); assert.strictEqual( - span.attributes[SEMATTRS_MESSAGING_DESTINATION], + span.attributes[ATTR_MESSAGING_DESTINATION_NAME], 'topic-name-1' ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_DESTINATION_PARTITION_ID], + '42' + ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE], + undefined + ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_KEY], + 'message-key-0' + ); assert.strictEqual(messagesSent.length, 1); expectKafkaHeadersToMatchSpanContext( messagesSent[0], span as ReadableSpan ); + + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '42', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + }, + }, + ], + }); + }); + + it('simple send create span with tombstone attribute', async () => { + await producer.send({ + topic: 'topic-name-1', + messages: [ + { + partition: 42, + key: 'message-key-1', + value: null, + }, + ], + }); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.kind, SpanKind.PRODUCER); + assert.strictEqual(span.name, 'send topic-name-1'); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE], + true + ); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '42', + }, + }, + ], + }); }); it('send two messages', async () => { + initializeProducer([ + { + topicName: 'topic-name-1', + partition: 0, + errorCode: 123, + offset: '18', + timestamp: '123456', + }, + { + topicName: 'topic-name-1', + partition: 0, + errorCode: 123, + offset: '19', + timestamp: '123456', + }, + ]); await producer.send({ topic: 'topic-name-1', messages: [ { + partition: 0, value: 'message1', }, { + partition: 0, value: 'message2', }, ], @@ -191,8 +357,8 @@ describe('instrumentation-kafkajs', () => { const spans = getTestSpans(); assert.strictEqual(spans.length, 2); - assert.strictEqual(spans[0].name, 'topic-name-1'); - assert.strictEqual(spans[1].name, 'topic-name-1'); + assert.strictEqual(spans[0].name, 'send topic-name-1'); + assert.strictEqual(spans[1].name, 'send topic-name-1'); assert.strictEqual(messagesSent.length, 2); expectKafkaHeadersToMatchSpanContext( @@ -203,9 +369,45 @@ describe('instrumentation-kafkajs', () => { messagesSent[1], spans[1] as ReadableSpan ); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 2, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '0', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + }, + }, + ], + }); }); it('send batch', async () => { + initializeProducer([ + { + topicName: 'topic-name-1', + partition: 0, + errorCode: 123, + offset: '18', + timestamp: '123456', + }, + { + topicName: 'topic-name-1', + partition: 0, + errorCode: 123, + offset: '19', + timestamp: '123456', + }, + { + topicName: 'topic-name-2', + partition: 1, + errorCode: 123, + offset: '19', + timestamp: '123456', + }, + ]); await producer.sendBatch({ topicMessages: [ { @@ -223,6 +425,7 @@ describe('instrumentation-kafkajs', () => { topic: 'topic-name-2', messages: [ { + partition: 1, value: 'message2-1', }, ], @@ -232,9 +435,9 @@ describe('instrumentation-kafkajs', () => { const spans = getTestSpans(); assert.strictEqual(spans.length, 3); - assert.strictEqual(spans[0].name, 'topic-name-1'); - assert.strictEqual(spans[1].name, 'topic-name-1'); - assert.strictEqual(spans[2].name, 'topic-name-2'); + assert.strictEqual(spans[0].name, 'send topic-name-1'); + assert.strictEqual(spans[1].name, 'send topic-name-1'); + assert.strictEqual(spans[2].name, 'send topic-name-2'); assert.strictEqual(messagesSent.length, 3); for (let i = 0; i < 3; i++) { @@ -243,6 +446,27 @@ describe('instrumentation-kafkajs', () => { spans[i] as ReadableSpan ); } + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 2, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + }, + }, + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-2', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + }, + }, + ], + }); }); }); @@ -255,7 +479,9 @@ describe('instrumentation-kafkajs', () => { }); instrumentation.disable(); instrumentation.enable(); - producer = kafka.producer(); + producer = kafka.producer({ + createPartitioner: kafkajs.Partitioners.LegacyPartitioner, + }); }); it('error in send create failed span', async () => { @@ -278,6 +504,19 @@ describe('instrumentation-kafkajs', () => { span.status.message, 'error thrown from kafka client send' ); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + ], + }); }); it('error in send with multiple messages create failed spans', async () => { @@ -304,6 +543,19 @@ describe('instrumentation-kafkajs', () => { 'error thrown from kafka client send' ); }); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 2, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + ], + }); }); it('error in sendBatch should set error to all spans', async () => { @@ -342,6 +594,28 @@ describe('instrumentation-kafkajs', () => { 'error thrown from kafka client send' ); }); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_SENT_MESSAGES]: [ + { + value: 2, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-2', + [ATTR_MESSAGING_OPERATION_NAME]: 'send', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + ], + }); }); }); @@ -360,7 +634,9 @@ describe('instrumentation-kafkajs', () => { instrumentation.disable(); instrumentation.setConfig(config); instrumentation.enable(); - producer = kafka.producer(); + producer = kafka.producer({ + createPartitioner: kafkajs.Partitioners.LegacyPartitioner, + }); }); it('producer hook add span attribute with value from message', async () => { @@ -395,7 +671,9 @@ describe('instrumentation-kafkajs', () => { instrumentation.disable(); instrumentation.setConfig(config); instrumentation.enable(); - producer = kafka.producer(); + producer = kafka.producer({ + createPartitioner: kafkajs.Partitioners.LegacyPartitioner, + }); }); it('producer hook add span attribute with value from message', async () => { @@ -417,22 +695,34 @@ describe('instrumentation-kafkajs', () => { }); describe('consumer', () => { - const createKafkaMessage = (offset: string): KafkaMessage => { + interface CreateMessageParams { + offset: string; + key?: string | null; + tombstone?: boolean; + } + + const createKafkaMessage = ({ + offset, + key = 'message-key', + tombstone = false, + }: CreateMessageParams): KafkaMessage => { return { - key: Buffer.from('message-key', 'utf8'), - value: Buffer.from('message content', 'utf8'), + key: typeof key === 'string' ? Buffer.from(key, 'utf8') : key, + value: tombstone ? null : Buffer.from('message content', 'utf8'), timestamp: '1234', size: 10, attributes: 1, - offset: offset, + offset, }; }; - const createEachMessagePayload = (): EachMessagePayload => { + const createEachMessagePayload = ( + params: Partial = {} + ): EachMessagePayload => { return { topic: 'topic-name-1', - partition: 0, - message: createKafkaMessage('123'), + partition: 1, + message: createKafkaMessage({ offset: '123', ...params }), heartbeat: async () => {}, pause: () => () => {}, }; @@ -444,7 +734,10 @@ describe('instrumentation-kafkajs', () => { topic: 'topic-name-1', partition: 1234, highWatermark: '4567', - messages: [createKafkaMessage('124'), createKafkaMessage('125')], + messages: [ + createKafkaMessage({ offset: '124' }), + createKafkaMessage({ offset: '125' }), + ], }, } as EachBatchPayload; }; @@ -468,25 +761,104 @@ describe('instrumentation-kafkajs', () => { _payload: EachMessagePayload ): Promise => {}, }); - const payload: EachMessagePayload = createEachMessagePayload(); + const payload = createEachMessagePayload(); await runConfig?.eachMessage!(payload); const spans = getTestSpans(); assert.strictEqual(spans.length, 1); const span = spans[0]; - assert.strictEqual(span.name, 'topic-name-1'); + assert.strictEqual(span.name, 'process topic-name-1'); assert.strictEqual(span.parentSpanContext?.spanId, undefined); assert.strictEqual(span.kind, SpanKind.CONSUMER); assert.strictEqual(span.status.code, SpanStatusCode.UNSET); - assert.strictEqual(span.attributes[SEMATTRS_MESSAGING_SYSTEM], 'kafka'); + assert.strictEqual(span.attributes[ATTR_MESSAGING_SYSTEM], 'kafka'); assert.strictEqual( - span.attributes[SEMATTRS_MESSAGING_DESTINATION], + span.attributes[ATTR_MESSAGING_DESTINATION_NAME], 'topic-name-1' ); assert.strictEqual( - span.attributes[SEMATTRS_MESSAGING_OPERATION], + span.attributes[ATTR_MESSAGING_OPERATION_TYPE], 'process' ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_DESTINATION_PARTITION_ID], + '1' + ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_KEY], + 'message-key' + ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE], + undefined + ); + assert.strictEqual(span.attributes[ATTR_MESSAGING_KAFKA_OFFSET], '123'); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_PROCESS_DURATION]: [ + { + count: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + }, + }, + ], + [METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + }, + }, + ], + }); + }); + + it('consume eachMessage tombstone', async () => { + consumer.run({ + eachMessage: async ( + _payload: EachMessagePayload + ): Promise => {}, + }); + const payload = createEachMessagePayload({ tombstone: true }); + await runConfig?.eachMessage!(payload); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.name, 'process topic-name-1'); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_KEY], + 'message-key' + ); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE], + true + ); + }); + + it('consume eachMessage with null key', async () => { + consumer.run({ + eachMessage: async ( + _payload: EachMessagePayload + ): Promise => {}, + }); + const payload = createEachMessagePayload({ key: null }); + await runConfig?.eachMessage!(payload); + + const spans = getTestSpans(); + assert.strictEqual(spans.length, 1); + const span = spans[0]; + assert.strictEqual(span.name, 'process topic-name-1'); + assert.strictEqual( + span.attributes[ATTR_MESSAGING_KAFKA_MESSAGE_KEY], + undefined + ); }); it('consumer eachMessage with non promise return value', async () => { @@ -607,6 +979,32 @@ describe('instrumentation-kafkajs', () => { span.status.message, 'error thrown from eachMessage callback' ); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_PROCESS_DURATION]: [ + { + count: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + ], + [METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: 'Error', + }, + }, + ], + }); }); it('throwing object with no message', async () => { @@ -633,6 +1031,32 @@ describe('instrumentation-kafkajs', () => { const span = spans[0]; assert.strictEqual(span.status.code, SpanStatusCode.ERROR); assert.strictEqual(span.status.message, undefined); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_PROCESS_DURATION]: [ + { + count: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: '_OTHER', + }, + }, + ], + [METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: '_OTHER', + }, + }, + ], + }); }); it('throwing non object', async () => { @@ -656,6 +1080,32 @@ describe('instrumentation-kafkajs', () => { const span = spans[0]; assert.strictEqual(span.status.code, SpanStatusCode.ERROR); assert.strictEqual(span.status.message, undefined); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_PROCESS_DURATION]: [ + { + count: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: '_OTHER', + }, + }, + ], + [METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [ + { + value: 1, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + [ATTR_ERROR_TYPE]: '_OTHER', + }, + }, + ], + }); }); }); @@ -678,32 +1128,40 @@ describe('instrumentation-kafkajs', () => { const spans = getTestSpans(); assert.strictEqual(spans.length, 3); spans.forEach(span => { - assert.strictEqual(span.name, 'topic-name-1'); assert.strictEqual(span.status.code, SpanStatusCode.UNSET); + assert.strictEqual(span.attributes[ATTR_MESSAGING_SYSTEM], 'kafka'); assert.strictEqual( - span.attributes[SEMATTRS_MESSAGING_SYSTEM], - 'kafka' - ); - assert.strictEqual( - span.attributes[SEMATTRS_MESSAGING_DESTINATION], + span.attributes[ATTR_MESSAGING_DESTINATION_NAME], 'topic-name-1' ); }); const [recvSpan, msg1Span, msg2Span] = spans; + assert.strictEqual(recvSpan.kind, SpanKind.CLIENT); + assert.strictEqual(recvSpan.name, 'poll topic-name-1'); assert.strictEqual(recvSpan.parentSpanContext?.spanId, undefined); assert.strictEqual( - recvSpan.attributes[SEMATTRS_MESSAGING_OPERATION], + recvSpan.attributes[ATTR_MESSAGING_OPERATION_TYPE], 'receive' ); + assert.strictEqual( + recvSpan.attributes[ATTR_MESSAGING_OPERATION_NAME], + 'poll' + ); + assert.strictEqual(msg1Span.kind, SpanKind.CONSUMER); + assert.strictEqual(msg1Span.name, 'process topic-name-1'); assert.strictEqual( msg1Span.parentSpanContext?.spanId, recvSpan.spanContext().spanId ); assert.strictEqual( - msg1Span.attributes[SEMATTRS_MESSAGING_OPERATION], + msg1Span.attributes[ATTR_MESSAGING_OPERATION_TYPE], + 'process' + ); + assert.strictEqual( + msg1Span.attributes[ATTR_MESSAGING_OPERATION_NAME], 'process' ); @@ -712,9 +1170,37 @@ describe('instrumentation-kafkajs', () => { recvSpan.spanContext().spanId ); assert.strictEqual( - msg2Span.attributes[SEMATTRS_MESSAGING_OPERATION], + msg2Span.attributes[ATTR_MESSAGING_OPERATION_TYPE], 'process' ); + assert.strictEqual( + msg2Span.attributes[ATTR_MESSAGING_OPERATION_NAME], + 'process' + ); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_PROCESS_DURATION]: [ + { + count: 2, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1234', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + }, + }, + ], + [METRIC_MESSAGING_CLIENT_CONSUMED_MESSAGES]: [ + { + value: 2, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_DESTINATION_NAME]: 'topic-name-1', + [ATTR_MESSAGING_DESTINATION_PARTITION_ID]: '1234', + [ATTR_MESSAGING_OPERATION_NAME]: 'process', + }, + }, + ], + }); }); it('consumer eachBatch with non promise return value', async () => { @@ -741,7 +1227,9 @@ describe('instrumentation-kafkajs', () => { storeRunConfig(); instrumentation.disable(); instrumentation.enable(); - producer = kafka.producer(); + producer = kafka.producer({ + createPartitioner: kafkajs.Partitioners.LegacyPartitioner, + }); consumer = kafka.consumer({ groupId: 'testing-group-id' }); }); @@ -868,6 +1356,39 @@ describe('instrumentation-kafkajs', () => { ); }); }); + describe('client duration metric', () => { + it('records the metric', async () => { + instrumentation['_recordClientDurationMetric']({ + payload: { + broker: 'kafka.host:4789', + duration: 250, + apiName: 'some-operation', + apiKey: 123, + apiVersion: 1, + clientId: 'client-id', + correlationId: 456, + createdAt: Date.now(), + pendingDuration: 0, + sentAt: Date.now(), + size: 1024, + }, + }); + assertMetricCollection(await metricReader.collect(), { + [METRIC_MESSAGING_CLIENT_OPERATION_DURATION]: [ + { + count: 1, + buckets: { '0.25': 1 }, + attributes: { + [ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA, + [ATTR_MESSAGING_OPERATION_NAME]: 'some-operation', + [ATTR_SERVER_ADDRESS]: 'kafka.host', + [ATTR_SERVER_PORT]: 4789, + }, + }, + ], + }); + }); + }); describe('bufferTextMapGetter', () => { it('is possible to retrieve keys case insensitively', () => { @@ -881,5 +1402,11 @@ describe('instrumentation-kafkajs', () => { '123' ); }); + it('exposes a keys method', () => { + assert.deepStrictEqual(bufferTextMapGetter.keys({ a: 1, b: 2 }), [ + 'a', + 'b', + ]); + }); }); });