diff --git a/packages/instrumentation-amqplib/README.md b/packages/instrumentation-amqplib/README.md index eb3553ce9d..b5ea1adc00 100644 --- a/packages/instrumentation-amqplib/README.md +++ b/packages/instrumentation-amqplib/README.md @@ -89,24 +89,72 @@ By default, the tests that connect to RabbitMQ are skipped. To make sure these t ## Semantic Conventions -This package uses `@opentelemetry/semantic-conventions` version `1.22+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md) - -Attributes collected: - -| Attribute | Short Description | -| -------------------------------- | ---------------------------------------------------------------------- | -| `messaging.destination` | The message destination name. | -| `messaging.destination_kind` | The kind of message destination. | -| `messaging.rabbitmq.routing_key` | RabbitMQ message routing key. | -| `messaging.operation` | A string identifying the kind of message consumption. | -| `messaging.message_id` | A value used by the messaging system as an identifier for the message. | -| `messaging.conversation_id` | The ID identifying the conversation to which the message belongs. | -| `messaging.protocol` | The name of the transport protocol. | -| `messaging.protocol_version` | The version of the transport protocol. | -| `messaging.system` | A string identifying the messaging system. | -| `messaging.url` | The connection string. | -| `net.peer.name` | Remote hostname or similar. | -| `net.peer.port` | Remote port number. | +This package supports both legacy and future stable OpenTelemetry semantic conventions for messaging systems. The behavior is controlled by the `OTEL_SEMCONV_STABILITY_OPT_IN` environment variable. + +**Note**: The v1.36.0+ conventions are not yet stable but will become stable in the future. This instrumentation is progressively implementing the new attributes and span names in preparation for the transition to stable conventions. + +Configure the instrumentation using one of the following options: + +- **Empty (default)**: Emit only legacy v1.7.0 conventions ([messaging spec](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md)) +- **`messaging`**: Emit only stable v1.36.0+ conventions ([messaging spec](https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/messaging/messaging-spans.md) or [RabbitMQ messaging spec](https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/messaging/rabbitmq.md)) +- **`messaging/dup`**: Emit both legacy and stable conventions simultaneously for migration purposes + +### Attributes Collected + +| v1.7.0 semconv | v1.36.0+ semconv | Description | +| --------------------------------- | --------------------------------------------- | ----------- | +| `messaging.protocol` | `network.protocol.name` | The name of the transport protocol (`AMQP`) | +| `messaging.protocol_version` | `network.protocol.version` | The version of the transport protocol (`0.9.1`) | +| `net.peer.name` | `network.peer.address` | Remote hostname or similar | +| `net.peer.port` | `network.peer.port` | Remote port number | +| - | `server.address` | Server hostname or similar | +| - | `server.port` | Server port number | +| `messaging.system` | `messaging.system` | A string identifying the messaging system (`rabbitmq`) | +| `messaging.url` | Removed | The connection string (with credentials masked) | +| `messaging.destination_kind` | Removed | The kind of message destination (always `topic` for RabbitMQ) | +| - | `messaging.operation.type` | A string identifying the type of operation (`send`, `receive`) | +| `messaging.operation` | `messaging.operation.name` | A string identifying the name of operation (`publish`, `consume`) | +| `messaging.destination` | `messaging.destination.name` | The message destination name (exchange name or destination) | +| `messaging.message_id` | `messaging.message.id` | A value used by the messaging system as an identifier for the message | +| `messaging.conversation_id` | `messaging.message.conversation_id` | The ID identifying the conversation to which the message belongs | +| - | `messaging.message.body.size` | The size of the message body in bytes | +| `messaging.rabbitmq.routing_key` | `messaging.rabbitmq.destination.routing_key` | RabbitMQ message routing key | +| - | `messaging.rabbitmq.message.delivery_tag` | RabbitMQ message delivery tag (consume operations only) | + +### Span Naming Conventions + +The instrumentation generates different span names based on the semantic convention version: + +#### Publish Operations + +- **Legacy**: `publish {exchange}` (or `publish ` for default exchange) +- **Stable**: `publish {destination}` where destination follows the pattern: + - `{exchange}:{routing_key}` when both are present + - `{exchange}` when only exchange is present + - `{routing_key}` when only routing key is present + - `amq.default` when neither is present + +#### Consume Operations + +- **Legacy**: `{queue} process` +- **Stable**: `consume {destination}` where destination follows this priority pattern: + - `{exchange}:{routing_key}:{queue}` when all are present and routing_key ≠ queue + - `{exchange}:{routing_key}` when all are present and routing_key = queue, or when exchange and routing_key are present + - `{exchange}:{queue}` when exchange and queue are present (no routing_key) + - `{routing_key}:{queue}` when routing_key and queue are present (no exchange) + - `{exchange}` when only exchange is present + - `{routing_key}` when only routing_key is present + - `{queue}` when only queue is present + - `amq.default` when none are present + +### Migration Guide + +When upgrading to the new semantic conventions, it is recommended to follow this migration path: + +1. **Upgrade** `@opentelemetry/instrumentation-amqplib` to the latest version +2. **Enable dual mode**: Set `OTEL_SEMCONV_STABILITY_OPT_IN=messaging/dup` to emit both old and new semantic conventions +3. **Update monitoring**: Modify alerts, dashboards, metrics, and other processes to use the new semantic conventions +4. **Switch to stable**: Set `OTEL_SEMCONV_STABILITY_OPT_IN=messaging` to emit only the new semantic conventions ## Useful links diff --git a/packages/instrumentation-amqplib/src/amqplib.ts b/packages/instrumentation-amqplib/src/amqplib.ts index bcdf9c7acb..fb177f4df5 100644 --- a/packages/instrumentation-amqplib/src/amqplib.ts +++ b/packages/instrumentation-amqplib/src/amqplib.ts @@ -36,17 +36,9 @@ import { InstrumentationNodeModuleFile, isWrapped, safeExecuteInTheMiddle, + SemconvStability, + semconvStabilityFromStr, } from '@opentelemetry/instrumentation'; -import { - SEMATTRS_MESSAGING_DESTINATION, - SEMATTRS_MESSAGING_DESTINATION_KIND, - MESSAGINGDESTINATIONKINDVALUES_TOPIC, - SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY, - SEMATTRS_MESSAGING_OPERATION, - MESSAGINGOPERATIONVALUES_PROCESS, - SEMATTRS_MESSAGING_MESSAGE_ID, - SEMATTRS_MESSAGING_CONVERSATION_ID, -} from '@opentelemetry/semantic-conventions'; import type { Connection, ConsumeMessage, @@ -65,13 +57,16 @@ import { CONNECTION_ATTRIBUTES, getConnectionAttributesFromServer, getConnectionAttributesFromUrl, + getConsumeAttributes, + getConsumeSpanName, + getPublishAttributes, + getPublishSpanName, InstrumentationConsumeChannel, InstrumentationMessage, InstrumentationPublishChannel, isConfirmChannelTracing, markConfirmChannelTracing, MESSAGE_STORED_SPAN, - normalizeExchange, unmarkConfirmChannelTracing, } from './utils'; /** @knipignore */ @@ -80,8 +75,14 @@ import { PACKAGE_NAME, PACKAGE_VERSION } from './version'; const supportedVersions = ['>=0.5.5 <1']; export class AmqplibInstrumentation extends InstrumentationBase { + private _semconvStability: SemconvStability = SemconvStability.OLD; + constructor(config: AmqplibInstrumentationConfig = {}) { super(PACKAGE_NAME, PACKAGE_VERSION, { ...DEFAULT_CONFIG, ...config }); + this._semconvStability = semconvStabilityFromStr( + 'messaging', + process.env.OTEL_SEMCONV_STABILITY_OPT_IN + ); } override setConfig(config: AmqplibInstrumentationConfig = {}) { @@ -243,6 +244,7 @@ export class AmqplibInstrumentation extends InstrumentationBase void ) => Connection ) { + const self = this; return function patchedConnect( this: unknown, url: string | Options.Connect, @@ -255,7 +257,10 @@ export class AmqplibInstrumentation extends InstrumentationBase - exchangeName !== '' ? exchangeName : ''; - const censorPassword = (url: string): string => { return url.replace(/:[^:@/]*@/, ':***@'); }; @@ -97,32 +120,13 @@ const getHostname = (hostnameFromUrl: string | undefined): string => { return hostnameFromUrl || 'localhost'; }; -const extractConnectionAttributeOrLog = ( - url: string | amqp.Options.Connect, - attributeKey: string, - attributeValue: AttributeValue, - nameForLog: string -): Attributes => { - if (attributeValue) { - return { [attributeKey]: attributeValue }; - } else { - diag.error( - `amqplib instrumentation: could not extract connection attribute ${nameForLog} from user supplied url`, - { - url, - } - ); - return {}; - } -}; - export const getConnectionAttributesFromServer = ( conn: amqp.Connection['connection'] ): Attributes => { const product = conn.serverProperties.product?.toLowerCase?.(); if (product) { return { - [SEMATTRS_MESSAGING_SYSTEM]: product, + [ATTR_MESSAGING_SYSTEM]: product, }; } else { return {}; @@ -130,83 +134,53 @@ export const getConnectionAttributesFromServer = ( }; export const getConnectionAttributesFromUrl = ( - url: string | amqp.Options.Connect + url: string | amqp.Options.Connect, + semconvStability: SemconvStability ): Attributes => { - const attributes: Attributes = { + const oldAttributes: Attributes = { [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', // this is the only protocol supported by the instrumented library }; + const stableAttributes: Attributes = { + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', // this is the only protocol supported by the instrumented library + }; url = url || 'amqp://localhost'; if (typeof url === 'object') { - const connectOptions = url as amqp.Options.Connect; - - const protocol = getProtocol(connectOptions?.protocol); - Object.assign(attributes, { - ...extractConnectionAttributeOrLog( - url, - SEMATTRS_MESSAGING_PROTOCOL, - protocol, - 'protocol' - ), - }); - - const hostname = getHostname(connectOptions?.hostname); - Object.assign(attributes, { - ...extractConnectionAttributeOrLog( - url, - SEMATTRS_NET_PEER_NAME, - hostname, - 'hostname' - ), - }); - - const port = getPort(connectOptions.port, protocol); - Object.assign(attributes, { - ...extractConnectionAttributeOrLog( - url, - SEMATTRS_NET_PEER_PORT, - port, - 'port' - ), - }); + const protocol = getProtocol(url.protocol); + const hostname = getHostname(url.hostname); + const port = getPort(url.port, protocol); + + oldAttributes[SEMATTRS_MESSAGING_PROTOCOL] = protocol; + oldAttributes[SEMATTRS_NET_PEER_NAME] = hostname; + oldAttributes[SEMATTRS_NET_PEER_PORT] = port; + + stableAttributes[ATTR_NETWORK_PROTOCOL_NAME] = protocol; + stableAttributes[ATTR_NETWORK_PEER_ADDRESS] = hostname; + stableAttributes[ATTR_NETWORK_PEER_PORT] = port; + stableAttributes[ATTR_SERVER_ADDRESS] = hostname; + stableAttributes[ATTR_SERVER_PORT] = port; } else { const censoredUrl = censorPassword(url); - attributes[SEMATTRS_MESSAGING_URL] = censoredUrl; + oldAttributes[SEMATTRS_MESSAGING_URL] = censoredUrl; + try { const urlParts = new URL(censoredUrl); - const protocol = getProtocol(urlParts.protocol); - Object.assign(attributes, { - ...extractConnectionAttributeOrLog( - censoredUrl, - SEMATTRS_MESSAGING_PROTOCOL, - protocol, - 'protocol' - ), - }); - const hostname = getHostname(urlParts.hostname); - Object.assign(attributes, { - ...extractConnectionAttributeOrLog( - censoredUrl, - SEMATTRS_NET_PEER_NAME, - hostname, - 'hostname' - ), - }); - const port = getPort( urlParts.port ? parseInt(urlParts.port) : undefined, protocol ); - Object.assign(attributes, { - ...extractConnectionAttributeOrLog( - censoredUrl, - SEMATTRS_NET_PEER_PORT, - port, - 'port' - ), - }); + + oldAttributes[SEMATTRS_MESSAGING_PROTOCOL] = protocol; + oldAttributes[SEMATTRS_NET_PEER_NAME] = hostname; + oldAttributes[SEMATTRS_NET_PEER_PORT] = port; + + stableAttributes[ATTR_NETWORK_PROTOCOL_NAME] = protocol; + stableAttributes[ATTR_NETWORK_PEER_ADDRESS] = hostname; + stableAttributes[ATTR_NETWORK_PEER_PORT] = port; + stableAttributes[ATTR_SERVER_ADDRESS] = hostname; + stableAttributes[ATTR_SERVER_PORT] = port; } catch (err) { diag.error( 'amqplib instrumentation: error while extracting connection details from connection url', @@ -217,9 +191,144 @@ export const getConnectionAttributesFromUrl = ( ); } } + + let attributes: Attributes = {}; + if (semconvStability & SemconvStability.OLD) { + attributes = oldAttributes; + } + if (semconvStability & SemconvStability.STABLE) { + attributes = { ...attributes, ...stableAttributes }; + } return attributes; }; +export const getPublishSpanName = ( + exchange: string, + routingKey: string, + semconvStability: SemconvStability +): string => { + if (semconvStability & SemconvStability.STABLE) { + return `publish ${getPublishDestinationName(exchange, routingKey)}`; + } + return `publish ${normalizeExchange(exchange)}`; +}; + +export const getPublishAttributes = ( + exchange: string, + routingKey: string, + contentLength: number, + options: amqp.Options.Publish = {}, + semconvStability: SemconvStability +): Attributes => { + const oldAttributes: Attributes = { + [SEMATTRS_MESSAGING_DESTINATION]: exchange, + [SEMATTRS_MESSAGING_DESTINATION_KIND]: MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: routingKey, + [SEMATTRS_MESSAGING_MESSAGE_ID]: options?.messageId, + [SEMATTRS_MESSAGING_CONVERSATION_ID]: options?.correlationId, + }; + const stableAttributes: Attributes = { + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: getPublishDestinationName( + exchange, + routingKey + ), + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: routingKey, + [ATTR_MESSAGING_MESSAGE_ID]: options?.messageId, + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: options?.correlationId, + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: contentLength, + }; + + let attributes: Attributes = {}; + if (semconvStability & SemconvStability.OLD) { + attributes = oldAttributes; + } + if (semconvStability & SemconvStability.STABLE) { + attributes = { ...attributes, ...stableAttributes }; + } + return attributes; +}; + +const getPublishDestinationName = ( + exchange: string, + routingKey: string +): string => { + if (exchange && routingKey) return `${exchange}:${routingKey}`; + if (exchange) return exchange; + if (routingKey) return routingKey; + return 'amq.default'; +}; + +const normalizeExchange = (exchangeName: string) => + exchangeName !== '' ? exchangeName : ''; + +export const getConsumeSpanName = ( + queue: string, + msg: amqp.ConsumeMessage, + semconvStability: SemconvStability +): string => { + if (semconvStability & SemconvStability.STABLE) { + return `consume ${getConsumeDestinationName( + msg.fields?.exchange, + msg.fields?.routingKey, + queue + )}`; + } + return `${queue} process`; +}; + +export const getConsumeAttributes = ( + queue: string, + msg: amqp.ConsumeMessage, + semconvStability: SemconvStability +): Attributes => { + const oldAttributes: Attributes = { + [SEMATTRS_MESSAGING_DESTINATION]: msg.fields?.exchange, + [SEMATTRS_MESSAGING_DESTINATION_KIND]: MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_OPERATION]: MESSAGINGOPERATIONVALUES_PROCESS, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: msg.fields?.routingKey, + [SEMATTRS_MESSAGING_MESSAGE_ID]: msg.properties?.messageId, + [SEMATTRS_MESSAGING_CONVERSATION_ID]: msg.properties?.correlationId, + }; + const stableAttributes: Attributes = { + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: getConsumeDestinationName( + msg.fields?.exchange, + msg.fields?.routingKey, + queue + ), + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: msg.fields?.routingKey, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: msg.fields?.deliveryTag, + [ATTR_MESSAGING_MESSAGE_ID]: msg.properties?.messageId, + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: msg.properties?.correlationId, + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msg.content?.length, + }; + + let attributes: Attributes = {}; + if (semconvStability & SemconvStability.OLD) { + attributes = oldAttributes; + } + if (semconvStability & SemconvStability.STABLE) { + attributes = { ...attributes, ...stableAttributes }; + } + return attributes; +}; + +const getConsumeDestinationName = ( + exchange: string, + routingKey: string, + queue: string +): string => { + const parts: string[] = []; + if (exchange && !parts.includes(exchange)) parts.push(exchange); + if (routingKey && !parts.includes(routingKey)) parts.push(routingKey); + if (queue && !parts.includes(queue)) parts.push(queue); + + return parts.length ? parts.join(':') : 'amq.default'; +}; + export const markConfirmChannelTracing = (context: Context) => { return context.setValue(IS_CONFIRM_CHANNEL_CONTEXT_KEY, true); }; diff --git a/packages/instrumentation-amqplib/test/amqplib-callbacks-stable.test.ts b/packages/instrumentation-amqplib/test/amqplib-callbacks-stable.test.ts new file mode 100644 index 0000000000..814337ae2c --- /dev/null +++ b/packages/instrumentation-amqplib/test/amqplib-callbacks-stable.test.ts @@ -0,0 +1,587 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import 'mocha'; +import { expect } from 'expect'; +import { AmqplibInstrumentation } from '../src'; +import { + getTestSpans, + registerInstrumentationTesting, +} from '@opentelemetry/contrib-test-utils'; + +const instrumentation = registerInstrumentationTesting( + new AmqplibInstrumentation() +); + +import * as amqpCallback from 'amqplib/callback_api'; +import { + ATTR_NETWORK_PEER_ADDRESS, + ATTR_NETWORK_PEER_PORT, + ATTR_NETWORK_PROTOCOL_NAME, + ATTR_NETWORK_PROTOCOL_VERSION, + ATTR_SERVER_ADDRESS, + ATTR_SERVER_PORT, +} from '@opentelemetry/semantic-conventions'; +import { Baggage, context, propagation, SpanKind } from '@opentelemetry/api'; +import { asyncConfirmSend, asyncConsume, shouldTest } from './utils'; +import { rabbitMqUrl, TEST_RABBITMQ_HOST, TEST_RABBITMQ_PORT } from './config'; +import { + CompositePropagator, + W3CBaggagePropagator, + W3CTraceContextPropagator, +} from '@opentelemetry/core'; +import { SemconvStability } from '@opentelemetry/instrumentation'; +import { + ATTR_MESSAGING_SYSTEM, + ATTR_MESSAGING_MESSAGE_BODY_SIZE, + ATTR_MESSAGING_OPERATION_TYPE, + MESSAGING_OPERATION_TYPE_VALUE_SEND, + ATTR_MESSAGING_OPERATION_NAME, + ATTR_MESSAGING_DESTINATION_NAME, + ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY, + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG, +} from '@opentelemetry/semantic-conventions/incubating'; + +const msgPayload = 'payload from test'; +const queueName = 'queue-name-from-unittest'; + +describe('amqplib instrumentation callback model - stable semconv', () => { + let conn: amqpCallback.Connection; + before(() => { + propagation.setGlobalPropagator( + new CompositePropagator({ + propagators: [ + new W3CBaggagePropagator(), + new W3CTraceContextPropagator(), + ], + }) + ); + }); + before(function (done) { + instrumentation['_semconvStability'] = SemconvStability.STABLE; + if (!shouldTest) { + this.skip(); + } else { + amqpCallback.connect(rabbitMqUrl, (err, connection) => { + conn = connection; + done(err); + }); + } + }); + after(done => { + instrumentation['_semconvStability'] = SemconvStability.OLD; + if (!shouldTest) { + done(); + } else { + conn.close(() => done()); + } + }); + + describe('channel', () => { + let channel: amqpCallback.Channel; + beforeEach(done => { + conn.createChannel( + context.bind(context.active(), (err, c) => { + channel = c; + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + channel.on('error', () => {}); + channel.assertQueue( + queueName, + { durable: false }, + context.bind(context.active(), (err, ok) => { + channel.purgeQueue( + queueName, + context.bind(context.active(), (err, ok) => { + done(); + }) + ); + }) + ); + }) + ); + }); + + afterEach(done => { + try { + channel.close(err => { + done(); + }); + } catch {} + }); + + it('simple publish and consume from queue callback', done => { + const hadSpaceInBuffer = channel.sendToQueue( + queueName, + Buffer.from(msgPayload) + ); + expect(hadSpaceInBuffer).toBeTruthy(); + + asyncConsume( + channel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ).then(() => { + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toMatch(`publish ${queueName}`); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toMatch(`consume ${queueName}`); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert context propagation + expect(consumeSpan.spanContext().traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanContext?.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + done(); + }); + }); + + it('baggage is available while consuming', done => { + const baggageContext = propagation.setBaggage( + context.active(), + propagation.createBaggage({ + key1: { value: 'value1' }, + }) + ); + context.with(baggageContext, () => { + channel.sendToQueue(queueName, Buffer.from(msgPayload)); + let extractedBaggage: Baggage | undefined; + asyncConsume( + channel, + queueName, + [ + msg => { + extractedBaggage = propagation.getActiveBaggage(); + }, + ], + { + noAck: true, + } + ).then(() => { + expect(extractedBaggage).toBeDefined(); + expect(extractedBaggage!.getEntry('key1')).toBeDefined(); + done(); + }); + }); + }); + + it('end span with ack sync', done => { + channel.sendToQueue(queueName, Buffer.from(msgPayload)); + + asyncConsume(channel, queueName, [msg => channel.ack(msg)]).then(() => { + // assert consumed message span has ended + expect(getTestSpans().length).toBe(2); + done(); + }); + }); + + it('end span with ack async', done => { + channel.sendToQueue(queueName, Buffer.from(msgPayload)); + + asyncConsume(channel, queueName, [ + msg => + setTimeout(() => { + channel.ack(msg); + expect(getTestSpans().length).toBe(2); + done(); + }, 1), + ]); + }); + }); + + describe('confirm channel', () => { + let confirmChannel: amqpCallback.ConfirmChannel; + beforeEach(done => { + conn.createConfirmChannel( + context.bind(context.active(), (err, c) => { + confirmChannel = c; + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + confirmChannel.on('error', () => {}); + confirmChannel.assertQueue( + queueName, + { durable: false }, + context.bind(context.active(), (err, ok) => { + confirmChannel.purgeQueue( + queueName, + context.bind(context.active(), (err, ok) => { + done(); + }) + ); + }) + ); + }) + ); + }); + + afterEach(done => { + try { + confirmChannel.close(err => { + done(); + }); + } catch {} + }); + + it('simple publish and consume from queue callback', done => { + asyncConfirmSend(confirmChannel, queueName, msgPayload).then(() => { + asyncConsume( + confirmChannel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ).then(() => { + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toEqual(`publish ${queueName}`); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toEqual(`consume ${queueName}`); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert context propagation + expect(consumeSpan.spanContext().traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanContext?.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + done(); + }); + }); + }); + + it('end span with ack sync', done => { + asyncConfirmSend(confirmChannel, queueName, msgPayload).then(() => { + asyncConsume(confirmChannel, queueName, [ + msg => confirmChannel.ack(msg), + ]).then(() => { + // assert consumed message span has ended + expect(getTestSpans().length).toBe(2); + done(); + }); + }); + }); + + it('end span with ack async', done => { + asyncConfirmSend(confirmChannel, queueName, msgPayload).then(() => { + asyncConsume(confirmChannel, queueName, [ + msg => + setTimeout(() => { + confirmChannel.ack(msg); + expect(getTestSpans().length).toBe(2); + done(); + }, 1), + ]); + }); + }); + }); + + describe('channel with links config', () => { + let channel: amqpCallback.Channel; + beforeEach(done => { + instrumentation.setConfig({ + useLinksForConsume: true, + }); + conn.createChannel( + context.bind(context.active(), (err, c) => { + channel = c; + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + channel.on('error', () => {}); + channel.assertQueue( + queueName, + { durable: false }, + context.bind(context.active(), (err, ok) => { + channel.purgeQueue( + queueName, + context.bind(context.active(), (err, ok) => { + done(); + }) + ); + }) + ); + }) + ); + }); + + afterEach(done => { + try { + channel.close(err => { + done(); + }); + } catch {} + }); + + it('simple publish and consume from queue callback', done => { + const hadSpaceInBuffer = channel.sendToQueue( + queueName, + Buffer.from(msgPayload) + ); + expect(hadSpaceInBuffer).toBeTruthy(); + + asyncConsume( + channel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ).then(() => { + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toEqual(`publish ${queueName}`); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toEqual(`consume ${queueName}`); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanContext?.spanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + done(); + }); + }); + }); + + describe('confirm channel with links config', () => { + let confirmChannel: amqpCallback.ConfirmChannel; + beforeEach(done => { + instrumentation.setConfig({ + useLinksForConsume: true, + }); + conn.createConfirmChannel( + context.bind(context.active(), (err, c) => { + confirmChannel = c; + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + confirmChannel.on('error', () => {}); + confirmChannel.assertQueue( + queueName, + { durable: false }, + context.bind(context.active(), (err, ok) => { + confirmChannel.purgeQueue( + queueName, + context.bind(context.active(), (err, ok) => { + done(); + }) + ); + }) + ); + }) + ); + }); + + afterEach(done => { + try { + confirmChannel.close(err => { + done(); + }); + } catch {} + }); + + it('simple publish and consume from queue callback', done => { + asyncConfirmSend(confirmChannel, queueName, msgPayload).then(() => { + asyncConsume( + confirmChannel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ).then(() => { + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toEqual(`publish ${queueName}`); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toEqual(`consume ${queueName}`); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanContext?.spanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + done(); + }); + }); + }); + }); +}); diff --git a/packages/instrumentation-amqplib/test/amqplib-connection-stable.test.ts b/packages/instrumentation-amqplib/test/amqplib-connection-stable.test.ts new file mode 100644 index 0000000000..4abeefa954 --- /dev/null +++ b/packages/instrumentation-amqplib/test/amqplib-connection-stable.test.ts @@ -0,0 +1,195 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import 'mocha'; +import { expect } from 'expect'; +import { shouldTest } from './utils'; +import { + rabbitMqUrl, + TEST_RABBITMQ_HOST, + TEST_RABBITMQ_PASS, + TEST_RABBITMQ_PORT, + TEST_RABBITMQ_USER, +} from './config'; +import { AmqplibInstrumentation } from '../src'; +import { + getTestSpans, + registerInstrumentationTesting, +} from '@opentelemetry/contrib-test-utils'; + +const instrumentation = registerInstrumentationTesting( + new AmqplibInstrumentation() +); +registerInstrumentationTesting(new AmqplibInstrumentation()); + +import * as amqp from 'amqplib'; +import { + ATTR_NETWORK_PEER_ADDRESS, + ATTR_NETWORK_PEER_PORT, + ATTR_NETWORK_PROTOCOL_NAME, + ATTR_NETWORK_PROTOCOL_VERSION, + ATTR_SERVER_ADDRESS, + ATTR_SERVER_PORT, +} from '@opentelemetry/semantic-conventions'; +import { SemconvStability } from '@opentelemetry/instrumentation'; +import { ATTR_MESSAGING_SYSTEM } from '@opentelemetry/semantic-conventions/incubating'; + +describe('amqplib instrumentation connection - stable semconv', () => { + before(function () { + instrumentation['_semconvStability'] = SemconvStability.STABLE; + + if (!shouldTest) { + this.skip(); + } + }); + after(async () => { + instrumentation['_semconvStability'] = SemconvStability.OLD; + }); + + describe('connect with url object', () => { + it('should extract connection attributes form url options', async function () { + const testName = this.test!.title; + const conn = await amqp.connect({ + protocol: 'amqp', + username: TEST_RABBITMQ_USER, + password: TEST_RABBITMQ_PASS, + hostname: TEST_RABBITMQ_HOST, + port: TEST_RABBITMQ_PORT, + }); + + try { + const channel = await conn.createChannel(); + channel.sendToQueue( + testName, + Buffer.from('message created only to test connection attributes') + ); + const [publishSpan] = getTestSpans(); + + expect(publishSpan.attributes).toEqual( + expect.objectContaining({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }) + ); + } finally { + await conn.close(); + } + }); + + it('should use default protocol', async function () { + const testName = this.test!.title; + const conn = await amqp.connect({ + username: TEST_RABBITMQ_USER, + password: TEST_RABBITMQ_PASS, + hostname: TEST_RABBITMQ_HOST, + port: TEST_RABBITMQ_PORT, + }); + + try { + const channel = await conn.createChannel(); + channel.sendToQueue( + testName, + Buffer.from('message created only to test connection attributes') + ); + const [publishSpan] = getTestSpans(); + expect(publishSpan.attributes).toEqual( + expect.objectContaining({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }) + ); + } finally { + await conn.close(); + } + }); + + it('should use default host', async function () { + if (TEST_RABBITMQ_HOST !== 'localhost') { + return; + } + + const testName = this.test!.title; + const conn = await amqp.connect({ + protocol: 'amqp', + username: TEST_RABBITMQ_USER, + password: TEST_RABBITMQ_PASS, + port: TEST_RABBITMQ_PORT, + }); + + try { + const channel = await conn.createChannel(); + channel.sendToQueue( + testName, + Buffer.from('message created only to test connection attributes') + ); + const [publishSpan] = getTestSpans(); + expect(publishSpan.attributes).toEqual( + expect.objectContaining({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }) + ); + } finally { + await conn.close(); + } + }); + }); + + describe('connect with url string', () => { + it('should extract connection attributes from url options', async function () { + const testName = this.test!.title; + const conn = await amqp.connect(rabbitMqUrl); + + try { + const msgPayload = Buffer.from( + 'message created only to test connection attributes' + ); + + const channel = await conn.createChannel(); + channel.sendToQueue(testName, msgPayload); + const [publishSpan] = getTestSpans(); + + expect(publishSpan.attributes).toEqual( + expect.objectContaining({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }) + ); + } finally { + await conn.close(); + } + }); + }); +}); diff --git a/packages/instrumentation-amqplib/test/amqplib-promise-stable.test.ts b/packages/instrumentation-amqplib/test/amqplib-promise-stable.test.ts new file mode 100644 index 0000000000..db57761942 --- /dev/null +++ b/packages/instrumentation-amqplib/test/amqplib-promise-stable.test.ts @@ -0,0 +1,1571 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import 'mocha'; +import { expect } from 'expect'; +import * as sinon from 'sinon'; +import * as lodash from 'lodash'; +import { + AmqplibInstrumentation, + ConsumeEndInfo, + ConsumeInfo, + EndOperation, + PublishInfo, +} from '../src'; +import { + getTestSpans, + registerInstrumentationTesting, +} from '@opentelemetry/contrib-test-utils'; + +const instrumentation = registerInstrumentationTesting( + new AmqplibInstrumentation() +); + +import * as amqp from 'amqplib'; +import { ConsumeMessage } from 'amqplib'; +import { + ATTR_MESSAGING_DESTINATION_NAME, + ATTR_MESSAGING_MESSAGE_BODY_SIZE, + ATTR_MESSAGING_OPERATION_NAME, + ATTR_MESSAGING_OPERATION_TYPE, + ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY, + ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG, + ATTR_MESSAGING_SYSTEM, + ATTR_NETWORK_PEER_ADDRESS, + ATTR_NETWORK_PEER_PORT, + ATTR_NETWORK_PROTOCOL_NAME, + ATTR_NETWORK_PROTOCOL_VERSION, + ATTR_SERVER_ADDRESS, + ATTR_SERVER_PORT, + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + MESSAGING_OPERATION_TYPE_VALUE_SEND, +} from '@opentelemetry/semantic-conventions/incubating'; +import { Span, SpanKind, SpanStatusCode } from '@opentelemetry/api'; +import { asyncConfirmPublish, asyncConfirmSend, asyncConsume } from './utils'; +import { shouldTest } from './utils'; +import { rabbitMqUrl, TEST_RABBITMQ_HOST, TEST_RABBITMQ_PORT } from './config'; +import { SinonSpy } from 'sinon'; +import { SemconvStability } from '@opentelemetry/instrumentation'; + +const msgPayload = 'payload from test'; +const queueName = 'queue-name-from-unittest'; + +// signal that the channel is closed in test, thus it should not be closed again in afterEach. +// could not find a way to get this from amqplib directly. +const CHANNEL_CLOSED_IN_TEST = Symbol( + 'opentelemetry.amqplib.unittest.channel_closed_in_test' +); + +describe('amqplib instrumentation promise model - stable semconv', () => { + let conn: amqp.Connection; + before(async function () { + instrumentation['_semconvStability'] = SemconvStability.STABLE; + + if (!shouldTest) { + this.skip(); + } else { + conn = await amqp.connect(rabbitMqUrl); + } + }); + after(async () => { + instrumentation['_semconvStability'] = SemconvStability.OLD; + + if (shouldTest) { + await conn.close(); + } + }); + + let endHookSpy: SinonSpy; + const expectConsumeEndSpyStatus = ( + expectedEndOperations: EndOperation[] + ): void => { + expect(endHookSpy.callCount).toBe(expectedEndOperations.length); + expectedEndOperations.forEach( + (endOperation: EndOperation, index: number) => { + expect(endHookSpy.args[index][1].endOperation).toEqual(endOperation); + switch (endOperation) { + case EndOperation.AutoAck: + case EndOperation.Ack: + case EndOperation.AckAll: + expect(endHookSpy.args[index][1].rejected).toBeFalsy(); + break; + + case EndOperation.Reject: + case EndOperation.Nack: + case EndOperation.NackAll: + case EndOperation.ChannelClosed: + case EndOperation.ChannelError: + expect(endHookSpy.args[index][1].rejected).toBeTruthy(); + break; + } + } + ); + }; + + describe('channel', () => { + let channel: amqp.Channel & { [CHANNEL_CLOSED_IN_TEST]?: boolean }; + beforeEach(async () => { + endHookSpy = sinon.spy(); + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + }); + + channel = await conn.createChannel(); + await channel.assertQueue(queueName, { durable: false }); + await channel.purgeQueue(queueName); + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + channel.on('error', (err: Error) => {}); + }); + afterEach(async () => { + if (!channel[CHANNEL_CLOSED_IN_TEST]) { + try { + await new Promise(resolve => { + channel.on('close', resolve); + channel.close(); + }); + } catch {} + } + }); + + it('simple publish and consume from queue', async () => { + const hadSpaceInBuffer = channel.sendToQueue( + queueName, + Buffer.from(msgPayload) + ); + expect(hadSpaceInBuffer).toBeTruthy(); + + await asyncConsume( + channel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ); + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toEqual(`publish ${queueName}`); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, // for default exchange, destination name is the routing key (queue name) + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toEqual(`consume ${queueName}`); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert context propagation + expect(consumeSpan.spanContext().traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanContext?.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + expectConsumeEndSpyStatus([EndOperation.AutoAck]); + }); + + describe('ending consume spans', () => { + it('message acked sync', async () => { + channel.sendToQueue(queueName, Buffer.from(msgPayload)); + + await asyncConsume(channel, queueName, [msg => channel.ack(msg)]); + // assert consumed message span has ended + expect(getTestSpans().length).toBe(2); + expectConsumeEndSpyStatus([EndOperation.Ack]); + }); + + it('message acked async', async () => { + channel.sendToQueue(queueName, Buffer.from(msgPayload)); + + // start async timer and ack the message after the callback returns + await new Promise(resolve => { + asyncConsume(channel, queueName, [ + msg => + setTimeout(() => { + channel.ack(msg); + resolve(); + }, 1), + ]); + }); + // assert consumed message span has ended + expect(getTestSpans().length).toBe(2); + expectConsumeEndSpyStatus([EndOperation.Ack]); + }); + + it('message nack no requeue', async () => { + channel.sendToQueue(queueName, Buffer.from(msgPayload)); + + await asyncConsume(channel, queueName, [ + msg => channel.nack(msg, false, false), + ]); + await new Promise(resolve => setTimeout(resolve, 20)); // just make sure we don't get it again + // assert consumed message span has ended + expect(getTestSpans().length).toBe(2); + const [_, consumerSpan] = getTestSpans(); + expect(consumerSpan.status.code).toEqual(SpanStatusCode.ERROR); + expect(consumerSpan.status.message).toEqual( + 'nack called on message without requeue' + ); + expectConsumeEndSpyStatus([EndOperation.Nack]); + }); + + it('message nack requeue, then acked', async () => { + channel.sendToQueue(queueName, Buffer.from(msgPayload)); + + await asyncConsume(channel, queueName, [ + (msg: amqp.Message) => channel.nack(msg, false, true), + (msg: amqp.Message) => channel.ack(msg), + ]); + // assert we have the requeued message sent again + expect(getTestSpans().length).toBe(3); + const [_, rejectedConsumerSpan, successConsumerSpan] = getTestSpans(); + expect(rejectedConsumerSpan.status.code).toEqual(SpanStatusCode.ERROR); + expect(rejectedConsumerSpan.status.message).toEqual( + 'nack called on message with requeue' + ); + expect(successConsumerSpan.status.code).toEqual(SpanStatusCode.UNSET); + expectConsumeEndSpyStatus([EndOperation.Nack, EndOperation.Ack]); + }); + + it('ack allUpTo 2 msgs sync', async () => { + lodash.times(3, () => + channel.sendToQueue(queueName, Buffer.from(msgPayload)) + ); + + await asyncConsume(channel, queueName, [ + null, + msg => channel.ack(msg, true), + msg => channel.ack(msg), + ]); + // assert all 3 messages are acked, including the first one which is acked by allUpTo + expect(getTestSpans().length).toBe(6); + expectConsumeEndSpyStatus([ + EndOperation.Ack, + EndOperation.Ack, + EndOperation.Ack, + ]); + }); + + it('nack allUpTo 2 msgs sync', async () => { + lodash.times(3, () => + channel.sendToQueue(queueName, Buffer.from(msgPayload)) + ); + + await asyncConsume(channel, queueName, [ + null, + msg => channel.nack(msg, true, false), + msg => channel.nack(msg, false, false), + ]); + // assert all 3 messages are acked, including the first one which is acked by allUpTo + expect(getTestSpans().length).toBe(6); + lodash.range(3, 6).forEach(i => { + expect(getTestSpans()[i].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[i].status.message).toEqual( + 'nack called on message without requeue' + ); + }); + expectConsumeEndSpyStatus([ + EndOperation.Nack, + EndOperation.Nack, + EndOperation.Nack, + ]); + }); + + it('ack not in received order', async () => { + lodash.times(3, () => + channel.sendToQueue(queueName, Buffer.from(msgPayload)) + ); + + const msgs = await asyncConsume(channel, queueName, [null, null, null]); + channel.ack(msgs[1]); + channel.ack(msgs[2]); + channel.ack(msgs[0]); + // assert all 3 span messages are ended + expect(getTestSpans().length).toBe(6); + expectConsumeEndSpyStatus([ + EndOperation.Ack, + EndOperation.Ack, + EndOperation.Ack, + ]); + }); + + it('ackAll', async () => { + lodash.times(2, () => + channel.sendToQueue(queueName, Buffer.from(msgPayload)) + ); + + await asyncConsume(channel, queueName, [null, () => channel.ackAll()]); + // assert all 2 span messages are ended by call to ackAll + expect(getTestSpans().length).toBe(4); + expectConsumeEndSpyStatus([EndOperation.AckAll, EndOperation.AckAll]); + }); + + it('nackAll', async () => { + lodash.times(2, () => + channel.sendToQueue(queueName, Buffer.from(msgPayload)) + ); + + await asyncConsume(channel, queueName, [ + null, + () => channel.nackAll(false), + ]); + // assert all 2 span messages are ended by calling nackAll + expect(getTestSpans().length).toBe(4); + lodash.range(2, 4).forEach(i => { + expect(getTestSpans()[i].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[i].status.message).toEqual( + 'nackAll called on message without requeue' + ); + }); + expectConsumeEndSpyStatus([EndOperation.NackAll, EndOperation.NackAll]); + }); + + it('reject', async () => { + lodash.times(1, () => + channel.sendToQueue(queueName, Buffer.from(msgPayload)) + ); + + await asyncConsume(channel, queueName, [ + msg => channel.reject(msg, false), + ]); + expect(getTestSpans().length).toBe(2); + expect(getTestSpans()[1].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[1].status.message).toEqual( + 'reject called on message without requeue' + ); + expectConsumeEndSpyStatus([EndOperation.Reject]); + }); + + it('reject with requeue', async () => { + lodash.times(1, () => + channel.sendToQueue(queueName, Buffer.from(msgPayload)) + ); + + await asyncConsume(channel, queueName, [ + msg => channel.reject(msg, true), + msg => channel.reject(msg, false), + ]); + expect(getTestSpans().length).toBe(3); + expect(getTestSpans()[1].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[1].status.message).toEqual( + 'reject called on message with requeue' + ); + expect(getTestSpans()[2].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[2].status.message).toEqual( + 'reject called on message without requeue' + ); + expectConsumeEndSpyStatus([EndOperation.Reject, EndOperation.Reject]); + }); + + it('closing channel should end all open spans on it', async () => { + lodash.times(1, () => + channel.sendToQueue(queueName, Buffer.from(msgPayload)) + ); + + await new Promise(resolve => + asyncConsume(channel, queueName, [ + async msg => { + await channel.close(); + resolve(); + channel[CHANNEL_CLOSED_IN_TEST] = true; + }, + ]) + ); + + expect(getTestSpans().length).toBe(2); + expect(getTestSpans()[1].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[1].status.message).toEqual('channel closed'); + expectConsumeEndSpyStatus([EndOperation.ChannelClosed]); + }); + + it('error on channel should end all open spans on it', done => { + lodash.times(2, () => + channel.sendToQueue(queueName, Buffer.from(msgPayload)) + ); + + channel.on('close', () => { + expect(getTestSpans().length).toBe(4); + // second consume ended with valid ack, previous message not acked when channel is errored. + // since we first ack the second message, it appear first in the finished spans array + expect(getTestSpans()[2].status.code).toEqual(SpanStatusCode.UNSET); + expect(getTestSpans()[3].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[3].status.message).toEqual('channel error'); + expectConsumeEndSpyStatus([ + EndOperation.Ack, + EndOperation.ChannelError, + ]); + done(); + }); + asyncConsume(channel, queueName, [ + null, + msg => { + try { + channel.ack(msg); + channel[CHANNEL_CLOSED_IN_TEST] = true; + // ack the same msg again, this is not valid and should close the channel + channel.ack(msg); + } catch {} + }, + ]); + }); + + it('not acking the message trigger timeout', async () => { + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + consumeTimeoutMs: 1, + }); + + lodash.times(1, () => + channel.sendToQueue(queueName, Buffer.from(msgPayload)) + ); + + await asyncConsume(channel, queueName, [null]); + + // we have timeout of 1 ms, so we wait more than that and check span indeed ended + await new Promise(resolve => setTimeout(resolve, 10)); + + expect(getTestSpans().length).toBe(2); + expectConsumeEndSpyStatus([EndOperation.InstrumentationTimeout]); + }); + }); + + describe('routing and exchange', () => { + it('topic exchange', async () => { + const exchangeName = 'topic exchange'; + const routingKey = 'topic.name.from.unittest'; + await channel.assertExchange(exchangeName, 'topic', { durable: false }); + + const { queue: queueName } = await channel.assertQueue('', { + durable: false, + }); + await channel.bindQueue(queueName, exchangeName, '#'); + + channel.publish(exchangeName, routingKey, Buffer.from(msgPayload)); + + await asyncConsume(channel, queueName, [null], { + noAck: true, + }); + + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toEqual( + `publish ${exchangeName}:${routingKey}` + ); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: `${exchangeName}:${routingKey}`, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: routingKey, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toEqual( + `consume ${exchangeName}:${routingKey}:${queueName}` + ); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_DESTINATION_NAME]: `${exchangeName}:${routingKey}:${queueName}`, + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: routingKey, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert context propagation + expect(consumeSpan.spanContext().traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanContext?.spanId).toEqual( + publishSpan.spanContext().spanId + ); + }); + }); + + describe('hooks', () => { + it('publish and consume hooks success', async () => { + const attributeNameFromHook = 'attribute.name.from.hook'; + const hookAttributeValue = 'attribute value from hook'; + const attributeNameFromEndHook = 'attribute.name.from.endhook'; + const endHookAttributeValue = 'attribute value from end hook'; + instrumentation.setConfig({ + publishHook: (span: Span, publishParams: PublishInfo): void => { + expect(publishParams.exchange).toEqual(''); + expect(publishParams.routingKey).toEqual(queueName); + expect(publishParams.content.toString()).toEqual(msgPayload); + span.setAttribute(attributeNameFromHook, hookAttributeValue); + }, + consumeHook: (span: Span, consumeInfo: ConsumeInfo): void => { + expect(consumeInfo.msg!.content.toString()).toEqual(msgPayload); + span.setAttribute(attributeNameFromHook, hookAttributeValue); + }, + consumeEndHook: ( + span: Span, + consumeEndInfo: ConsumeEndInfo + ): void => { + expect(consumeEndInfo.endOperation).toEqual(EndOperation.AutoAck); + span.setAttribute(attributeNameFromEndHook, endHookAttributeValue); + }, + }); + + channel.sendToQueue(queueName, Buffer.from(msgPayload)); + + await asyncConsume(channel, queueName, [null], { + noAck: true, + }); + expect(getTestSpans().length).toBe(2); + expect(getTestSpans()[0].attributes[attributeNameFromHook]).toEqual( + hookAttributeValue + ); + expect(getTestSpans()[1].attributes[attributeNameFromHook]).toEqual( + hookAttributeValue + ); + expect(getTestSpans()[1].attributes[attributeNameFromEndHook]).toEqual( + endHookAttributeValue + ); + }); + + it('hooks throw should not affect user flow or span creation', async () => { + const attributeNameFromHook = 'attribute.name.from.hook'; + const hookAttributeValue = 'attribute value from hook'; + instrumentation.setConfig({ + publishHook: (span: Span, publishParams: PublishInfo): void => { + span.setAttribute(attributeNameFromHook, hookAttributeValue); + throw new Error('error from hook'); + }, + consumeHook: (span: Span, consumeInfo: ConsumeInfo): void => { + span.setAttribute(attributeNameFromHook, hookAttributeValue); + throw new Error('error from hook'); + }, + }); + + channel.sendToQueue(queueName, Buffer.from(msgPayload)); + + await asyncConsume(channel, queueName, [null], { + noAck: true, + }); + expect(getTestSpans().length).toBe(2); + getTestSpans().forEach(s => + expect(s.attributes[attributeNameFromHook]).toEqual( + hookAttributeValue + ) + ); + }); + }); + + describe('delete queue', () => { + it('consumer receives null msg when a queue is deleted in broker', async () => { + const queueNameForDeletion = 'queue-to-be-deleted'; + await channel.assertQueue(queueNameForDeletion, { durable: false }); + await channel.purgeQueue(queueNameForDeletion); + + await channel.consume( + queueNameForDeletion, + (msg: ConsumeMessage | null) => {}, + { noAck: true } + ); + await channel.deleteQueue(queueNameForDeletion); + }); + }); + }); + + describe('confirm channel', () => { + let confirmChannel: amqp.ConfirmChannel & { + [CHANNEL_CLOSED_IN_TEST]?: boolean; + }; + beforeEach(async () => { + endHookSpy = sinon.spy(); + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + }); + + confirmChannel = await conn.createConfirmChannel(); + await confirmChannel.assertQueue(queueName, { durable: false }); + await confirmChannel.purgeQueue(queueName); + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + confirmChannel.on('error', (err: Error) => {}); + }); + afterEach(async () => { + if (!confirmChannel[CHANNEL_CLOSED_IN_TEST]) { + try { + await new Promise(resolve => { + confirmChannel.on('close', resolve); + confirmChannel.close(); + }); + } catch {} + } + }); + + it('simple publish with confirm and consume from queue', async () => { + await asyncConfirmSend(confirmChannel, queueName, msgPayload); + + await asyncConsume( + confirmChannel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ); + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toEqual(`publish ${queueName}`); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toEqual(`consume ${queueName}`); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert context propagation + expect(consumeSpan.spanContext().traceId).toEqual( + publishSpan.spanContext().traceId + ); + + expectConsumeEndSpyStatus([EndOperation.AutoAck]); + }); + + it('confirm throw should not affect span end', async () => { + const confirmUserError = new Error('callback error'); + await asyncConfirmSend(confirmChannel, queueName, msgPayload, () => { + throw confirmUserError; + }).catch(reject => expect(reject).toEqual(confirmUserError)); + + await asyncConsume( + confirmChannel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ); + + expect(getTestSpans()).toHaveLength(2); + expectConsumeEndSpyStatus([EndOperation.AutoAck]); + }); + + describe('ending consume spans', () => { + it('message acked sync', async () => { + await asyncConfirmSend(confirmChannel, queueName, msgPayload); + + await asyncConsume(confirmChannel, queueName, [ + msg => confirmChannel.ack(msg), + ]); + // assert consumed message span has ended + expect(getTestSpans().length).toBe(2); + expectConsumeEndSpyStatus([EndOperation.Ack]); + }); + + it('message acked async', async () => { + await asyncConfirmSend(confirmChannel, queueName, msgPayload); + + // start async timer and ack the message after the callback returns + await new Promise(resolve => { + asyncConsume(confirmChannel, queueName, [ + msg => + setTimeout(() => { + confirmChannel.ack(msg); + resolve(); + }, 1), + ]); + }); + // assert consumed message span has ended + expect(getTestSpans().length).toBe(2); + expectConsumeEndSpyStatus([EndOperation.Ack]); + }); + + it('message nack no requeue', async () => { + await asyncConfirmSend(confirmChannel, queueName, msgPayload); + + await asyncConsume(confirmChannel, queueName, [ + msg => confirmChannel.nack(msg, false, false), + ]); + await new Promise(resolve => setTimeout(resolve, 20)); // just make sure we don't get it again + // assert consumed message span has ended + expect(getTestSpans().length).toBe(2); + const [_, consumerSpan] = getTestSpans(); + expect(consumerSpan.status.code).toEqual(SpanStatusCode.ERROR); + expect(consumerSpan.status.message).toEqual( + 'nack called on message without requeue' + ); + expectConsumeEndSpyStatus([EndOperation.Nack]); + }); + + it('message nack requeue, then acked', async () => { + await asyncConfirmSend(confirmChannel, queueName, msgPayload); + + await asyncConsume(confirmChannel, queueName, [ + (msg: amqp.Message) => confirmChannel.nack(msg, false, true), + (msg: amqp.Message) => confirmChannel.ack(msg), + ]); + // assert we have the requeued message sent again + expect(getTestSpans().length).toBe(3); + const [_, rejectedConsumerSpan, successConsumerSpan] = getTestSpans(); + expect(rejectedConsumerSpan.status.code).toEqual(SpanStatusCode.ERROR); + expect(rejectedConsumerSpan.status.message).toEqual( + 'nack called on message with requeue' + ); + expect(successConsumerSpan.status.code).toEqual(SpanStatusCode.UNSET); + expectConsumeEndSpyStatus([EndOperation.Nack, EndOperation.Ack]); + }); + + it('ack allUpTo 2 msgs sync', async () => { + await Promise.all( + lodash.times(3, () => + asyncConfirmSend(confirmChannel, queueName, msgPayload) + ) + ); + + await asyncConsume(confirmChannel, queueName, [ + null, + msg => confirmChannel.ack(msg, true), + msg => confirmChannel.ack(msg), + ]); + // assert all 3 messages are acked, including the first one which is acked by allUpTo + expect(getTestSpans().length).toBe(6); + expectConsumeEndSpyStatus([ + EndOperation.Ack, + EndOperation.Ack, + EndOperation.Ack, + ]); + }); + + it('nack allUpTo 2 msgs sync', async () => { + await Promise.all( + lodash.times(3, () => + asyncConfirmSend(confirmChannel, queueName, msgPayload) + ) + ); + + await asyncConsume(confirmChannel, queueName, [ + null, + msg => confirmChannel.nack(msg, true, false), + msg => confirmChannel.nack(msg, false, false), + ]); + // assert all 3 messages are acked, including the first one which is acked by allUpTo + expect(getTestSpans().length).toBe(6); + lodash.range(3, 6).forEach(i => { + expect(getTestSpans()[i].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[i].status.message).toEqual( + 'nack called on message without requeue' + ); + }); + expectConsumeEndSpyStatus([ + EndOperation.Nack, + EndOperation.Nack, + EndOperation.Nack, + ]); + }); + + it('ack not in received order', async () => { + await Promise.all( + lodash.times(3, () => + asyncConfirmSend(confirmChannel, queueName, msgPayload) + ) + ); + + const msgs = await asyncConsume(confirmChannel, queueName, [ + null, + null, + null, + ]); + confirmChannel.ack(msgs[1]); + confirmChannel.ack(msgs[2]); + confirmChannel.ack(msgs[0]); + // assert all 3 span messages are ended + expect(getTestSpans().length).toBe(6); + expectConsumeEndSpyStatus([ + EndOperation.Ack, + EndOperation.Ack, + EndOperation.Ack, + ]); + }); + + it('ackAll', async () => { + await Promise.all( + lodash.times(2, () => + asyncConfirmSend(confirmChannel, queueName, msgPayload) + ) + ); + + await asyncConsume(confirmChannel, queueName, [ + null, + () => confirmChannel.ackAll(), + ]); + // assert all 2 span messages are ended by call to ackAll + expect(getTestSpans().length).toBe(4); + expectConsumeEndSpyStatus([EndOperation.AckAll, EndOperation.AckAll]); + }); + + it('nackAll', async () => { + await Promise.all( + lodash.times(2, () => + asyncConfirmSend(confirmChannel, queueName, msgPayload) + ) + ); + + await asyncConsume(confirmChannel, queueName, [ + null, + () => confirmChannel.nackAll(false), + ]); + // assert all 2 span messages are ended by calling nackAll + expect(getTestSpans().length).toBe(4); + lodash.range(2, 4).forEach(i => { + expect(getTestSpans()[i].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[i].status.message).toEqual( + 'nackAll called on message without requeue' + ); + }); + expectConsumeEndSpyStatus([EndOperation.NackAll, EndOperation.NackAll]); + }); + + it('reject', async () => { + await Promise.all( + lodash.times(1, () => + asyncConfirmSend(confirmChannel, queueName, msgPayload) + ) + ); + + await asyncConsume(confirmChannel, queueName, [ + msg => confirmChannel.reject(msg, false), + ]); + expect(getTestSpans().length).toBe(2); + expect(getTestSpans()[1].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[1].status.message).toEqual( + 'reject called on message without requeue' + ); + expectConsumeEndSpyStatus([EndOperation.Reject]); + }); + + it('reject with requeue', async () => { + await Promise.all( + lodash.times(1, () => + asyncConfirmSend(confirmChannel, queueName, msgPayload) + ) + ); + + await asyncConsume(confirmChannel, queueName, [ + msg => confirmChannel.reject(msg, true), + msg => confirmChannel.reject(msg, false), + ]); + expect(getTestSpans().length).toBe(3); + expect(getTestSpans()[1].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[1].status.message).toEqual( + 'reject called on message with requeue' + ); + expect(getTestSpans()[2].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[2].status.message).toEqual( + 'reject called on message without requeue' + ); + expectConsumeEndSpyStatus([EndOperation.Reject, EndOperation.Reject]); + }); + + it('closing channel should end all open spans on it', async () => { + await Promise.all( + lodash.times(1, () => + asyncConfirmSend(confirmChannel, queueName, msgPayload) + ) + ); + + await new Promise(resolve => + asyncConsume(confirmChannel, queueName, [ + async msg => { + await confirmChannel.close(); + resolve(); + confirmChannel[CHANNEL_CLOSED_IN_TEST] = true; + }, + ]) + ); + + expect(getTestSpans().length).toBe(2); + expect(getTestSpans()[1].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[1].status.message).toEqual('channel closed'); + expectConsumeEndSpyStatus([EndOperation.ChannelClosed]); + }); + + it('error on channel should end all open spans on it', done => { + Promise.all( + lodash.times(2, () => + asyncConfirmSend(confirmChannel, queueName, msgPayload) + ) + ).then(() => { + confirmChannel.on('close', () => { + expect(getTestSpans().length).toBe(4); + // second consume ended with valid ack, previous message not acked when channel is errored. + // since we first ack the second message, it appear first in the finished spans array + expect(getTestSpans()[2].status.code).toEqual(SpanStatusCode.UNSET); + expect(getTestSpans()[3].status.code).toEqual(SpanStatusCode.ERROR); + expect(getTestSpans()[3].status.message).toEqual('channel error'); + expectConsumeEndSpyStatus([ + EndOperation.Ack, + EndOperation.ChannelError, + ]); + done(); + }); + asyncConsume(confirmChannel, queueName, [ + null, + msg => { + try { + confirmChannel.ack(msg); + confirmChannel[CHANNEL_CLOSED_IN_TEST] = true; + // ack the same msg again, this is not valid and should close the channel + confirmChannel.ack(msg); + } catch {} + }, + ]); + }); + }); + + it('not acking the message trigger timeout', async () => { + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + consumeTimeoutMs: 1, + }); + + await Promise.all( + lodash.times(1, () => + asyncConfirmSend(confirmChannel, queueName, msgPayload) + ) + ); + + await asyncConsume(confirmChannel, queueName, [null]); + + // we have timeout of 1 ms, so we wait more than that and check span indeed ended + await new Promise(resolve => setTimeout(resolve, 10)); + + expect(getTestSpans().length).toBe(2); + expectConsumeEndSpyStatus([EndOperation.InstrumentationTimeout]); + }); + }); + + describe('routing and exchange', () => { + it('topic exchange', async () => { + const exchangeName = 'topic exchange'; + const routingKey = 'topic.name.from.unittest'; + await confirmChannel.assertExchange(exchangeName, 'topic', { + durable: false, + }); + + const { queue: queueName } = await confirmChannel.assertQueue('', { + durable: false, + }); + await confirmChannel.bindQueue(queueName, exchangeName, '#'); + + await asyncConfirmPublish( + confirmChannel, + exchangeName, + routingKey, + msgPayload + ); + + await asyncConsume(confirmChannel, queueName, [null], { + noAck: true, + }); + + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toEqual( + `publish ${exchangeName}:${routingKey}` + ); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: `${exchangeName}:${routingKey}`, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: routingKey, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toEqual( + `consume ${exchangeName}:${routingKey}:${queueName}` + ); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: `${exchangeName}:${routingKey}:${queueName}`, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: routingKey, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert context propagation + expect(consumeSpan.spanContext().traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanContext?.spanId).toEqual( + publishSpan.spanContext().spanId + ); + }); + + describe('hooks', () => { + it('publish and consume hooks success', async () => { + const attributeNameFromHook = 'attribute.name.from.hook'; + const hookAttributeValue = 'attribute value from hook'; + const attributeNameFromConfirmEndHook = + 'attribute.name.from.confirm.endhook'; + const confirmEndHookAttributeValue = + 'attribute value from confirm end hook'; + const attributeNameFromConsumeEndHook = + 'attribute.name.from.consume.endhook'; + const consumeEndHookAttributeValue = + 'attribute value from consume end hook'; + instrumentation.setConfig({ + publishHook: (span: Span, publishParams: PublishInfo) => { + expect(publishParams.exchange).toEqual(''); + expect(publishParams.routingKey).toEqual(queueName); + expect(publishParams.content.toString()).toEqual(msgPayload); + expect(publishParams.isConfirmChannel).toBe(true); + span.setAttribute(attributeNameFromHook, hookAttributeValue); + }, + publishConfirmHook: (span, publishParams) => { + expect(publishParams.exchange).toEqual(''); + expect(publishParams.routingKey).toEqual(queueName); + expect(publishParams.content.toString()).toEqual(msgPayload); + span.setAttribute( + attributeNameFromConfirmEndHook, + confirmEndHookAttributeValue + ); + }, + consumeHook: (span: Span, consumeInfo: ConsumeInfo) => { + expect(consumeInfo.msg!.content.toString()).toEqual(msgPayload); + span.setAttribute(attributeNameFromHook, hookAttributeValue); + }, + consumeEndHook: ( + span: Span, + consumeEndInfo: ConsumeEndInfo + ): void => { + span.setAttribute( + attributeNameFromConsumeEndHook, + consumeEndHookAttributeValue + ); + expect(consumeEndInfo.endOperation).toEqual(EndOperation.AutoAck); + }, + }); + + await asyncConfirmSend(confirmChannel, queueName, msgPayload); + + await asyncConsume(confirmChannel, queueName, [null], { + noAck: true, + }); + expect(getTestSpans().length).toBe(2); + expect(getTestSpans()[0].attributes[attributeNameFromHook]).toEqual( + hookAttributeValue + ); + expect( + getTestSpans()[0].attributes[attributeNameFromConfirmEndHook] + ).toEqual(confirmEndHookAttributeValue); + expect(getTestSpans()[1].attributes[attributeNameFromHook]).toEqual( + hookAttributeValue + ); + expect( + getTestSpans()[1].attributes[attributeNameFromConsumeEndHook] + ).toEqual(consumeEndHookAttributeValue); + }); + + it('hooks throw should not affect user flow or span creation', async () => { + const attributeNameFromHook = 'attribute.name.from.hook'; + const hookAttributeValue = 'attribute value from hook'; + instrumentation.setConfig({ + publishHook: (span: Span, publishParams: PublishInfo): void => { + span.setAttribute(attributeNameFromHook, hookAttributeValue); + throw new Error('error from hook'); + }, + publishConfirmHook: ( + span: Span, + publishParams: PublishInfo + ): void => { + span.setAttribute(attributeNameFromHook, hookAttributeValue); + throw new Error('error from hook'); + }, + consumeHook: (span: Span, consumeInfo: ConsumeInfo): void => { + span.setAttribute(attributeNameFromHook, hookAttributeValue); + throw new Error('error from hook'); + }, + }); + + await asyncConfirmSend(confirmChannel, queueName, msgPayload); + + await asyncConsume(confirmChannel, queueName, [null], { + noAck: true, + }); + expect(getTestSpans().length).toBe(2); + getTestSpans().forEach(s => + expect(s.attributes[attributeNameFromHook]).toEqual( + hookAttributeValue + ) + ); + }); + }); + }); + }); + + describe('channel using links config', () => { + let channel: amqp.Channel & { [CHANNEL_CLOSED_IN_TEST]?: boolean }; + beforeEach(async () => { + endHookSpy = sinon.spy(); + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + useLinksForConsume: true, + }); + + channel = await conn.createChannel(); + await channel.assertQueue(queueName, { durable: false }); + await channel.purgeQueue(queueName); + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + channel.on('error', (err: Error) => {}); + }); + afterEach(async () => { + if (!channel[CHANNEL_CLOSED_IN_TEST]) { + try { + await new Promise(resolve => { + channel.on('close', resolve); + channel.close(); + }); + } catch {} + } + }); + + it('simple publish and consume from queue', async () => { + const hadSpaceInBuffer = channel.sendToQueue( + queueName, + Buffer.from(msgPayload) + ); + expect(hadSpaceInBuffer).toBeTruthy(); + + await asyncConsume( + channel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ); + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toEqual(`publish ${queueName}`); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toEqual(`consume ${queueName}`); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanContext?.spanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + expectConsumeEndSpyStatus([EndOperation.AutoAck]); + }); + + describe('routing and exchange', () => { + it('topic exchange', async () => { + const exchangeName = 'topic exchange'; + const routingKey = 'topic.name.from.unittest'; + await channel.assertExchange(exchangeName, 'topic', { + durable: false, + }); + + const { queue: queueName } = await channel.assertQueue('', { + durable: false, + }); + await channel.bindQueue(queueName, exchangeName, '#'); + + channel.publish(exchangeName, routingKey, Buffer.from(msgPayload)); + + await asyncConsume(channel, queueName, [null], { + noAck: true, + }); + + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toEqual( + `publish ${exchangeName}:${routingKey}` + ); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: `${exchangeName}:${routingKey}`, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: routingKey, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toEqual( + `consume ${exchangeName}:${routingKey}:${queueName}` + ); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: `${exchangeName}:${routingKey}:${queueName}`, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: routingKey, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanContext?.spanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + }); + }); + }); + + describe('confirm channel links config', () => { + let confirmChannel: amqp.ConfirmChannel & { + [CHANNEL_CLOSED_IN_TEST]?: boolean; + }; + beforeEach(async () => { + endHookSpy = sinon.spy(); + instrumentation.setConfig({ + consumeEndHook: endHookSpy, + useLinksForConsume: true, + }); + + confirmChannel = await conn.createConfirmChannel(); + await confirmChannel.assertQueue(queueName, { durable: false }); + await confirmChannel.purgeQueue(queueName); + // install an error handler, otherwise when we have tests that create error on the channel, + // it throws and crash process + confirmChannel.on('error', (err: Error) => {}); + }); + afterEach(async () => { + if (!confirmChannel[CHANNEL_CLOSED_IN_TEST]) { + try { + await new Promise(resolve => { + confirmChannel.on('close', resolve); + confirmChannel.close(); + }); + } catch {} + } + }); + + it('simple publish with confirm and consume from queue', async () => { + await asyncConfirmSend(confirmChannel, queueName, msgPayload); + + await asyncConsume( + confirmChannel, + queueName, + [msg => expect(msg.content.toString()).toEqual(msgPayload)], + { + noAck: true, + } + ); + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toEqual(`publish ${queueName}`); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toEqual(`consume ${queueName}`); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: queueName, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: queueName, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanContext?.spanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + + expectConsumeEndSpyStatus([EndOperation.AutoAck]); + }); + + describe('routing and exchange', () => { + it('topic exchange', async () => { + const exchangeName = 'topic exchange'; + const routingKey = 'topic.name.from.unittest'; + await confirmChannel.assertExchange(exchangeName, 'topic', { + durable: false, + }); + + const { queue: queueName } = await confirmChannel.assertQueue('', { + durable: false, + }); + await confirmChannel.bindQueue(queueName, exchangeName, '#'); + + await asyncConfirmPublish( + confirmChannel, + exchangeName, + routingKey, + msgPayload + ); + + await asyncConsume(confirmChannel, queueName, [null], { + noAck: true, + }); + + const [publishSpan, consumeSpan] = getTestSpans(); + + // assert publish span + expect(publishSpan.kind).toEqual(SpanKind.PRODUCER); + expect(publishSpan.name).toEqual( + `publish ${exchangeName}:${routingKey}` + ); + expect(publishSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND, + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: `${exchangeName}:${routingKey}`, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: routingKey, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // assert consume span + expect(consumeSpan.kind).toEqual(SpanKind.CONSUMER); + expect(consumeSpan.name).toEqual( + `consume ${exchangeName}:${routingKey}:${queueName}` + ); + expect(consumeSpan.attributes).toEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: msgPayload.length, + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: `${exchangeName}:${routingKey}:${queueName}`, + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: routingKey, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_NETWORK_PEER_PORT]: TEST_RABBITMQ_PORT, + [ATTR_SERVER_ADDRESS]: TEST_RABBITMQ_HOST, + [ATTR_SERVER_PORT]: TEST_RABBITMQ_PORT, + }); + + // new trace should be created + expect(consumeSpan.spanContext().traceId).not.toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.parentSpanContext?.spanId).toBeUndefined(); + + // link back to publish span + expect(consumeSpan.links.length).toBe(1); + expect(consumeSpan.links[0].context.traceId).toEqual( + publishSpan.spanContext().traceId + ); + expect(consumeSpan.links[0].context.spanId).toEqual( + publishSpan.spanContext().spanId + ); + }); + }); + }); +}); diff --git a/packages/instrumentation-amqplib/test/utils.test.ts b/packages/instrumentation-amqplib/test/utils.test.ts index 6369398969..84496ed4ae 100644 --- a/packages/instrumentation-amqplib/test/utils.test.ts +++ b/packages/instrumentation-amqplib/test/utils.test.ts @@ -18,10 +18,28 @@ import { expect } from 'expect'; import { getConnectionAttributesFromServer, getConnectionAttributesFromUrl, + getConsumeAttributes, + getConsumeSpanName, + getPublishAttributes, + getPublishSpanName, } from '../src/utils'; import { - SEMATTRS_MESSAGING_PROTOCOL, + ATTR_NETWORK_PEER_ADDRESS, + ATTR_NETWORK_PEER_PORT, + ATTR_NETWORK_PROTOCOL_NAME, + ATTR_NETWORK_PROTOCOL_VERSION, + ATTR_SERVER_ADDRESS, + ATTR_SERVER_PORT, + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + MESSAGINGOPERATIONVALUES_PROCESS, + SEMATTRS_MESSAGING_CONVERSATION_ID, + SEMATTRS_MESSAGING_DESTINATION_KIND, + SEMATTRS_MESSAGING_DESTINATION, + SEMATTRS_MESSAGING_MESSAGE_ID, + SEMATTRS_MESSAGING_OPERATION, SEMATTRS_MESSAGING_PROTOCOL_VERSION, + SEMATTRS_MESSAGING_PROTOCOL, + SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY, SEMATTRS_MESSAGING_SYSTEM, SEMATTRS_MESSAGING_URL, SEMATTRS_NET_PEER_NAME, @@ -30,6 +48,19 @@ import { import * as amqp from 'amqplib'; import { shouldTest } from './utils'; import { rabbitMqUrl } from './config'; +import { SemconvStability } from '@opentelemetry/instrumentation'; +import { + ATTR_MESSAGING_DESTINATION_NAME, + ATTR_MESSAGING_MESSAGE_BODY_SIZE, + ATTR_MESSAGING_MESSAGE_CONVERSATION_ID, + ATTR_MESSAGING_MESSAGE_ID, + ATTR_MESSAGING_OPERATION_NAME, + ATTR_MESSAGING_OPERATION_TYPE, + ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY, + ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG, + ATTR_MESSAGING_SYSTEM, + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, +} from '@opentelemetry/semantic-conventions/incubating'; describe('utils', () => { describe('getConnectionAttributesFromServer', () => { @@ -47,128 +78,1228 @@ describe('utils', () => { } }); - it('messaging system attribute', () => { - const attributes = getConnectionAttributesFromServer(conn.connection); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_SYSTEM]: 'rabbitmq', + describe('Old attributes', () => { + it('messaging system attribute', () => { + const attributes = getConnectionAttributesFromServer(conn.connection); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_SYSTEM]: 'rabbitmq', + }); + }); + }); + + describe('Stable attributes', () => { + it('messaging system attribute', () => { + const attributes = getConnectionAttributesFromServer(conn.connection); + expect(attributes).toStrictEqual({ + [ATTR_MESSAGING_SYSTEM]: 'rabbitmq', + }); }); }); }); describe('getConnectionAttributesFromUrl', () => { - it('all features', () => { - const attributes = getConnectionAttributesFromUrl( - 'amqp://user:pass@host:10000/vhost' - ); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', - [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', - [SEMATTRS_NET_PEER_NAME]: 'host', - [SEMATTRS_NET_PEER_PORT]: 10000, - [SEMATTRS_MESSAGING_URL]: 'amqp://user:***@host:10000/vhost', + describe('Old attributes', () => { + it('all features', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://user:pass@host:10000/vhost', + SemconvStability.OLD + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_NET_PEER_NAME]: 'host', + [SEMATTRS_NET_PEER_PORT]: 10000, + [SEMATTRS_MESSAGING_URL]: 'amqp://user:***@host:10000/vhost', + }); + }); + + it('all features encoded', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://user%61:%61pass@ho%61st:10000/v%2fhost', + SemconvStability.OLD + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_NET_PEER_NAME]: 'ho%61st', + [SEMATTRS_NET_PEER_PORT]: 10000, + [SEMATTRS_MESSAGING_URL]: 'amqp://user%61:***@ho%61st:10000/v%2fhost', + }); + }); + + it('only protocol', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://', + SemconvStability.OLD + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_NET_PEER_NAME]: 'localhost', + [SEMATTRS_NET_PEER_PORT]: 5672, + [SEMATTRS_MESSAGING_URL]: 'amqp://', + }); + }); + + it('empty username and password', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://:@/', + SemconvStability.OLD + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_MESSAGING_URL]: 'amqp://:***@/', + }); + }); + + it('username and no password', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://user@', + SemconvStability.OLD + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_MESSAGING_URL]: 'amqp://user@', + }); + }); + + it('username and password, no host', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://user:pass@', + SemconvStability.OLD + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_MESSAGING_URL]: 'amqp://user:***@', + }); + }); + + it('host only', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://host', + SemconvStability.OLD + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_NET_PEER_NAME]: 'host', + [SEMATTRS_NET_PEER_PORT]: 5672, + [SEMATTRS_MESSAGING_URL]: 'amqp://host', + }); + }); + + it('vhost only', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp:///vhost', + SemconvStability.OLD + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_NET_PEER_NAME]: 'localhost', + [SEMATTRS_NET_PEER_PORT]: 5672, + [SEMATTRS_MESSAGING_URL]: 'amqp:///vhost', + }); + }); + + it('host only, trailing slash', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://host/', + SemconvStability.OLD + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_NET_PEER_NAME]: 'host', + [SEMATTRS_NET_PEER_PORT]: 5672, + [SEMATTRS_MESSAGING_URL]: 'amqp://host/', + }); + }); + + it('vhost encoded', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://host/%2f', + SemconvStability.OLD + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_NET_PEER_NAME]: 'host', + [SEMATTRS_NET_PEER_PORT]: 5672, + [SEMATTRS_MESSAGING_URL]: 'amqp://host/%2f', + }); + }); + + it('IPv6 host', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://[::1]', + SemconvStability.OLD + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_NET_PEER_NAME]: '[::1]', + [SEMATTRS_NET_PEER_PORT]: 5672, + [SEMATTRS_MESSAGING_URL]: 'amqp://[::1]', + }); + }); + }); + + describe('Stable attributes', () => { + it('all features', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://user:pass@host:10000/vhost', + SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: 'host', + [ATTR_NETWORK_PEER_PORT]: 10000, + [ATTR_SERVER_ADDRESS]: 'host', + [ATTR_SERVER_PORT]: 10000, + }); + }); + + it('all features encoded', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://user%61:%61pass@ho%61st:10000/v%2fhost', + SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: 'ho%61st', + [ATTR_NETWORK_PEER_PORT]: 10000, + [ATTR_SERVER_ADDRESS]: 'ho%61st', + [ATTR_SERVER_PORT]: 10000, + }); + }); + + it('only protocol', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://', + SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: 'localhost', + [ATTR_NETWORK_PEER_PORT]: 5672, + [ATTR_SERVER_ADDRESS]: 'localhost', + [ATTR_SERVER_PORT]: 5672, + }); + }); + + it('empty username and password', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://:@/', + SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + }); + }); + + it('username and no password', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://user@', + SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + }); + }); + + it('username and password, no host', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://user:pass@', + SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + }); + }); + + it('host only', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://host', + SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: 'host', + [ATTR_NETWORK_PEER_PORT]: 5672, + [ATTR_SERVER_ADDRESS]: 'host', + [ATTR_SERVER_PORT]: 5672, + }); + }); + + it('vhost only', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp:///vhost', + SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: 'localhost', + [ATTR_NETWORK_PEER_PORT]: 5672, + [ATTR_SERVER_ADDRESS]: 'localhost', + [ATTR_SERVER_PORT]: 5672, + }); + }); + + it('host only, trailing slash', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://host/', + SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: 'host', + [ATTR_NETWORK_PEER_PORT]: 5672, + [ATTR_SERVER_ADDRESS]: 'host', + [ATTR_SERVER_PORT]: 5672, + }); + }); + + it('vhost encoded', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://host/%2f', + SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: 'host', + [ATTR_NETWORK_PEER_PORT]: 5672, + [ATTR_SERVER_ADDRESS]: 'host', + [ATTR_SERVER_PORT]: 5672, + }); + }); + + it('IPv6 host', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://[::1]', + SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: '[::1]', + [ATTR_NETWORK_PEER_PORT]: 5672, + [ATTR_SERVER_ADDRESS]: '[::1]', + [ATTR_SERVER_PORT]: 5672, + }); + }); + }); + + describe('Both old and stable attributes', () => { + it('all features', () => { + const attributes = getConnectionAttributesFromUrl( + 'amqp://user:pass@host:10000/vhost', + SemconvStability.OLD | SemconvStability.STABLE + ); + expect(attributes).toStrictEqual({ + [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', + [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', + [SEMATTRS_NET_PEER_NAME]: 'host', + [SEMATTRS_NET_PEER_PORT]: 10000, + [SEMATTRS_MESSAGING_URL]: 'amqp://user:***@host:10000/vhost', + [ATTR_NETWORK_PROTOCOL_NAME]: 'AMQP', + [ATTR_NETWORK_PROTOCOL_VERSION]: '0.9.1', + [ATTR_NETWORK_PEER_ADDRESS]: 'host', + [ATTR_NETWORK_PEER_PORT]: 10000, + [ATTR_SERVER_ADDRESS]: 'host', + [ATTR_SERVER_PORT]: 10000, + }); + }); + }); + }); + + describe('getPublishSpanName', () => { + describe('Old attributes', () => { + it('should return the exchange name', () => { + expect( + getPublishSpanName( + 'test-exchange', + 'routing.key', + SemconvStability.OLD + ) + ).toBe('publish test-exchange'); + }); + + it('should handle empty exchange as ', () => { + expect( + getPublishSpanName('', 'routing.key', SemconvStability.OLD) + ).toBe('publish '); + }); + + it('should handle special characters in exchange name', () => { + expect( + getPublishSpanName( + 'exchange.with-special_chars', + 'routing.key', + SemconvStability.OLD + ) + ).toBe('publish exchange.with-special_chars'); + }); + + it('should handle long exchange names', () => { + expect( + getPublishSpanName( + 'very-long-exchange-name-with-many-characters', + 'routing.key', + SemconvStability.OLD + ) + ).toBe('publish very-long-exchange-name-with-many-characters'); + }); + + it('should ignore the routing key value', () => { + expect( + getPublishSpanName( + 'test-exchange', + 'different.routing.key', + SemconvStability.OLD + ) + ).toBe('publish test-exchange'); + }); + }); + + describe('Stable attributes', () => { + it('should return exchange:routingKey when both are present', () => { + expect( + getPublishSpanName( + 'test-exchange', + 'routing.key', + SemconvStability.STABLE + ) + ).toBe('publish test-exchange:routing.key'); + }); + + it('should return only exchange when routing key is empty', () => { + expect( + getPublishSpanName('test-exchange', '', SemconvStability.STABLE) + ).toBe('publish test-exchange'); + }); + + it('should return only routing key when exchange is empty', () => { + expect( + getPublishSpanName('', 'routing.key', SemconvStability.STABLE) + ).toBe('publish routing.key'); + }); + + it('should use amq.default when both are empty', () => { + expect(getPublishSpanName('', '', SemconvStability.STABLE)).toBe( + 'publish amq.default' + ); + }); + + it('should handle dots in exchange and routing key', () => { + expect( + getPublishSpanName( + 'app.service.exchange', + 'user.created.event', + SemconvStability.STABLE + ) + ).toBe('publish app.service.exchange:user.created.event'); + }); + + it('should handle special characters', () => { + expect( + getPublishSpanName( + 'exchange-with_special.chars', + 'routing.key-with_special.chars', + SemconvStability.STABLE + ) + ).toBe( + 'publish exchange-with_special.chars:routing.key-with_special.chars' + ); }); }); - it('all features encoded', () => { - const attributes = getConnectionAttributesFromUrl( - 'amqp://user%61:%61pass@ho%61st:10000/v%2fhost' - ); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', - [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', - [SEMATTRS_NET_PEER_NAME]: 'ho%61st', - [SEMATTRS_NET_PEER_PORT]: 10000, - [SEMATTRS_MESSAGING_URL]: 'amqp://user%61:***@ho%61st:10000/v%2fhost', + describe('Both old and stable attributes', () => { + it('should use stable format when both flags are set', () => { + expect( + getPublishSpanName( + 'test-exchange', + 'routing.key', + SemconvStability.OLD | SemconvStability.STABLE + ) + ).toBe('publish test-exchange:routing.key'); + }); + + it('should prioritize stable format over old format', () => { + const spanName = getPublishSpanName( + 'my-exchange', + 'my.key', + SemconvStability.OLD | SemconvStability.STABLE + ); + expect(spanName).toBe('publish my-exchange:my.key'); + expect(spanName).not.toBe('publish my-exchange'); + }); + + it('should use stable default when both are empty', () => { + const spanName = getPublishSpanName( + '', + '', + SemconvStability.OLD | SemconvStability.STABLE + ); + expect(spanName).toBe('publish amq.default'); + expect(spanName).not.toBe('publish '); }); }); + }); + + describe('getPublishAttributes', () => { + describe('Old attributes', () => { + it('should return minimal attributes', () => { + expect( + getPublishAttributes( + 'test-exchange', + 'routing.key', + 1024, + {}, + SemconvStability.OLD + ) + ).toStrictEqual({ + [SEMATTRS_MESSAGING_DESTINATION]: 'test-exchange', + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: 'routing.key', + [SEMATTRS_MESSAGING_MESSAGE_ID]: undefined, + [SEMATTRS_MESSAGING_CONVERSATION_ID]: undefined, + }); + }); + + it('should support messageId and correlationId', () => { + expect( + getPublishAttributes( + 'test-exchange', + 'routing.key', + 2048, + { messageId: 'msg-123', correlationId: 'corr-456' }, + SemconvStability.OLD + ) + ).toStrictEqual({ + [SEMATTRS_MESSAGING_DESTINATION]: 'test-exchange', + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: 'routing.key', + [SEMATTRS_MESSAGING_MESSAGE_ID]: 'msg-123', + [SEMATTRS_MESSAGING_CONVERSATION_ID]: 'corr-456', + }); + }); + + it('should handle empty exchange', () => { + expect( + getPublishAttributes('', 'routing.key', 512, {}, SemconvStability.OLD) + ).toStrictEqual({ + [SEMATTRS_MESSAGING_DESTINATION]: '', + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: 'routing.key', + [SEMATTRS_MESSAGING_MESSAGE_ID]: undefined, + [SEMATTRS_MESSAGING_CONVERSATION_ID]: undefined, + }); + }); + + it('should handle empty routing key', () => { + expect( + getPublishAttributes( + 'test-exchange', + '', + 256, + {}, + SemconvStability.OLD + ) + ).toStrictEqual({ + [SEMATTRS_MESSAGING_DESTINATION]: 'test-exchange', + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: '', + [SEMATTRS_MESSAGING_MESSAGE_ID]: undefined, + [SEMATTRS_MESSAGING_CONVERSATION_ID]: undefined, + }); + }); - it('only protocol', () => { - const attributes = getConnectionAttributesFromUrl('amqp://'); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', - [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', - [SEMATTRS_NET_PEER_NAME]: 'localhost', - [SEMATTRS_NET_PEER_PORT]: 5672, - [SEMATTRS_MESSAGING_URL]: 'amqp://', + it('should handle special characters', () => { + expect( + getPublishAttributes( + 'test.exchange-with_special.chars', + 'routing.key.with-special_chars', + 100, + { messageId: 'special-chars-msg' }, + SemconvStability.OLD + ) + ).toStrictEqual({ + [SEMATTRS_MESSAGING_DESTINATION]: 'test.exchange-with_special.chars', + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: + 'routing.key.with-special_chars', + [SEMATTRS_MESSAGING_MESSAGE_ID]: 'special-chars-msg', + [SEMATTRS_MESSAGING_CONVERSATION_ID]: undefined, + }); }); }); - it('empty username and password', () => { - const attributes = getConnectionAttributesFromUrl('amqp://:@/'); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', - [SEMATTRS_MESSAGING_URL]: 'amqp://:***@/', + describe('Stable attributes', () => { + it('should return minimal attributes', () => { + expect( + getPublishAttributes( + 'test-exchange', + 'routing.key', + 1024, + {}, + SemconvStability.STABLE + ) + ).toStrictEqual({ + [ATTR_MESSAGING_OPERATION_TYPE]: 'send', + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: 'test-exchange:routing.key', + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'routing.key', + [ATTR_MESSAGING_MESSAGE_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: 1024, + }); + }); + + it('should support messageId and correlationId', () => { + expect( + getPublishAttributes( + 'test-exchange', + 'routing.key', + 2048, + { messageId: 'msg-123', correlationId: 'corr-456' }, + SemconvStability.STABLE + ) + ).toStrictEqual({ + [ATTR_MESSAGING_OPERATION_TYPE]: 'send', + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: 'test-exchange:routing.key', + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'routing.key', + [ATTR_MESSAGING_MESSAGE_ID]: 'msg-123', + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: 'corr-456', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: 2048, + }); + }); + + it('should handle empty exchange', () => { + expect( + getPublishAttributes( + '', + 'routing.key', + 512, + {}, + SemconvStability.STABLE + ) + ).toStrictEqual({ + [ATTR_MESSAGING_OPERATION_TYPE]: 'send', + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: 'routing.key', + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'routing.key', + [ATTR_MESSAGING_MESSAGE_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: 512, + }); + }); + + it('should handle empty routing key', () => { + expect( + getPublishAttributes( + 'test-exchange', + '', + 256, + {}, + SemconvStability.STABLE + ) + ).toStrictEqual({ + [ATTR_MESSAGING_OPERATION_TYPE]: 'send', + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: 'test-exchange', + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: '', + [ATTR_MESSAGING_MESSAGE_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: 256, + }); + }); + + it('should handle zero content length', () => { + expect( + getPublishAttributes( + 'test-exchange', + 'routing.key', + 0, + {}, + SemconvStability.STABLE + ) + ).toStrictEqual({ + [ATTR_MESSAGING_OPERATION_TYPE]: 'send', + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: 'test-exchange:routing.key', + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'routing.key', + [ATTR_MESSAGING_MESSAGE_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: 0, + }); }); }); - it('username and no password', () => { - const attributes = getConnectionAttributesFromUrl('amqp://user@'); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', - [SEMATTRS_MESSAGING_URL]: 'amqp://user@', + describe('Both old and stable attributes', () => { + it('should combine minimal attributes', () => { + expect( + getPublishAttributes( + 'exchange', + '', + 256, + {}, + SemconvStability.OLD | SemconvStability.STABLE + ) + ).toStrictEqual({ + [SEMATTRS_MESSAGING_DESTINATION]: 'exchange', + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: '', + [SEMATTRS_MESSAGING_MESSAGE_ID]: undefined, + [SEMATTRS_MESSAGING_CONVERSATION_ID]: undefined, + [ATTR_MESSAGING_OPERATION_TYPE]: 'send', + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: 'exchange', + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: '', + [ATTR_MESSAGING_MESSAGE_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: 256, + }); + }); + + it('should combine with all options', () => { + expect( + getPublishAttributes( + 'test-exchange', + 'routing.key', + 1024, + { messageId: 'msg-123', correlationId: 'corr-456' }, + SemconvStability.OLD | SemconvStability.STABLE + ) + ).toStrictEqual({ + [SEMATTRS_MESSAGING_DESTINATION]: 'test-exchange', + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: 'routing.key', + [SEMATTRS_MESSAGING_MESSAGE_ID]: 'msg-123', + [SEMATTRS_MESSAGING_CONVERSATION_ID]: 'corr-456', + [ATTR_MESSAGING_OPERATION_TYPE]: 'send', + [ATTR_MESSAGING_OPERATION_NAME]: 'publish', + [ATTR_MESSAGING_DESTINATION_NAME]: 'test-exchange:routing.key', + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'routing.key', + [ATTR_MESSAGING_MESSAGE_ID]: 'msg-123', + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: 'corr-456', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: 1024, + }); }); }); + }); + + describe('getConsumeSpanName', () => { + describe('Old attributes', () => { + it('should return " process" with a basic queue name', () => { + expect( + getConsumeSpanName( + 'test-queue', + {} as amqp.ConsumeMessage, + SemconvStability.OLD + ) + ).toBe('test-queue process'); + }); + + it('should handle empty queue', () => { + expect( + getConsumeSpanName( + '', + {} as amqp.ConsumeMessage, + SemconvStability.OLD + ) + ).toBe(' process'); + }); - it('username and password, no host', () => { - const attributes = getConnectionAttributesFromUrl('amqp://user:pass@'); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', - [SEMATTRS_MESSAGING_URL]: 'amqp://user:***@', + it('should allow special characters in queue name', () => { + expect( + getConsumeSpanName( + 'queue.with-special_chars', + {} as amqp.ConsumeMessage, + SemconvStability.OLD + ) + ).toBe('queue.with-special_chars process'); + }); + + it('should handle long queue name', () => { + expect( + getConsumeSpanName( + 'very-long-queue-name-with-many-characters', + {} as amqp.ConsumeMessage, + SemconvStability.OLD + ) + ).toBe('very-long-queue-name-with-many-characters process'); + }); + + it('should allow underscores in queue name', () => { + expect( + getConsumeSpanName( + 'my_queue_name', + {} as amqp.ConsumeMessage, + SemconvStability.OLD + ) + ).toBe('my_queue_name process'); }); }); - it('host only', () => { - const attributes = getConnectionAttributesFromUrl('amqp://host'); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', - [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', - [SEMATTRS_NET_PEER_NAME]: 'host', - [SEMATTRS_NET_PEER_PORT]: 5672, - [SEMATTRS_MESSAGING_URL]: 'amqp://host', + describe('Stable attributes', () => { + it('should return "consume " with a queue', () => { + const msg = { + fields: { exchange: '', routingKey: '' }, + } as amqp.ConsumeMessage; + expect( + getConsumeSpanName('my-special-queue', msg, SemconvStability.STABLE) + ).toBe('consume my-special-queue'); + }); + + it('should use "amq.default" when queue is empty and fields empty', () => { + const msg = { + fields: { exchange: '', routingKey: '' }, + } as amqp.ConsumeMessage; + expect(getConsumeSpanName('', msg, SemconvStability.STABLE)).toBe( + 'consume amq.default' + ); + }); + + it('should return "consume exchange:routingKey" when queue and routingKey equals and exchange is present', () => { + const msg = { + fields: { exchange: 'a', routingKey: 'b' }, + } as amqp.ConsumeMessage; + expect(getConsumeSpanName('b', msg, SemconvStability.STABLE)).toBe( + 'consume a:b' + ); + }); + + it('should return "consume queue" when queue==routingKey and no exchange', () => { + const msg = { + fields: { exchange: '', routingKey: 'c' }, + } as amqp.ConsumeMessage; + expect(getConsumeSpanName('c', msg, SemconvStability.STABLE)).toBe( + 'consume c' + ); + }); + + it('should return "consume exchange" when exchange==queue==routingKey', () => { + const msg = { + fields: { exchange: 'x', routingKey: 'x' }, + } as amqp.ConsumeMessage; + expect(getConsumeSpanName('x', msg, SemconvStability.STABLE)).toBe( + 'consume x' + ); + }); + + it('should return "consume exchange:routingKey:queue" when all are different', () => { + const msg = { + fields: { exchange: 'test-exchange', routingKey: 'routing.key' }, + } as amqp.ConsumeMessage; + expect( + getConsumeSpanName('different-queue', msg, SemconvStability.STABLE) + ).toBe('consume test-exchange:routing.key:different-queue'); + }); + + it('should return "consume exchange:routingKey" when queue is empty', () => { + const msg = { + fields: { exchange: 'ex', routingKey: 'rk' }, + } as amqp.ConsumeMessage; + expect(getConsumeSpanName('', msg, SemconvStability.STABLE)).toBe( + 'consume ex:rk' + ); + }); + + it('should return "consume exchange:queue" when routingKey is empty', () => { + const msg = { + fields: { exchange: 'ex', routingKey: '' }, + } as amqp.ConsumeMessage; + expect(getConsumeSpanName('q', msg, SemconvStability.STABLE)).toBe( + 'consume ex:q' + ); + }); + + it('should return "consume routingKey:queue" when exchange is empty', () => { + const msg = { + fields: { exchange: '', routingKey: 'r' }, + } as amqp.ConsumeMessage; + expect(getConsumeSpanName('q', msg, SemconvStability.STABLE)).toBe( + 'consume r:q' + ); + }); + + it('should allow special characters in queue name', () => { + const msg = { + fields: { exchange: '', routingKey: '' }, + } as amqp.ConsumeMessage; + expect( + getConsumeSpanName( + 'queue.with-special_chars', + msg, + SemconvStability.STABLE + ) + ).toBe('consume queue.with-special_chars'); + }); + + it('should handle long queue names', () => { + const msg = { + fields: { exchange: '', routingKey: '' }, + } as amqp.ConsumeMessage; + expect( + getConsumeSpanName( + 'very-long-queue-name-with-many-characters', + msg, + SemconvStability.STABLE + ) + ).toBe('consume very-long-queue-name-with-many-characters'); + }); + + it('should handle numeric queue name', () => { + expect( + getConsumeSpanName( + '123456', + {} as amqp.ConsumeMessage, + SemconvStability.STABLE + ) + ).toBe('consume 123456'); + }); + + it('should handle dotted queue name', () => { + expect( + getConsumeSpanName( + 'app.service-worker.queue', + {} as amqp.ConsumeMessage, + SemconvStability.STABLE + ) + ).toBe('consume app.service-worker.queue'); + }); + + it('should handle queue name with unicode', () => { + expect( + getConsumeSpanName( + 'queue-émojis-🚀', + {} as amqp.ConsumeMessage, + SemconvStability.STABLE + ) + ).toBe('consume queue-émojis-🚀'); }); }); - it('vhost only', () => { - const attributes = getConnectionAttributesFromUrl('amqp:///vhost'); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', - [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', - [SEMATTRS_NET_PEER_NAME]: 'localhost', - [SEMATTRS_NET_PEER_PORT]: 5672, - [SEMATTRS_MESSAGING_URL]: 'amqp:///vhost', + describe('Both old and stable attributes', () => { + it('should prioritize stable format over old format', () => { + const spanName = getConsumeSpanName( + 'my-queue', + {} as amqp.ConsumeMessage, + SemconvStability.OLD | SemconvStability.STABLE + ); + expect(spanName).toBe('consume my-queue'); + expect(spanName).not.toBe('my-queue process'); }); }); + }); + + describe('getConsumeAttributes', () => { + describe('Old attributes', () => { + it('should return minimal attributes', () => { + const msg = {} as amqp.ConsumeMessage; + expect( + getConsumeAttributes('queue-name', msg, SemconvStability.OLD) + ).toStrictEqual({ + [SEMATTRS_MESSAGING_DESTINATION]: undefined, + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_OPERATION]: MESSAGINGOPERATIONVALUES_PROCESS, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: undefined, + [SEMATTRS_MESSAGING_MESSAGE_ID]: undefined, + [SEMATTRS_MESSAGING_CONVERSATION_ID]: undefined, + }); + }); - it('host only, trailing slash', () => { - const attributes = getConnectionAttributesFromUrl('amqp://host/'); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', - [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', - [SEMATTRS_NET_PEER_NAME]: 'host', - [SEMATTRS_NET_PEER_PORT]: 5672, - [SEMATTRS_MESSAGING_URL]: 'amqp://host/', + it('should return all consume attributes from fields/properties', () => { + const msg = { + fields: { + exchange: 'test-exchange', + routingKey: 'routing.key', + deliveryTag: 2, + }, + properties: { + messageId: 'msg-123', + correlationId: 'corr-456', + }, + content: Buffer.from('test message with properties'), + } as amqp.ConsumeMessage; + expect( + getConsumeAttributes('queue-name', msg, SemconvStability.OLD) + ).toStrictEqual({ + [SEMATTRS_MESSAGING_DESTINATION]: 'test-exchange', + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_OPERATION]: MESSAGINGOPERATIONVALUES_PROCESS, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: 'routing.key', + [SEMATTRS_MESSAGING_MESSAGE_ID]: 'msg-123', + [SEMATTRS_MESSAGING_CONVERSATION_ID]: 'corr-456', + }); }); }); - it('vhost encoded', () => { - const attributes = getConnectionAttributesFromUrl('amqp://host/%2f'); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', - [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', - [SEMATTRS_NET_PEER_NAME]: 'host', - [SEMATTRS_NET_PEER_PORT]: 5672, - [SEMATTRS_MESSAGING_URL]: 'amqp://host/%2f', + describe('Stable attributes', () => { + it('should return minimal stable attributes', () => { + const msg = {} as amqp.ConsumeMessage; + expect( + getConsumeAttributes('', msg, SemconvStability.STABLE) + ).toStrictEqual({ + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: 'amq.default', + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: undefined, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: undefined, + [ATTR_MESSAGING_MESSAGE_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: undefined, + }); + }); + + it('should return all consume attributes from fields/properties', () => { + const msg = { + fields: { + exchange: 'test-exchange', + routingKey: 'routing.key', + deliveryTag: 2, + }, + properties: { + messageId: 'msg-123', + correlationId: 'corr-456', + }, + content: Buffer.from('test message with properties'), + } as amqp.ConsumeMessage; + expect( + getConsumeAttributes('queue-name', msg, SemconvStability.STABLE) + ).toStrictEqual({ + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: + 'test-exchange:routing.key:queue-name', + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'routing.key', + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 2, + [ATTR_MESSAGING_MESSAGE_ID]: 'msg-123', + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: 'corr-456', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: 28, + }); + }); + + it('should use exchange:routingKey when queue == routingKey', () => { + const msg = { + fields: { + exchange: 'test-exchange', + routingKey: 'queue-name', + deliveryTag: 1, + }, + properties: {}, + content: Buffer.from('test'), + } as amqp.ConsumeMessage; + const attrs = getConsumeAttributes( + 'queue-name', + msg, + SemconvStability.STABLE + ); + expect(attrs[ATTR_MESSAGING_DESTINATION_NAME]).toBe( + 'test-exchange:queue-name' + ); + }); + + it('should use exchange:routingKey:queue when all different', () => { + const msg = { + fields: { + exchange: 'test-exchange', + routingKey: 'routing.key', + deliveryTag: 1, + }, + properties: {}, + content: Buffer.from('test'), + } as amqp.ConsumeMessage; + const attrs = getConsumeAttributes( + 'different-queue', + msg, + SemconvStability.STABLE + ); + expect(attrs[ATTR_MESSAGING_DESTINATION_NAME]).toBe( + 'test-exchange:routing.key:different-queue' + ); + }); + + it('should use exchange:routingKey when queue is missing', () => { + const msg = { + fields: { + exchange: 'test-exchange', + routingKey: 'routing.key', + deliveryTag: 1, + }, + properties: {}, + content: Buffer.from('test'), + } as amqp.ConsumeMessage; + const attrs = getConsumeAttributes('', msg, SemconvStability.STABLE); + expect(attrs[ATTR_MESSAGING_DESTINATION_NAME]).toBe( + 'test-exchange:routing.key' + ); + }); + + it('should use exchange:queue when routingKey is missing', () => { + const msg = { + fields: { exchange: 'test-exchange', routingKey: '', deliveryTag: 1 }, + properties: {}, + content: Buffer.from('test'), + } as amqp.ConsumeMessage; + const attrs = getConsumeAttributes( + 'test-queue', + msg, + SemconvStability.STABLE + ); + expect(attrs[ATTR_MESSAGING_DESTINATION_NAME]).toBe( + 'test-exchange:test-queue' + ); + }); + + it('should use routingKey:queue when exchange is empty', () => { + const msg = { + fields: { exchange: '', routingKey: 'routing.key', deliveryTag: 1 }, + properties: {}, + content: Buffer.from('test'), + } as amqp.ConsumeMessage; + const attrs = getConsumeAttributes( + 'test-queue', + msg, + SemconvStability.STABLE + ); + expect(attrs[ATTR_MESSAGING_DESTINATION_NAME]).toBe( + 'routing.key:test-queue' + ); + }); + + it('should use only exchange if only exchange is present', () => { + const msg = { + fields: { exchange: 'test-exchange', routingKey: '', deliveryTag: 1 }, + properties: {}, + content: Buffer.from('test'), + } as amqp.ConsumeMessage; + const attrs = getConsumeAttributes('', msg, SemconvStability.STABLE); + expect(attrs[ATTR_MESSAGING_DESTINATION_NAME]).toBe('test-exchange'); + }); + + it('should use only routingKey if only routingKey is present', () => { + const msg = { + fields: { exchange: '', routingKey: 'routing.key', deliveryTag: 1 }, + properties: {}, + content: Buffer.from('test'), + } as amqp.ConsumeMessage; + const attrs = getConsumeAttributes('', msg, SemconvStability.STABLE); + expect(attrs[ATTR_MESSAGING_DESTINATION_NAME]).toBe('routing.key'); + }); + + it('should use only queue if only queue is present', () => { + const msg = { + fields: { exchange: '', routingKey: '', deliveryTag: 1 }, + properties: {}, + content: Buffer.from('test'), + } as amqp.ConsumeMessage; + const attrs = getConsumeAttributes( + 'test-queue', + msg, + SemconvStability.STABLE + ); + expect(attrs[ATTR_MESSAGING_DESTINATION_NAME]).toBe('test-queue'); + }); + + it('should use amq.default if all are empty', () => { + const msg = { + fields: { exchange: '', routingKey: '', deliveryTag: 1 }, + properties: {}, + content: Buffer.from('test'), + } as amqp.ConsumeMessage; + const attrs = getConsumeAttributes('', msg, SemconvStability.STABLE); + expect(attrs[ATTR_MESSAGING_DESTINATION_NAME]).toBe('amq.default'); }); }); - it('IPv6 host', () => { - const attributes = getConnectionAttributesFromUrl('amqp://[::1]'); - expect(attributes).toStrictEqual({ - [SEMATTRS_MESSAGING_PROTOCOL]: 'AMQP', - [SEMATTRS_MESSAGING_PROTOCOL_VERSION]: '0.9.1', - [SEMATTRS_NET_PEER_NAME]: '[::1]', - [SEMATTRS_NET_PEER_PORT]: 5672, - [SEMATTRS_MESSAGING_URL]: 'amqp://[::1]', + describe('Both old and stable attributes', () => { + it('should combine minimal attributes', () => { + const msg = {} as amqp.ConsumeMessage; + expect( + getConsumeAttributes( + '', + msg, + SemconvStability.OLD | SemconvStability.STABLE + ) + ).toStrictEqual({ + [SEMATTRS_MESSAGING_DESTINATION]: undefined, + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_OPERATION]: MESSAGINGOPERATIONVALUES_PROCESS, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: undefined, + [SEMATTRS_MESSAGING_MESSAGE_ID]: undefined, + [SEMATTRS_MESSAGING_CONVERSATION_ID]: undefined, + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: 'amq.default', + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: undefined, + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: undefined, + [ATTR_MESSAGING_MESSAGE_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: undefined, + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: undefined, + }); + }); + + it('should combine all options', () => { + const msg = { + fields: { + exchange: 'test-exchange', + routingKey: 'routing.key', + deliveryTag: 1, + }, + properties: { + messageId: 'msg-123', + correlationId: 'corr-456', + }, + content: Buffer.from('complete test message'), + } as amqp.ConsumeMessage; + expect( + getConsumeAttributes( + 'queue-name', + msg, + SemconvStability.OLD | SemconvStability.STABLE + ) + ).toStrictEqual({ + [SEMATTRS_MESSAGING_DESTINATION]: 'test-exchange', + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_TOPIC, + [SEMATTRS_MESSAGING_OPERATION]: MESSAGINGOPERATIONVALUES_PROCESS, + [SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: 'routing.key', + [SEMATTRS_MESSAGING_MESSAGE_ID]: 'msg-123', + [SEMATTRS_MESSAGING_CONVERSATION_ID]: 'corr-456', + [ATTR_MESSAGING_OPERATION_TYPE]: + MESSAGING_OPERATION_TYPE_VALUE_PROCESS, + [ATTR_MESSAGING_OPERATION_NAME]: 'consume', + [ATTR_MESSAGING_DESTINATION_NAME]: + 'test-exchange:routing.key:queue-name', + [ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY]: 'routing.key', + [ATTR_MESSAGING_RABBITMQ_MESSAGE_DELIVERY_TAG]: 1, + [ATTR_MESSAGING_MESSAGE_ID]: 'msg-123', + [ATTR_MESSAGING_MESSAGE_CONVERSATION_ID]: 'corr-456', + [ATTR_MESSAGING_MESSAGE_BODY_SIZE]: 21, + }); }); }); });