Skip to content

Commit befea74

Browse files
authored
Merge branch 'main' into sfns-pr-real
2 parents b8794c1 + 1d263e5 commit befea74

File tree

14 files changed

+180
-188
lines changed

14 files changed

+180
-188
lines changed

package-lock.json

Lines changed: 9 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/instrumentation-amqplib/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
"devDependencies": {
5959
"@opentelemetry/api": "^1.3.0",
6060
"@opentelemetry/contrib-test-utils": "^0.49.0",
61-
"@types/amqplib": "^0.5.17",
61+
"@types/amqplib": "^0.10.7",
6262
"@types/lodash": "4.14.199",
6363
"@types/mocha": "10.0.10",
6464
"@types/node": "18.18.14",

packages/instrumentation-amqplib/src/amqplib.ts

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ import {
6565
CONNECTION_ATTRIBUTES,
6666
getConnectionAttributesFromServer,
6767
getConnectionAttributesFromUrl,
68+
InstrumentationConnection,
6869
InstrumentationConsumeChannel,
70+
InstrumentationConsumeMessage,
6971
InstrumentationMessage,
7072
InstrumentationPublishChannel,
7173
isConfirmChannelTracing,
@@ -253,18 +255,11 @@ export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumen
253255
this,
254256
url,
255257
socketOptions,
256-
function (this: unknown, err, conn: Connection) {
258+
function (this: unknown, err, conn: InstrumentationConnection) {
257259
if (err == null) {
258260
const urlAttributes = getConnectionAttributesFromUrl(url);
259-
// the type of conn in @types/amqplib is amqp.Connection, but in practice the library send the
260-
// `serverProperties` on the `conn` and not in a property `connection`.
261-
// I don't have capacity to debug it currently but it should probably be fixed in @types or
262-
// in the package itself
263-
// currently setting as any to calm typescript
264-
const serverAttributes = getConnectionAttributesFromServer(
265-
conn as any
266-
);
267-
(conn as any)[CONNECTION_ATTRIBUTES] = {
261+
const serverAttributes = getConnectionAttributesFromServer(conn);
262+
conn[CONNECTION_ATTRIBUTES] = {
268263
...urlAttributes,
269264
...serverAttributes,
270265
};
@@ -402,7 +397,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumen
402397

403398
const patchedOnMessage = function (
404399
this: unknown,
405-
msg: InstrumentationMessage | null
400+
msg: InstrumentationConsumeMessage | null
406401
) {
407402
// msg is expected to be null for signaling consumer cancel notification
408403
// https://www.rabbitmq.com/consumer-cancel.html
@@ -724,7 +719,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumen
724719

725720
private callConsumeEndHook(
726721
span: Span,
727-
msg: ConsumeMessage,
722+
msg: InstrumentationMessage,
728723
rejected: boolean | null,
729724
endOperation: EndOperation
730725
) {

packages/instrumentation-amqplib/src/utils.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,15 @@ export const CONNECTION_ATTRIBUTES: unique symbol = Symbol(
4545
'opentelemetry.amqplib.connection.attributes'
4646
);
4747

48+
export type InstrumentationConnection = amqp.Connection & {
49+
[CONNECTION_ATTRIBUTES]?: Attributes;
50+
};
4851
export type InstrumentationPublishChannel = (
4952
| amqp.Channel
5053
| amqp.ConfirmChannel
51-
) & { connection: { [CONNECTION_ATTRIBUTES]: Attributes } };
54+
) & { connection: InstrumentationConnection };
5255
export type InstrumentationConsumeChannel = amqp.Channel & {
53-
connection: { [CONNECTION_ATTRIBUTES]: Attributes };
56+
connection: InstrumentationConnection;
5457
[CHANNEL_SPANS_NOT_ENDED]?: {
5558
msg: amqp.ConsumeMessage;
5659
timeOfConsume: HrTime;
@@ -60,6 +63,9 @@ export type InstrumentationConsumeChannel = amqp.Channel & {
6063
export type InstrumentationMessage = amqp.Message & {
6164
[MESSAGE_STORED_SPAN]?: Span;
6265
};
66+
export type InstrumentationConsumeMessage = amqp.ConsumeMessage & {
67+
[MESSAGE_STORED_SPAN]?: Span;
68+
};
6369

6470
const IS_CONFIRM_CHANNEL_CONTEXT_KEY: symbol = createContextKey(
6571
'opentelemetry.amqplib.channel.is-confirm-channel'
@@ -117,7 +123,7 @@ const extractConnectionAttributeOrLog = (
117123
};
118124

119125
export const getConnectionAttributesFromServer = (
120-
conn: amqp.Connection['connection']
126+
conn: amqp.Connection
121127
): Attributes => {
122128
const product = conn.serverProperties.product?.toLowerCase?.();
123129
if (product) {

packages/instrumentation-amqplib/test/amqplib-promise.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ const CHANNEL_CLOSED_IN_TEST = Symbol(
6868
);
6969

7070
describe('amqplib instrumentation promise model', () => {
71-
let conn: amqp.Connection;
71+
let conn: amqp.ChannelModel;
7272
before(async function () {
7373
if (!shouldTest) {
7474
this.skip();

packages/instrumentation-amqplib/test/utils.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { rabbitMqUrl } from './config';
3333

3434
describe('utils', () => {
3535
describe('getConnectionAttributesFromServer', () => {
36-
let conn: amqp.Connection;
36+
let conn: amqp.ChannelModel;
3737
before(async function () {
3838
if (!shouldTest) {
3939
this.skip();

packages/instrumentation-aws-sdk/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ aws-sdk instrumentation has few options available to choose from. You can set th
5353
| `preRequestHook` | `AwsSdkRequestCustomAttributeFunction` | Hook called before request send, which allow to add custom attributes to span. |
5454
| `responseHook` | `AwsSdkResponseCustomAttributeFunction` | Hook for adding custom attributes when response is received from aws. |
5555
| `exceptionHook` | `AwsSdkExceptionCustomAttributeFunction` | Hook for adding custom attributes when exception is received from aws. |
56-
| `sqsProcessHook` | `AwsSdkSqsProcessCustomAttributeFunction` | Hook called after starting sqs `process` span (for each sqs received message), which allow to add custom attributes to it. |
5756
| `suppressInternalInstrumentation` | `boolean` | Most aws operation use http requests under the hood. Set this to `true` to hide all underlying http spans. |
5857
| `sqsExtractContextPropagationFromPayload` | `boolean` | Will parse and extract context propagation headers from SQS Payload, false by default. [When should it be used?](./doc/sns.md#integration-with-sqs) |
5958
| `dynamoDBStatementSerializer` | `AwsSdkDynamoDBStatementSerializer` | AWS SDK instrumentation will serialize DynamoDB commands to the `db.statement` attribute using the specified function. Defaults to using a serializer that returns `undefined`. |
Lines changed: 5 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,19 @@
11
# SQS
22

3-
SQS is amazon's managed message queue. Thus, it should follow the [OpenTelemetry specification for Messaging systems](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md).
3+
SQS is Amazon's managed message queue. Thus, it should follow the [OpenTelemetry specification for Messaging systems](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/).
44

55
## Specific trace semantic
66

77
The following methods are automatically enhanced:
88

99
### sendMessage / sendMessageBatch
1010

11-
- [Messaging Attributes](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#messaging-attributes) are added by this instrumentation according to the spec.
11+
- [Messaging Attributes](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes) are added by this instrumentation according to the spec.
1212
- OpenTelemetry trace context is injected as SQS MessageAttributes, so the service receiving the message can link cascading spans to the trace which created the message.
1313

1414
### receiveMessage
1515

16-
- [Messaging Attributes](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#messaging-attributes) are added by this instrumentation according to the spec.
17-
- Additional "processing spans" are created for each message received by the application.
18-
If an application invoked `receiveMessage`, and received a 10 messages batch, a single `messaging.operation` = `receive` span will be created for the `receiveMessage` operation, and 10 `messaging.operation` = `process` spans will be created, one for each message.
19-
Those processing spans are created by the library. This behavior is partially implemented, [See discussion below](#processing-spans).
20-
- Sets the inter process context correctly, so that additional spans created through the process will be linked to parent spans correctly.
21-
This behavior is partially implemented, [See discussion below](#processing-spans).
16+
- [Messaging Attributes](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes) are added by this instrumentation according to the spec.
17+
- Sets the inter process context correctly, so that additional spans created through the process will be linked to parent spans correctly.
18+
When multiple messages are received, the instrumentation will attach spank links to the receiving span containing the trace context and message ID of each message.
2219
- Extract trace context from SQS MessageAttributes, and set span's `parent` and `links` correctly according to the spec.
23-
24-
#### Processing Spans
25-
26-
See GH issue [here](https://github.com/open-telemetry/opentelemetry-js-contrib/issues/707)
27-
28-
According to OpenTelemetry specification (and to reasonable expectation for trace structure), user of this library would expect to see one span for the operation of receiving messages batch from SQS, and then, **for each message**, a span with it's own sub-tree for the processing of this specific message.
29-
30-
For example, if a `receiveMessages` returned 2 messages:
31-
32-
- `msg1` resulting in storing something to a DB.
33-
- `msg2` resulting in calling an external HTTP endpoint.
34-
35-
This will result in a creating a DB span that would be the child of `msg1` process span, and an HTTP span that would be the child of `msg2` process span (in opposed to mixing all those operations under the single `receive` span, or start a new trace for each of them).
36-
37-
Unfortunately, this is not so easy to implement in JS:
38-
39-
1. The SDK is calling a single callback for the messages batch, and it's not straightforward to understand when each individual message processing starts and ends (and set the context correctly for cascading spans).
40-
2. If async/await is used, context can be lost when returning data from async functions, for example:
41-
42-
```js
43-
async function asyncRecv() {
44-
const data = await sqs.receiveMessage(recvParams).promise();
45-
// context of receiveMessage is set here
46-
return data;
47-
}
48-
49-
async function poll() {
50-
const result = await asyncRecv();
51-
// context is lost when asyncRecv returns. following spans are created with root context.
52-
await Promise.all(
53-
result.Messages.map((message) => this.processMessage(message))
54-
);
55-
}
56-
```
57-
58-
Current implementation partially solves this issue by patching the `map` \ `forEach` \ `Filter` functions on the `Messages` array of `receiveMessage` result. This handles issues like the one above, but will not handle situations where the processing is done in other patterns (multiple map\forEach calls, index access to the array, other array operations, etc). This is currently an open issue in the instrumentation.
59-
60-
User can add custom attributes to the `process` span, by setting a function to `sqsProcessHook` in instrumentation config. For example:
61-
62-
```js
63-
awsInstrumentationConfig = {
64-
sqsProcessHook: (span, message) => {
65-
span.setAttribute("sqs.receipt_handle", message.params?.ReceiptHandle);
66-
},
67-
};
68-
```

packages/instrumentation-aws-sdk/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
"dependencies": {
5151
"@opentelemetry/core": "^2.0.0",
5252
"@opentelemetry/instrumentation": "^0.203.0",
53-
"@opentelemetry/propagation-utils": "^0.31.3",
5453
"@opentelemetry/semantic-conventions": "^1.34.0"
5554
},
5655
"devDependencies": {

packages/instrumentation-aws-sdk/src/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ export type {
2121
AwsSdkRequestHookInformation,
2222
AwsSdkResponseCustomAttributeFunction,
2323
AwsSdkResponseHookInformation,
24-
AwsSdkSqsProcessCustomAttributeFunction,
25-
AwsSdkSqsProcessHookInformation,
2624
CommandInput,
2725
NormalizedRequest,
2826
NormalizedResponse,

0 commit comments

Comments
 (0)