Skip to content
Open
84 changes: 66 additions & 18 deletions packages/instrumentation-amqplib/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,72 @@ By default, the tests that connect to RabbitMQ are skipped. To make sure these t

## Semantic Conventions

This package uses `@opentelemetry/semantic-conventions` version `1.22+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md)

Attributes collected:

| Attribute | Short Description |
| -------------------------------- | ---------------------------------------------------------------------- |
| `messaging.destination` | The message destination name. |
| `messaging.destination_kind` | The kind of message destination. |
| `messaging.rabbitmq.routing_key` | RabbitMQ message routing key. |
| `messaging.operation` | A string identifying the kind of message consumption. |
| `messaging.message_id` | A value used by the messaging system as an identifier for the message. |
| `messaging.conversation_id` | The ID identifying the conversation to which the message belongs. |
| `messaging.protocol` | The name of the transport protocol. |
| `messaging.protocol_version` | The version of the transport protocol. |
| `messaging.system` | A string identifying the messaging system. |
| `messaging.url` | The connection string. |
| `net.peer.name` | Remote hostname or similar. |
| `net.peer.port` | Remote port number. |
This package supports both legacy and future stable OpenTelemetry semantic conventions for messaging systems. The behavior is controlled by the `OTEL_SEMCONV_STABILITY_OPT_IN` environment variable.

**Note**: The v1.36.0+ conventions are not yet stable but will become stable in the future. This instrumentation is progressively implementing the new attributes and span names in preparation for the transition to stable conventions.

Configure the instrumentation using one of the following options:

- **Empty (default)**: Emit only legacy v1.7.0 conventions ([messaging spec](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md))
- **`messaging`**: Emit only stable v1.36.0+ conventions ([messaging spec](https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/messaging/messaging-spans.md) or [RabbitMQ messaging spec](https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/messaging/rabbitmq.md))
- **`messaging/dup`**: Emit both legacy and stable conventions simultaneously for migration purposes

### Attributes Collected

| v1.7.0 semconv | v1.36.0+ semconv | Description |
| --------------------------------- | --------------------------------------------- | ----------- |
| `messaging.protocol` | `network.protocol.name` | The name of the transport protocol (`AMQP`) |
| `messaging.protocol_version` | `network.protocol.version` | The version of the transport protocol (`0.9.1`) |
| `net.peer.name` | `network.peer.address` | Remote hostname or similar |
| `net.peer.port` | `network.peer.port` | Remote port number |
| - | `server.address` | Server hostname or similar |
| - | `server.port` | Server port number |
| `messaging.system` | `messaging.system` | A string identifying the messaging system (`rabbitmq`) |
| `messaging.url` | Removed | The connection string (with credentials masked) |
| `messaging.destination_kind` | Removed | The kind of message destination (always `topic` for RabbitMQ) |
| - | `messaging.operation.type` | A string identifying the type of operation (`send`, `receive`) |
| `messaging.operation` | `messaging.operation.name` | A string identifying the name of operation (`publish`, `consume`) |
| `messaging.destination` | `messaging.destination.name` | The message destination name (exchange name or destination) |
| `messaging.message_id` | `messaging.message.id` | A value used by the messaging system as an identifier for the message |
| `messaging.conversation_id` | `messaging.message.conversation_id` | The ID identifying the conversation to which the message belongs |
| - | `messaging.message.body.size` | The size of the message body in bytes |
| `messaging.rabbitmq.routing_key` | `messaging.rabbitmq.destination.routing_key` | RabbitMQ message routing key |
| - | `messaging.rabbitmq.message.delivery_tag` | RabbitMQ message delivery tag (consume operations only) |

### Span Naming Conventions

The instrumentation generates different span names based on the semantic convention version:

#### Publish Operations

- **Legacy**: `publish {exchange}` (or `publish <default>` for default exchange)
- **Stable**: `publish {destination}` where destination follows the pattern:
- `{exchange}:{routing_key}` when both are present
- `{exchange}` when only exchange is present
- `{routing_key}` when only routing key is present
- `amq.default` when neither is present

#### Consume Operations

- **Legacy**: `{queue} process`
- **Stable**: `consume {destination}` where destination follows this priority pattern:
- `{exchange}:{routing_key}:{queue}` when all are present and routing_key ≠ queue
- `{exchange}:{routing_key}` when all are present and routing_key = queue, or when exchange and routing_key are present
- `{exchange}:{queue}` when exchange and queue are present (no routing_key)
- `{routing_key}:{queue}` when routing_key and queue are present (no exchange)
- `{exchange}` when only exchange is present
- `{routing_key}` when only routing_key is present
- `{queue}` when only queue is present
- `amq.default` when none are present

### Migration Guide

When upgrading to the new semantic conventions, it is recommended to follow this migration path:

1. **Upgrade** `@opentelemetry/instrumentation-amqplib` to the latest version
2. **Enable dual mode**: Set `OTEL_SEMCONV_STABILITY_OPT_IN=messaging/dup` to emit both old and new semantic conventions
3. **Update monitoring**: Modify alerts, dashboards, metrics, and other processes to use the new semantic conventions
4. **Switch to stable**: Set `OTEL_SEMCONV_STABILITY_OPT_IN=messaging` to emit only the new semantic conventions

## Useful links

Expand Down
75 changes: 39 additions & 36 deletions packages/instrumentation-amqplib/src/amqplib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,9 @@ import {
InstrumentationNodeModuleFile,
isWrapped,
safeExecuteInTheMiddle,
SemconvStability,
semconvStabilityFromStr,
} from '@opentelemetry/instrumentation';
import {
SEMATTRS_MESSAGING_DESTINATION,
SEMATTRS_MESSAGING_DESTINATION_KIND,
MESSAGINGDESTINATIONKINDVALUES_TOPIC,
SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY,
SEMATTRS_MESSAGING_OPERATION,
MESSAGINGOPERATIONVALUES_PROCESS,
SEMATTRS_MESSAGING_MESSAGE_ID,
SEMATTRS_MESSAGING_CONVERSATION_ID,
} from '@opentelemetry/semantic-conventions';
import type {
Connection,
ConsumeMessage,
Expand All @@ -65,13 +57,16 @@ import {
CONNECTION_ATTRIBUTES,
getConnectionAttributesFromServer,
getConnectionAttributesFromUrl,
getConsumeAttributes,
getConsumeSpanName,
getPublishAttributes,
getPublishSpanName,
InstrumentationConsumeChannel,
InstrumentationMessage,
InstrumentationPublishChannel,
isConfirmChannelTracing,
markConfirmChannelTracing,
MESSAGE_STORED_SPAN,
normalizeExchange,
unmarkConfirmChannelTracing,
} from './utils';
/** @knipignore */
Expand All @@ -80,8 +75,14 @@ import { PACKAGE_NAME, PACKAGE_VERSION } from './version';
const supportedVersions = ['>=0.5.5 <1'];

export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumentationConfig> {
private _semconvStability: SemconvStability = SemconvStability.OLD;

constructor(config: AmqplibInstrumentationConfig = {}) {
super(PACKAGE_NAME, PACKAGE_VERSION, { ...DEFAULT_CONFIG, ...config });
this._semconvStability = semconvStabilityFromStr(
'messaging',
process.env.OTEL_SEMCONV_STABILITY_OPT_IN
);
}

override setConfig(config: AmqplibInstrumentationConfig = {}) {
Expand Down Expand Up @@ -243,6 +244,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumen
openCallback: (err: any, connection: Connection) => void
) => Connection
) {
const self = this;
return function patchedConnect(
this: unknown,
url: string | Options.Connect,
Expand All @@ -255,7 +257,10 @@ export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumen
socketOptions,
function (this: unknown, err, conn: Connection) {
if (err == null) {
const urlAttributes = getConnectionAttributesFromUrl(url);
const urlAttributes = getConnectionAttributesFromUrl(
url,
self._semconvStability
);
// the type of conn in @types/amqplib is amqp.Connection, but in practice the library send the
// `serverProperties` on the `conn` and not in a property `connection`.
// I don't have capacity to debug it currently but it should probably be fixed in @types or
Expand Down Expand Up @@ -416,7 +421,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumen
ROOT_CONTEXT,
headers
);
const exchange = msg.fields?.exchange;

let links: Link[] | undefined;
if (self._config.useLinksForConsume) {
const parentSpanContext = parentContext
Expand All @@ -432,19 +437,12 @@ export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumen
}
}
const span = self.tracer.startSpan(
`${queue} process`,
getConsumeSpanName(queue, msg, self._semconvStability),
{
kind: SpanKind.CONSUMER,
attributes: {
...channel?.connection?.[CONNECTION_ATTRIBUTES],
[SEMATTRS_MESSAGING_DESTINATION]: exchange,
[SEMATTRS_MESSAGING_DESTINATION_KIND]:
MESSAGINGDESTINATIONKINDVALUES_TOPIC,
[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: msg.fields?.routingKey,
[SEMATTRS_MESSAGING_OPERATION]: MESSAGINGOPERATIONVALUES_PROCESS,
[SEMATTRS_MESSAGING_MESSAGE_ID]: msg?.properties.messageId,
[SEMATTRS_MESSAGING_CONVERSATION_ID]:
msg?.properties.correlationId,
...getConsumeAttributes(queue, msg, self._semconvStability),
},
links,
},
Expand Down Expand Up @@ -509,6 +507,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumen
self,
exchange,
routingKey,
content.length,
channel,
options
);
Expand Down Expand Up @@ -611,6 +610,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumen
self,
exchange,
routingKey,
content.length,
channel,
options
);
Expand Down Expand Up @@ -651,23 +651,26 @@ export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumen
self: this,
exchange: string,
routingKey: string,
contentLength: number,
channel: InstrumentationPublishChannel,
options?: Options.Publish
) {
const normalizedExchange = normalizeExchange(exchange);

const span = self.tracer.startSpan(`publish ${normalizedExchange}`, {
kind: SpanKind.PRODUCER,
attributes: {
...channel.connection[CONNECTION_ATTRIBUTES],
[SEMATTRS_MESSAGING_DESTINATION]: exchange,
[SEMATTRS_MESSAGING_DESTINATION_KIND]:
MESSAGINGDESTINATIONKINDVALUES_TOPIC,
[SEMATTRS_MESSAGING_RABBITMQ_ROUTING_KEY]: routingKey,
[SEMATTRS_MESSAGING_MESSAGE_ID]: options?.messageId,
[SEMATTRS_MESSAGING_CONVERSATION_ID]: options?.correlationId,
},
});
const span = self.tracer.startSpan(
getPublishSpanName(exchange, routingKey, self._semconvStability),
{
kind: SpanKind.PRODUCER,
attributes: {
...channel.connection[CONNECTION_ATTRIBUTES],
...getPublishAttributes(
exchange,
routingKey,
contentLength,
options,
self._semconvStability
),
},
}
);
const modifiedOptions = options ?? {};
modifiedOptions.headers = modifiedOptions.headers ?? {};

Expand Down
Loading
Loading