Skip to content

Commit 5f2c160

Browse files
feat: Add the ability to use span links when consuming a message amqp plugin (#1972)
A config option was added that when set to true, it follows the spec behavior to link to the produce context instead of continuing the context. The default behavior will be as it is today, so one has to opt-in to this change via the config setting. * Add the ability to use span links in AMQP plugin * add default behavior * Add in tests for useLinks option * Updating tests to correct Semantic attributes --------- Co-authored-by: Jamie Danielson <[email protected]>
1 parent 2c32e58 commit 5f2c160

File tree

5 files changed

+724
-11
lines changed

5 files changed

+724
-11
lines changed

plugins/node/instrumentation-amqplib/README.md

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ registerInstrumentations({
4040
// publishConfirmHook: (span: Span, publishConfirmedInto: PublishConfirmedInfo) => { },
4141
// consumeHook: (span: Span, consumeInfo: ConsumeInfo) => { },
4242
// consumeEndHook: (span: Span, consumeEndInfo: ConsumeEndInfo) => { },
43+
// useLinksForConsume: boolean,
4344
}),
4445
],
4546
})
@@ -49,13 +50,14 @@ registerInstrumentations({
4950

5051
amqplib instrumentation has few options available to choose from. You can set the following:
5152

52-
| Options | Type | Description |
53-
| --------------------------------- | ----------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- |
54-
| `publishHook` | `AmqplibPublishCustomAttributeFunction` | hook for adding custom attributes before publish message is sent. |
55-
| `publishConfirmHook` | `AmqplibPublishConfirmCustomAttributeFunction` | hook for adding custom attributes after publish message is confirmed by the broker. |
56-
| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. |
57-
| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. |
58-
| `consumeTimeoutMs` | `number` | read [Consume Timeout](#consume-timeout) below |
53+
| Options | Type | Description |
54+
| -------------------- | ---------------------------------------------- | ----------------------------------------------------------------------------------- |
55+
| `publishHook` | `AmqplibPublishCustomAttributeFunction` | hook for adding custom attributes before publish message is sent. |
56+
| `publishConfirmHook` | `AmqplibPublishConfirmCustomAttributeFunction` | hook for adding custom attributes after publish message is confirmed by the broker. |
57+
| `consumeHook` | `AmqplibConsumeCustomAttributeFunction` | hook for adding custom attributes before consumer message is processed. |
58+
| `consumeEndHook` | `AmqplibConsumeEndCustomAttributeFunction` | hook for adding custom attributes after consumer message is acked to server. |
59+
| `consumeTimeoutMs` | `number` | read [Consume Timeout](#consume-timeout) below |
60+
| `useLinksForConsume` | `boolean` | read [Links for Consume](#links-for-consume) below |
5961

6062
### Consume Timeout
6163

@@ -69,6 +71,22 @@ If timeout is not big enough, span might be closed with 'InstrumentationTimeout'
6971

7072
Default is 1 minute
7173

74+
### Links for Consume
75+
76+
By default, consume spans continue the trace where a message was produced. However, per the [spec](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#consumer-spans), consume spans should be linked to the message's creation context. Setting to true, this will enable the behavior to follow the spec.
77+
78+
Default is false
79+
80+
## Running Tests Locally
81+
82+
To run the tests locally, you need to have a RabbitMQ server running. You can use the following command to start a RabbitMQ server using Docker:
83+
84+
```bash
85+
npm run test:docker:run
86+
```
87+
88+
By default, the tests that connect to RabbitMQ are skipped. To make sure these tests are run, you can set the `RUN_RABBIT_TESTS` environment variable to `true`
89+
7290
## Semantic Conventions
7391

7492
This package uses `@opentelemetry/semantic-conventions` version `1.22+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md)

plugins/node/instrumentation-amqplib/src/amqplib.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import {
2222
SpanKind,
2323
SpanStatusCode,
2424
ROOT_CONTEXT,
25+
Link,
26+
Context,
2527
} from '@opentelemetry/api';
2628
import {
2729
hrTime,
@@ -414,8 +416,25 @@ export class AmqplibInstrumentation extends InstrumentationBase {
414416
}
415417

416418
const headers = msg.properties.headers ?? {};
417-
const parentContext = propagation.extract(ROOT_CONTEXT, headers);
419+
let parentContext: Context | undefined = propagation.extract(
420+
ROOT_CONTEXT,
421+
headers
422+
);
418423
const exchange = msg.fields?.exchange;
424+
let links: Link[] | undefined;
425+
if (self._config.useLinksForConsume) {
426+
const parentSpanContext = parentContext
427+
? trace.getSpan(parentContext)?.spanContext()
428+
: undefined;
429+
parentContext = undefined;
430+
if (parentSpanContext) {
431+
links = [
432+
{
433+
context: parentSpanContext,
434+
},
435+
];
436+
}
437+
}
419438
const span = self.tracer.startSpan(
420439
`${queue} process`,
421440
{
@@ -431,6 +450,7 @@ export class AmqplibInstrumentation extends InstrumentationBase {
431450
[SEMATTRS_MESSAGING_CONVERSATION_ID]:
432451
msg?.properties.correlationId,
433452
},
453+
links,
434454
},
435455
parentContext
436456
);
@@ -457,8 +477,10 @@ export class AmqplibInstrumentation extends InstrumentationBase {
457477
// store the span on the message, so we can end it when user call 'ack' on it
458478
msg[MESSAGE_STORED_SPAN] = span;
459479
}
460-
461-
context.with(trace.setSpan(parentContext, span), () => {
480+
const setContext: Context = parentContext
481+
? parentContext
482+
: ROOT_CONTEXT;
483+
context.with(trace.setSpan(setContext, span), () => {
462484
onMessage.call(this, msg);
463485
});
464486

plugins/node/instrumentation-amqplib/src/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,14 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig {
9696
* Default is 1 minute
9797
*/
9898
consumeTimeoutMs?: number;
99+
100+
/** option to use a span link for the consume message instead of continuing a trace */
101+
useLinksForConsume?: boolean;
99102
}
100103

101104
export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = {
102105
consumeTimeoutMs: 1000 * 60, // 1 minute
106+
useLinksForConsume: false,
103107
};
104108

105109
// The following types are vendored from `@types/[email protected]` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510

0 commit comments

Comments
 (0)